1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::Path;
10use std::process::Command;
11use std::sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14};
15use std::thread::{self, Builder, JoinHandle};
16use std::time::Duration;
17
#[derive(Debug)]
/// Remembers which textual form this client was created from, so that
/// `Client::string_arg` can reproduce the same form (`R,W` or `fifo:PATH`).
enum ClientCreationArg {
    /// Created from a pair of raw file descriptor numbers (`R,W` form).
    /// These are the numbers as originally given, not necessarily the
    /// descriptors the `Client` reads from and writes to.
    Fds { read: c_int, write: c_int },
    /// Created from a named FIFO at the stored path (`fifo:PATH` form).
    Fifo(Box<Path>),
}
27
/// Unix jobserver client: a pair of file handles through which single-byte
/// tokens are read (acquire) and written back (release).
#[derive(Debug)]
pub struct Client {
    /// Handle tokens are read from.
    read: File,
    /// Handle tokens are written back to.
    write: File,
    /// How this client was created; drives `string_arg` and `configure`.
    creation_arg: ClientCreationArg,
    /// `None` when the client wraps raw or duplicated descriptors; in that
    /// case `try_acquire` reports `ErrorKind::Unsupported`.
    /// NOTE(review): presumably because such descriptors may share their open
    /// file description with other processes, so `O_NONBLOCK` must not be
    /// toggled on them — confirm.
    ///
    /// `Some(flag)` when the client opened the files itself (FIFO path or
    /// `/dev/fd` reopen). `flag` starts as `false` and is set to `true` by
    /// `try_acquire` once the read end has been switched to non-blocking
    /// mode; nothing in this file resets it to `false`.
    is_non_blocking: Option<AtomicBool>,
}
41
/// A token taken from the jobserver.
#[derive(Debug)]
pub struct Acquired {
    /// The exact byte that was read from the pipe; `Client::release` writes
    /// this same byte back when the token is returned.
    byte: u8,
}
46
47impl Client {
48 pub fn new(mut limit: usize) -> io::Result<Client> {
49 let client = unsafe { Client::mk()? };
50
51 const BUFFER: [u8; 128] = [b'|'; 128];
54
55 let mut write = &client.write;
56
57 set_nonblocking(write.as_raw_fd(), true)?;
58
59 while limit > 0 {
60 let n = limit.min(BUFFER.len());
61
62 write.write_all(&BUFFER[..n])?;
63 limit -= n;
64 }
65
66 set_nonblocking(write.as_raw_fd(), false)?;
67
68 Ok(client)
69 }
70
71 unsafe fn mk() -> io::Result<Client> {
72 let mut pipes = [0; 2];
73
74 #[cfg(target_os = "linux")]
76 if libc::pipe2(pipes.as_mut_ptr(), libc::O_CLOEXEC) == -1 {
77 return Err(io::Error::last_os_error());
78 }
79
80 #[cfg(not(target_os = "linux"))]
81 {
82 cvt(libc::pipe(pipes.as_mut_ptr()))?;
83 drop(set_cloexec(pipes[0], true));
84 drop(set_cloexec(pipes[1], true));
85 }
86
87 Ok(Client::from_fds(pipes[0], pipes[1]))
88 }
89
90 pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
91 if let Some(client) = Self::from_fifo(s)? {
92 return Ok(client);
93 }
94 if let Some(client) = Self::from_pipe(s, check_pipe)? {
95 return Ok(client);
96 }
97 Err(FromEnvErrorInner::CannotParse(format!(
98 "expected `fifo:PATH` or `R,W`, found `{s}`"
99 )))
100 }
101
102 fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
104 let mut parts = s.splitn(2, ':');
105 if parts.next().unwrap() != "fifo" {
106 return Ok(None);
107 }
108 let path_str = parts.next().ok_or_else(|| {
109 FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
110 })?;
111 let path = Path::new(path_str);
112
113 let open_file = || {
114 OpenOptions::new()
119 .read(true)
120 .write(true)
121 .open(path)
122 .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
123 };
124
125 Ok(Some(Client {
126 read: open_file()?,
127 write: open_file()?,
128 creation_arg: ClientCreationArg::Fifo(path.into()),
129 is_non_blocking: Some(AtomicBool::new(false)),
130 }))
131 }
132
133 unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
135 let mut parts = s.splitn(2, ',');
136 let read = parts.next().unwrap();
137 let write = match parts.next() {
138 Some(w) => w,
139 None => return Ok(None),
140 };
141 let read = read
142 .parse()
143 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
144 let write = write
145 .parse()
146 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
147
148 if read < 0 {
151 return Err(FromEnvErrorInner::NegativeFd(read));
152 }
153 if write < 0 {
154 return Err(FromEnvErrorInner::NegativeFd(write));
155 }
156
157 let creation_arg = ClientCreationArg::Fds { read, write };
158
159 match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
170 (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
171 (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
172 (read_err, write_err) => {
173 read_err?;
174 write_err?;
175
176 #[cfg(target_os = "linux")]
185 if let (Ok(read), Ok(write)) = (
186 File::open(format!("/dev/fd/{}", read)),
187 OpenOptions::new()
188 .write(true)
189 .open(format!("/dev/fd/{}", write)),
190 ) {
191 return Ok(Some(Client {
192 read,
193 write,
194 creation_arg,
195 is_non_blocking: Some(AtomicBool::new(false)),
196 }));
197 }
198 }
199 }
200
201 Ok(Some(Client {
202 read: clone_fd_and_set_cloexec(read)?,
203 write: clone_fd_and_set_cloexec(write)?,
204 creation_arg,
205 is_non_blocking: None,
206 }))
207 }
208
209 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
210 Client {
211 read: File::from_raw_fd(read),
212 write: File::from_raw_fd(write),
213 creation_arg: ClientCreationArg::Fds { read, write },
214 is_non_blocking: None,
215 }
216 }
217
218 pub fn acquire(&self) -> io::Result<Acquired> {
219 loop {
221 if let Some(token) = self.acquire_allow_interrupts()? {
222 return Ok(token);
223 }
224 }
225 }
226
227 fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
230 unsafe {
251 let mut fd: libc::pollfd = mem::zeroed();
252 let mut read = &self.read;
253 fd.fd = read.as_raw_fd();
254 fd.events = libc::POLLIN;
255 loop {
256 let mut buf = [0];
257 match read.read(&mut buf) {
258 Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
259 Ok(_) => {
260 return Err(io::Error::new(
261 io::ErrorKind::UnexpectedEof,
262 "early EOF on jobserver pipe",
263 ));
264 }
265 Err(e) => match e.kind() {
266 io::ErrorKind::WouldBlock => { }
267 io::ErrorKind::Interrupted => return Ok(None),
268 _ => return Err(e),
269 },
270 }
271
272 loop {
273 fd.revents = 0;
274 if libc::poll(&mut fd, 1, -1) == -1 {
275 let e = io::Error::last_os_error();
276 return match e.kind() {
277 io::ErrorKind::Interrupted => Ok(None),
278 _ => Err(e),
279 };
280 }
281 if fd.revents != 0 {
282 break;
283 }
284 }
285 }
286 }
287 }
288
289 pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
290 let mut buf = [0];
291 let mut fifo = &self.read;
292
293 if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
294 if !is_non_blocking.load(Ordering::Relaxed) {
295 set_nonblocking(fifo.as_raw_fd(), true)?;
296 is_non_blocking.store(true, Ordering::Relaxed);
297 }
298 } else {
299 return Err(io::ErrorKind::Unsupported.into());
300 }
301
302 loop {
303 match fifo.read(&mut buf) {
304 Ok(1) => break Ok(Some(Acquired { byte: buf[0] })),
305 Ok(_) => {
306 break Err(io::Error::new(
307 io::ErrorKind::UnexpectedEof,
308 "early EOF on jobserver pipe",
309 ))
310 }
311
312 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None),
313 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
314
315 Err(err) => break Err(err),
316 }
317 }
318 }
319
320 pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
321 let byte = data.map(|d| d.byte).unwrap_or(b'+');
326 match (&self.write).write(&[byte])? {
327 1 => Ok(()),
328 _ => Err(io::Error::new(
329 io::ErrorKind::Other,
330 "failed to write token back to jobserver",
331 )),
332 }
333 }
334
335 pub fn string_arg(&self) -> String {
336 match &self.creation_arg {
337 ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
338 ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
339 }
340 }
341
342 pub fn available(&self) -> io::Result<usize> {
343 let mut len = MaybeUninit::<c_int>::uninit();
344 cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
345 Ok(unsafe { len.assume_init() } as usize)
346 }
347
348 pub fn configure(&self, cmd: &mut Command) {
349 if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
350 return;
353 }
354 let read = self.read.as_raw_fd();
359 let write = self.write.as_raw_fd();
360 unsafe {
361 cmd.pre_exec(move || {
362 set_cloexec(read, false)?;
363 set_cloexec(write, false)?;
364 Ok(())
365 });
366 }
367 }
368}
369
/// Handle to the helper thread started by `spawn_helper`.
#[derive(Debug)]
pub struct Helper {
    /// Join handle of the helper thread; `join` signals it with `SIGUSR1`
    /// and joins it once it is known to have finished.
    thread: JoinHandle<()>,
    /// State shared with the helper thread; `join` reads its
    /// `producer_done` / `consumer_done` flags and waits on its `cvar`.
    state: Arc<super::HelperState>,
}
375
376pub(crate) fn spawn_helper(
377 client: crate::Client,
378 state: Arc<super::HelperState>,
379 mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
380) -> io::Result<Helper> {
381 #[cfg(not(miri))]
382 {
383 use std::sync::Once;
384
385 static USR1_INIT: Once = Once::new();
386 let mut err = None;
387 USR1_INIT.call_once(|| unsafe {
388 let mut new: libc::sigaction = mem::zeroed();
389 new.sa_sigaction = sigusr1_handler as usize;
390 new.sa_flags = libc::SA_SIGINFO as _;
391 if libc::sigaction(libc::SIGUSR1, &new, std::ptr::null_mut()) != 0 {
392 err = Some(io::Error::last_os_error());
393 }
394 });
395
396 if let Some(e) = err.take() {
397 return Err(e);
398 }
399 }
400
401 let state2 = state.clone();
402 let thread = Builder::new().spawn(move || {
403 state2.for_each_request(|helper| loop {
404 match client.inner.acquire_allow_interrupts() {
405 Ok(Some(data)) => {
406 break f(Ok(crate::Acquired {
407 client: client.inner.clone(),
408 data,
409 disabled: false,
410 }));
411 }
412 Err(e) => break f(Err(e)),
413 Ok(None) if helper.lock().producer_done => break,
414 Ok(None) => {}
415 }
416 });
417 })?;
418
419 Ok(Helper { thread, state })
420}
421
422impl Helper {
423 pub fn join(self) {
424 let dur = Duration::from_millis(10);
425 let mut state = self.state.lock();
426 debug_assert!(state.producer_done);
427
428 for _ in 0..100 {
440 if state.consumer_done {
441 break;
442 }
443 #[cfg(not(miri))]
444 unsafe {
445 libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
451 }
452 state = self
453 .state
454 .cvar
455 .wait_timeout(state, dur)
456 .unwrap_or_else(|e| e.into_inner())
457 .0;
458 thread::yield_now(); }
460
461 if state.consumer_done {
465 drop(self.thread.join());
466 }
467 }
468}
469
470unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
471 match libc::fcntl(fd, libc::F_GETFD) {
472 -1 => Err(FromEnvErrorInner::CannotOpenFd(
473 fd,
474 io::Error::last_os_error(),
475 )),
476 _ => Ok(()),
477 }
478}
479
480unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
481 if check_pipe {
482 let mut stat = mem::zeroed();
483 if libc::fstat(fd, &mut stat) == -1 {
484 let last_os_error = io::Error::last_os_error();
485 fcntl_check(fd)?;
486 Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
487 } else {
488 #[allow(unused_assignments)]
493 let mut s_ififo = stat.st_mode;
494 s_ififo = libc::S_IFIFO as _;
495 if stat.st_mode & s_ififo == s_ififo {
496 return Ok(());
497 }
498 Err(FromEnvErrorInner::NotAPipe(fd, None))
499 }
500 } else {
501 fcntl_check(fd)
502 }
503}
504
505fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
506 unsafe { BorrowedFd::borrow_raw(fd) }
508 .try_clone_to_owned()
509 .map(File::from)
510 .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
511}
512
513fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
514 unsafe {
515 let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
516 let new = if set {
517 previous | libc::FD_CLOEXEC
518 } else {
519 previous & !libc::FD_CLOEXEC
520 };
521 if new != previous {
522 cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
523 }
524 Ok(())
525 }
526}
527
528fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
529 let status_flag = if set { libc::O_NONBLOCK } else { 0 };
530
531 unsafe {
532 cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
533 }
534
535 Ok(())
536}
537
/// Converts a C-style return code into an `io::Result`.
///
/// A value of `-1` becomes `Err` carrying the calling thread's last OS
/// error; every other value is passed through unchanged as `Ok`.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
545
/// Signal handler installed for `SIGUSR1` by `spawn_helper`.
///
/// The body is intentionally empty and all arguments are ignored.
/// NOTE(review): the handler presumably exists only so that the signal sent
/// by `Helper::join` interrupts a blocking `read`/`poll` in the helper
/// thread (surfacing there as `Interrupted`) instead of triggering the
/// signal's default action — confirm.
#[cfg(not(miri))]
extern "C" fn sigusr1_handler(
    _signum: c_int,
    _info: *mut libc::siginfo_t,
    _ptr: *mut libc::c_void,
) {
    // Nothing to do.
}
554
#[cfg(test)]
mod test {
    use super::Client as ClientImp;

