1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::Path;
10use std::process::Command;
11use std::ptr;
12use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, Once,
15};
16use std::thread::{self, Builder, JoinHandle};
17use std::time::Duration;
18
19#[derive(Debug)]
20enum ClientCreationArg {
25 Fds { read: c_int, write: c_int },
26 Fifo(Box<Path>),
27}
28
29#[derive(Debug)]
30pub struct Client {
31 read: File,
32 write: File,
33 creation_arg: ClientCreationArg,
34 is_non_blocking: Option<AtomicBool>,
41}
42
43#[derive(Debug)]
44pub struct Acquired {
45 byte: u8,
46}
47
48impl Client {
49 pub fn new(mut limit: usize) -> io::Result<Client> {
50 let client = unsafe { Client::mk()? };
51
52 const BUFFER: [u8; 128] = [b'|'; 128];
55
56 let mut write = &client.write;
57
58 set_nonblocking(write.as_raw_fd(), true)?;
59
60 while limit > 0 {
61 let n = limit.min(BUFFER.len());
62
63 write.write_all(&BUFFER[..n])?;
64 limit -= n;
65 }
66
67 set_nonblocking(write.as_raw_fd(), false)?;
68
69 Ok(client)
70 }
71
72 unsafe fn mk() -> io::Result<Client> {
73 let mut pipes = [0; 2];
74
75 #[cfg(target_os = "linux")]
79 {
80 static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
81 if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
82 match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
83 -1 => {
84 let err = io::Error::last_os_error();
85 if err.raw_os_error() == Some(libc::ENOSYS) {
86 PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
87 } else {
88 return Err(err);
89 }
90 }
91 _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
92 }
93 }
94 }
95
96 cvt(libc::pipe(pipes.as_mut_ptr()))?;
97 drop(set_cloexec(pipes[0], true));
98 drop(set_cloexec(pipes[1], true));
99 Ok(Client::from_fds(pipes[0], pipes[1]))
100 }
101
102 pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
103 if let Some(client) = Self::from_fifo(s)? {
104 return Ok(client);
105 }
106 if let Some(client) = Self::from_pipe(s, check_pipe)? {
107 return Ok(client);
108 }
109 Err(FromEnvErrorInner::CannotParse(format!(
110 "expected `fifo:PATH` or `R,W`, found `{s}`"
111 )))
112 }
113
114 fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
116 let mut parts = s.splitn(2, ':');
117 if parts.next().unwrap() != "fifo" {
118 return Ok(None);
119 }
120 let path_str = parts.next().ok_or_else(|| {
121 FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
122 })?;
123 let path = Path::new(path_str);
124
125 let open_file = || {
126 OpenOptions::new()
131 .read(true)
132 .write(true)
133 .open(path)
134 .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
135 };
136
137 Ok(Some(Client {
138 read: open_file()?,
139 write: open_file()?,
140 creation_arg: ClientCreationArg::Fifo(path.into()),
141 is_non_blocking: Some(AtomicBool::new(false)),
142 }))
143 }
144
145 unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
147 let mut parts = s.splitn(2, ',');
148 let read = parts.next().unwrap();
149 let write = match parts.next() {
150 Some(w) => w,
151 None => return Ok(None),
152 };
153 let read = read
154 .parse()
155 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
156 let write = write
157 .parse()
158 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
159
160 if read < 0 {
163 return Err(FromEnvErrorInner::NegativeFd(read));
164 }
165 if write < 0 {
166 return Err(FromEnvErrorInner::NegativeFd(write));
167 }
168
169 let creation_arg = ClientCreationArg::Fds { read, write };
170
171 match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
182 (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
183 (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
184 (read_err, write_err) => {
185 read_err?;
186 write_err?;
187
188 #[cfg(target_os = "linux")]
197 if let (Ok(read), Ok(write)) = (
198 File::open(format!("/dev/fd/{}", read)),
199 OpenOptions::new()
200 .write(true)
201 .open(format!("/dev/fd/{}", write)),
202 ) {
203 return Ok(Some(Client {
204 read,
205 write,
206 creation_arg,
207 is_non_blocking: Some(AtomicBool::new(false)),
208 }));
209 }
210 }
211 }
212
213 Ok(Some(Client {
214 read: clone_fd_and_set_cloexec(read)?,
215 write: clone_fd_and_set_cloexec(write)?,
216 creation_arg,
217 is_non_blocking: None,
218 }))
219 }
220
221 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
222 Client {
223 read: File::from_raw_fd(read),
224 write: File::from_raw_fd(write),
225 creation_arg: ClientCreationArg::Fds { read, write },
226 is_non_blocking: None,
227 }
228 }
229
230 pub fn acquire(&self) -> io::Result<Acquired> {
231 loop {
233 if let Some(token) = self.acquire_allow_interrupts()? {
234 return Ok(token);
235 }
236 }
237 }
238
239 fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
242 unsafe {
263 let mut fd: libc::pollfd = mem::zeroed();
264 let mut read = &self.read;
265 fd.fd = read.as_raw_fd();
266 fd.events = libc::POLLIN;
267 loop {
268 let mut buf = [0];
269 match read.read(&mut buf) {
270 Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
271 Ok(_) => {
272 return Err(io::Error::new(
273 io::ErrorKind::UnexpectedEof,
274 "early EOF on jobserver pipe",
275 ));
276 }
277 Err(e) => match e.kind() {
278 io::ErrorKind::WouldBlock => { }
279 io::ErrorKind::Interrupted => return Ok(None),
280 _ => return Err(e),
281 },
282 }
283
284 loop {
285 fd.revents = 0;
286 if libc::poll(&mut fd, 1, -1) == -1 {
287 let e = io::Error::last_os_error();
288 return match e.kind() {
289 io::ErrorKind::Interrupted => Ok(None),
290 _ => Err(e),
291 };
292 }
293 if fd.revents != 0 {
294 break;
295 }
296 }
297 }
298 }
299 }
300
301 pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
302 let mut buf = [0];
303 let mut fifo = &self.read;
304
305 if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
306 if !is_non_blocking.load(Ordering::Relaxed) {
307 set_nonblocking(fifo.as_raw_fd(), true)?;
308 is_non_blocking.store(true, Ordering::Relaxed);
309 }
310 } else {
311 return Err(io::ErrorKind::Unsupported.into());
312 }
313
314 loop {
315 match fifo.read(&mut buf) {
316 Ok(1) => break Ok(Some(Acquired { byte: buf[0] })),
317 Ok(_) => {
318 break Err(io::Error::new(
319 io::ErrorKind::UnexpectedEof,
320 "early EOF on jobserver pipe",
321 ))
322 }
323
324 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None),
325 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
326
327 Err(err) => break Err(err),
328 }
329 }
330 }
331
332 pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
333 let byte = data.map(|d| d.byte).unwrap_or(b'+');
338 match (&self.write).write(&[byte])? {
339 1 => Ok(()),
340 _ => Err(io::Error::new(
341 io::ErrorKind::Other,
342 "failed to write token back to jobserver",
343 )),
344 }
345 }
346
347 pub fn string_arg(&self) -> String {
348 match &self.creation_arg {
349 ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
350 ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
351 }
352 }
353
354 pub fn available(&self) -> io::Result<usize> {
355 let mut len = MaybeUninit::<c_int>::uninit();
356 cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
357 Ok(unsafe { len.assume_init() } as usize)
358 }
359
360 pub fn configure(&self, cmd: &mut Command) {
361 if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
362 return;
365 }
366 let read = self.read.as_raw_fd();
371 let write = self.write.as_raw_fd();
372 unsafe {
373 cmd.pre_exec(move || {
374 set_cloexec(read, false)?;
375 set_cloexec(write, false)?;
376 Ok(())
377 });
378 }
379 }
380}
381
382#[derive(Debug)]
383pub struct Helper {
384 thread: JoinHandle<()>,
385 state: Arc<super::HelperState>,
386}
387
388pub(crate) fn spawn_helper(
389 client: crate::Client,
390 state: Arc<super::HelperState>,
391 mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
392) -> io::Result<Helper> {
393 static USR1_INIT: Once = Once::new();
394 let mut err = None;
395 USR1_INIT.call_once(|| unsafe {
396 let mut new: libc::sigaction = mem::zeroed();
397 #[cfg(target_os = "aix")]
398 {
399 new.sa_union.__su_sigaction = sigusr1_handler;
400 }
401 #[cfg(not(target_os = "aix"))]
402 {
403 new.sa_sigaction = sigusr1_handler as usize;
404 }
405 new.sa_flags = libc::SA_SIGINFO as _;
406 if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
407 err = Some(io::Error::last_os_error());
408 }
409 });
410
411 if let Some(e) = err.take() {
412 return Err(e);
413 }
414
415 let state2 = state.clone();
416 let thread = Builder::new().spawn(move || {
417 state2.for_each_request(|helper| loop {
418 match client.inner.acquire_allow_interrupts() {
419 Ok(Some(data)) => {
420 break f(Ok(crate::Acquired {
421 client: client.inner.clone(),
422 data,
423 disabled: false,
424 }));
425 }
426 Err(e) => break f(Err(e)),
427 Ok(None) if helper.lock().producer_done => break,
428 Ok(None) => {}
429 }
430 });
431 })?;
432
433 Ok(Helper { thread, state })
434}
435
436impl Helper {
437 pub fn join(self) {
438 let dur = Duration::from_millis(10);
439 let mut state = self.state.lock();
440 debug_assert!(state.producer_done);
441
442 for _ in 0..100 {
454 if state.consumer_done {
455 break;
456 }
457 unsafe {
458 libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
464 }
465 state = self
466 .state
467 .cvar
468 .wait_timeout(state, dur)
469 .unwrap_or_else(|e| e.into_inner())
470 .0;
471 thread::yield_now(); }
473
474 if state.consumer_done {
478 drop(self.thread.join());
479 }
480 }
481}
482
483unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
484 match libc::fcntl(fd, libc::F_GETFD) {
485 -1 => Err(FromEnvErrorInner::CannotOpenFd(
486 fd,
487 io::Error::last_os_error(),
488 )),
489 _ => Ok(()),
490 }
491}
492
493unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
494 if check_pipe {
495 let mut stat = mem::zeroed();
496 if libc::fstat(fd, &mut stat) == -1 {
497 let last_os_error = io::Error::last_os_error();
498 fcntl_check(fd)?;
499 Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
500 } else {
501 #[allow(unused_assignments)]
506 let mut s_ififo = stat.st_mode;
507 s_ififo = libc::S_IFIFO as _;
508 if stat.st_mode & s_ififo == s_ififo {
509 return Ok(());
510 }
511 Err(FromEnvErrorInner::NotAPipe(fd, None))
512 }
513 } else {
514 fcntl_check(fd)
515 }
516}
517
518fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
519 unsafe { BorrowedFd::borrow_raw(fd) }
521 .try_clone_to_owned()
522 .map(File::from)
523 .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
524}
525
526fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
527 unsafe {
528 let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
529 let new = if set {
530 previous | libc::FD_CLOEXEC
531 } else {
532 previous & !libc::FD_CLOEXEC
533 };
534 if new != previous {
535 cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
536 }
537 Ok(())
538 }
539}
540
541fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
542 let status_flag = if set { libc::O_NONBLOCK } else { 0 };
543
544 unsafe {
545 cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
546 }
547
548 Ok(())
549}
550
551fn cvt(t: c_int) -> io::Result<c_int> {
552 if t == -1 {
553 Err(io::Error::last_os_error())
554 } else {
555 Ok(t)
556 }
557}
558
559extern "C" fn sigusr1_handler(
560 _signum: c_int,
561 _info: *mut libc::siginfo_t,
562 _ptr: *mut libc::c_void,
563) {
564 }
566
567#[cfg(test)]
568mod test {
569 use super::Client as ClientImp;
570
571 use crate::{test::run_named_fifo_try_acquire_tests, Client};
572
573 use std::{
574 fs::File,
575 io::{self, Write},
576 os::unix::io::AsRawFd,
577 sync::Arc,
578 };
579
580 fn from_imp_client(imp: ClientImp) -> Client {
581 Client {
582 inner: Arc::new(imp),
583 }
584 }
585
586 fn new_client_from_fifo() -> (Client, String) {
587 let file = tempfile::NamedTempFile::new().unwrap();
588 let fifo_path = file.path().to_owned();
589 file.close().unwrap(); nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();
592
593 let arg = format!("fifo:{}", fifo_path.to_str().unwrap());
594
595 (
596 ClientImp::from_fifo(&arg)
597 .unwrap()
598 .map(from_imp_client)
599 .unwrap(),
600 arg,
601 )
602 }
603
604 fn new_client_from_pipe() -> (Client, String) {
605 let (read, write) = nix::unistd::pipe().unwrap();
606 let read = File::from(read);
607 let mut write = File::from(write);
608
609 write.write_all(b"1").unwrap();
610
611 let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());
612
613 (
614 unsafe { ClientImp::from_pipe(&arg, true) }
615 .unwrap()
616 .map(from_imp_client)
617 .unwrap(),
618 arg,
619 )
620 }
621
622 #[test]
623 fn test_try_acquire_named_fifo() {
624 run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
625 }
626
627 #[test]
628 fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
629 #[cfg(not(target_os = "linux"))]
630 assert_eq!(
631 new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
632 io::ErrorKind::Unsupported
633 );
634
635 #[cfg(target_os = "linux")]
636 {
637 let client = new_client_from_pipe().0;
638 client.acquire().unwrap().drop_without_releasing();
639 run_named_fifo_try_acquire_tests(&client);
640 }
641 }
642
643 #[test]
644 fn test_string_arg() {
645 let (client, arg) = new_client_from_fifo();
646 assert_eq!(client.inner.string_arg(), arg);
647
648 let (client, arg) = new_client_from_pipe();
649 assert_eq!(client.inner.string_arg(), arg);
650 }
651}