    use crate::{test::run_named_fifo_try_acquire_tests, Client};

    use std::{fs::File, io::Write, os::unix::io::AsRawFd, sync::Arc};

    /// Wraps the platform client in the crate-level `Client`.
    fn from_imp_client(imp: ClientImp) -> Client {
        let inner = Arc::new(imp);
        Client { inner }
    }

    /// Creates a named FIFO at a fresh temporary path and opens a client on
    /// it; also returns the `fifo:PATH` argument that was used.
    fn new_client_from_fifo() -> (Client, String) {
        // Borrow a unique path from a temp file, remove the file, then
        // create the FIFO at that path.
        let placeholder = tempfile::NamedTempFile::new().unwrap();
        let fifo_path = placeholder.path().to_owned();
        placeholder.close().unwrap();
        nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();

        let arg = format!("fifo:{}", fifo_path.to_str().unwrap());
        let imp = ClientImp::from_fifo(&arg).unwrap().unwrap();

        (from_imp_client(imp), arg)
    }

    /// Creates an anonymous pipe holding one token and opens a client from
    /// its descriptor numbers; also returns the `R,W` argument that was used.
    fn new_client_from_pipe() -> (Client, String) {
        let (read_end, write_end) = nix::unistd::pipe().unwrap();
        let read_end = File::from(read_end);
        let mut write_end = File::from(write_end);

        write_end.write_all(b"1").unwrap();

        let arg = format!("{},{}", read_end.as_raw_fd(), write_end.as_raw_fd());
        // `read_end` / `write_end` stay open until this function returns,
        // i.e. for the whole time the client is being created from them.
        let imp = unsafe { ClientImp::from_pipe(&arg, true) }
            .unwrap()
            .unwrap();

        (from_imp_client(imp), arg)
    }

    #[test]
    fn test_try_acquire_named_fifo() {
        let (client, _arg) = new_client_from_fifo();
        run_named_fifo_try_acquire_tests(&client);
    }

    #[test]
    fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
        // Off Linux a descriptor-pair client cannot do non-blocking acquires.
        #[cfg(not(target_os = "linux"))]
        assert_eq!(
            new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
            std::io::ErrorKind::Unsupported
        );

        // On Linux the client behaves like a FIFO client once the single
        // preloaded token has been drained.
        #[cfg(target_os = "linux")]
        {
            let (client, _arg) = new_client_from_pipe();
            client.acquire().unwrap().drop_without_releasing();
            run_named_fifo_try_acquire_tests(&client);
        }
    }

    #[test]
    fn test_string_arg() {
        let (fifo_client, fifo_arg) = new_client_from_fifo();
        assert_eq!(fifo_client.inner.string_arg(), fifo_arg);

        let (pipe_client, pipe_arg) = new_client_from_pipe();
        assert_eq!(pipe_client.inner.string_arg(), pipe_arg);
    }
}