1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::Path;
10use std::process::Command;
11use std::ptr;
12use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, Once,
15};
16use std::thread::{self, Builder, JoinHandle};
17use std::time::Duration;
18
/// Records how a [`Client`] was constructed so that `string_arg` can render
/// the same description again.
#[derive(Debug)]
enum ClientCreationArg {
    /// A pair of raw file-descriptor numbers, rendered as `R,W`.
    Fds { read: c_int, write: c_int },
    /// A filesystem path to a named pipe, rendered as `fifo:PATH`.
    Fifo(Box<Path>),
}
28
29#[derive(Debug)]
30pub struct Client {
31 read: File,
32 write: File,
33 creation_arg: ClientCreationArg,
34 is_non_blocking: Option<AtomicBool>,
41}
42
/// A single token taken from the jobserver.
#[derive(Debug)]
pub struct Acquired {
    /// The byte that was read; `release` writes this same byte back.
    byte: u8,
}
47
48impl Client {
49 pub fn new(mut limit: usize) -> io::Result<Client> {
50 let client = unsafe { Client::mk()? };
51
52 const BUFFER: [u8; 128] = [b'|'; 128];
55
56 let mut write = &client.write;
57
58 set_nonblocking(write.as_raw_fd(), true)?;
59
60 while limit > 0 {
61 let n = limit.min(BUFFER.len());
62
63 write.write_all(&BUFFER[..n])?;
64 limit -= n;
65 }
66
67 set_nonblocking(write.as_raw_fd(), false)?;
68
69 Ok(client)
70 }
71
72 unsafe fn mk() -> io::Result<Client> {
73 let mut pipes = [0; 2];
74
75 #[cfg(target_os = "linux")]
79 {
80 static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
81 if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
82 match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
83 -1 => {
84 let err = io::Error::last_os_error();
85 if err.raw_os_error() == Some(libc::ENOSYS) {
86 PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
87 } else {
88 return Err(err);
89 }
90 }
91 _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
92 }
93 }
94 }
95
96 cvt(libc::pipe(pipes.as_mut_ptr()))?;
97 drop(set_cloexec(pipes[0], true));
98 drop(set_cloexec(pipes[1], true));
99 Ok(Client::from_fds(pipes[0], pipes[1]))
100 }
101
102 pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
103 if let Some(client) = Self::from_fifo(s)? {
104 return Ok(client);
105 }
106 if let Some(client) = Self::from_pipe(s, check_pipe)? {
107 return Ok(client);
108 }
109 Err(FromEnvErrorInner::CannotParse(format!(
110 "expected `fifo:PATH` or `R,W`, found `{s}`"
111 )))
112 }
113
114 fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
116 let mut parts = s.splitn(2, ':');
117 if parts.next().unwrap() != "fifo" {
118 return Ok(None);
119 }
120 let path_str = parts.next().ok_or_else(|| {
121 FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
122 })?;
123 let path = Path::new(path_str);
124
125 let open_file = || {
126 OpenOptions::new()
131 .read(true)
132 .write(true)
133 .open(path)
134 .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
135 };
136
137 Ok(Some(Client {
138 read: open_file()?,
139 write: open_file()?,
140 creation_arg: ClientCreationArg::Fifo(path.into()),
141 is_non_blocking: Some(AtomicBool::new(false)),
142 }))
143 }
144
145 unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
147 let mut parts = s.splitn(2, ',');
148 let read = parts.next().unwrap();
149 let write = match parts.next() {
150 Some(w) => w,
151 None => return Ok(None),
152 };
153 let read = read
154 .parse()
155 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
156 let write = write
157 .parse()
158 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
159
160 if read < 0 {
163 return Err(FromEnvErrorInner::NegativeFd(read));
164 }
165 if write < 0 {
166 return Err(FromEnvErrorInner::NegativeFd(write));
167 }
168
169 let creation_arg = ClientCreationArg::Fds { read, write };
170
171 match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
182 (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
183 (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
184 (read_err, write_err) => {
185 read_err?;
186 write_err?;
187
188 #[cfg(target_os = "linux")]
197 if let (Ok(read), Ok(write)) = (
198 File::open(format!("/dev/fd/{}", read)),
199 OpenOptions::new()
200 .write(true)
201 .open(format!("/dev/fd/{}", write)),
202 ) {
203 return Ok(Some(Client {
204 read,
205 write,
206 creation_arg,
207 is_non_blocking: Some(AtomicBool::new(false)),
208 }));
209 }
210 }
211 }
212
213 Ok(Some(Client {
214 read: clone_fd_and_set_cloexec(read)?,
215 write: clone_fd_and_set_cloexec(write)?,
216 creation_arg,
217 is_non_blocking: None,
218 }))
219 }
220
221 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
222 Client {
223 read: File::from_raw_fd(read),
224 write: File::from_raw_fd(write),
225 creation_arg: ClientCreationArg::Fds { read, write },
226 is_non_blocking: None,
227 }
228 }
229
230 pub fn acquire(&self) -> io::Result<Acquired> {
231 loop {
233 if let Some(token) = self.acquire_allow_interrupts()? {
234 return Ok(token);
235 }
236 }
237 }
238
239 fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
242 unsafe {
263 let mut fd: libc::pollfd = mem::zeroed();
264 let mut read = &self.read;
265 fd.fd = read.as_raw_fd();
266 fd.events = libc::POLLIN;
267 loop {
268 let mut buf = [0];
269 match read.read(&mut buf) {
270 Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
271 Ok(_) => {
272 return Err(io::Error::new(
273 io::ErrorKind::UnexpectedEof,
274 "early EOF on jobserver pipe",
275 ));
276 }
277 Err(e) => match e.kind() {
278 io::ErrorKind::WouldBlock => { }
279 io::ErrorKind::Interrupted => return Ok(None),
280 _ => return Err(e),
281 },
282 }
283
284 loop {
285 fd.revents = 0;
286 if libc::poll(&mut fd, 1, -1) == -1 {
287 let e = io::Error::last_os_error();
288 return match e.kind() {
289 io::ErrorKind::Interrupted => Ok(None),
290 _ => Err(e),
291 };
292 }
293 if fd.revents != 0 {
294 break;
295 }
296 }
297 }
298 }
299 }
300
301 pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
302 let mut buf = [0];
303 let mut fifo = &self.read;
304
305 if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
306 if !is_non_blocking.load(Ordering::Relaxed) {
307 set_nonblocking(fifo.as_raw_fd(), true)?;
308 is_non_blocking.store(true, Ordering::Relaxed);
309 }
310 } else {
311 return Err(io::ErrorKind::Unsupported.into());
312 }
313
314 loop {
315 match fifo.read(&mut buf) {
316 Ok(1) => break Ok(Some(Acquired { byte: buf[0] })),
317 Ok(_) => {
318 break Err(io::Error::new(
319 io::ErrorKind::UnexpectedEof,
320 "early EOF on jobserver pipe",
321 ))
322 }
323
324 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None),
325 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
326
327 Err(err) => break Err(err),
328 }
329 }
330 }
331
332 pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
333 let byte = data.map(|d| d.byte).unwrap_or(b'+');
338 match (&self.write).write(&[byte])? {
339 1 => Ok(()),
340 _ => Err(io::Error::new(
341 io::ErrorKind::Other,
342 "failed to write token back to jobserver",
343 )),
344 }
345 }
346
347 pub fn string_arg(&self) -> String {
348 match &self.creation_arg {
349 ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
350 ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
351 }
352 }
353
354 pub fn available(&self) -> io::Result<usize> {
355 let mut len = MaybeUninit::<c_int>::uninit();
356 cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
357 Ok(unsafe { len.assume_init() } as usize)
358 }
359
360 pub fn configure(&self, cmd: &mut Command) {
361 if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
362 return;
365 }
366 let read = self.read.as_raw_fd();
371 let write = self.write.as_raw_fd();
372 unsafe {
373 cmd.pre_exec(move || {
374 set_cloexec(read, false)?;
375 set_cloexec(write, false)?;
376 Ok(())
377 });
378 }
379 }
380}
381
382#[derive(Debug)]
383pub struct Helper {
384 thread: JoinHandle<()>,
385 state: Arc<super::HelperState>,
386}
387
388pub(crate) fn spawn_helper(
389 client: crate::Client,
390 state: Arc<super::HelperState>,
391 mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
392) -> io::Result<Helper> {
393 static USR1_INIT: Once = Once::new();
394 let mut err = None;
395 USR1_INIT.call_once(|| unsafe {
396 let mut new: libc::sigaction = mem::zeroed();
397 new.sa_sigaction = sigusr1_handler as usize;
398 new.sa_flags = libc::SA_SIGINFO as _;
399 if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
400 err = Some(io::Error::last_os_error());
401 }
402 });
403
404 if let Some(e) = err.take() {
405 return Err(e);
406 }
407
408 let state2 = state.clone();
409 let thread = Builder::new().spawn(move || {
410 state2.for_each_request(|helper| loop {
411 match client.inner.acquire_allow_interrupts() {
412 Ok(Some(data)) => {
413 break f(Ok(crate::Acquired {
414 client: client.inner.clone(),
415 data,
416 disabled: false,
417 }));
418 }
419 Err(e) => break f(Err(e)),
420 Ok(None) if helper.lock().producer_done => break,
421 Ok(None) => {}
422 }
423 });
424 })?;
425
426 Ok(Helper { thread, state })
427}
428
429impl Helper {
430 pub fn join(self) {
431 let dur = Duration::from_millis(10);
432 let mut state = self.state.lock();
433 debug_assert!(state.producer_done);
434
435 for _ in 0..100 {
447 if state.consumer_done {
448 break;
449 }
450 unsafe {
451 libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
457 }
458 state = self
459 .state
460 .cvar
461 .wait_timeout(state, dur)
462 .unwrap_or_else(|e| e.into_inner())
463 .0;
464 thread::yield_now(); }
466
467 if state.consumer_done {
471 drop(self.thread.join());
472 }
473 }
474}
475
476unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
477 match libc::fcntl(fd, libc::F_GETFD) {
478 -1 => Err(FromEnvErrorInner::CannotOpenFd(
479 fd,
480 io::Error::last_os_error(),
481 )),
482 _ => Ok(()),
483 }
484}
485
486unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
487 if check_pipe {
488 let mut stat = mem::zeroed();
489 if libc::fstat(fd, &mut stat) == -1 {
490 let last_os_error = io::Error::last_os_error();
491 fcntl_check(fd)?;
492 Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
493 } else {
494 #[allow(unused_assignments)]
499 let mut s_ififo = stat.st_mode;
500 s_ififo = libc::S_IFIFO as _;
501 if stat.st_mode & s_ififo == s_ififo {
502 return Ok(());
503 }
504 Err(FromEnvErrorInner::NotAPipe(fd, None))
505 }
506 } else {
507 fcntl_check(fd)
508 }
509}
510
511fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
512 unsafe { BorrowedFd::borrow_raw(fd) }
514 .try_clone_to_owned()
515 .map(File::from)
516 .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
517}
518
519fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
520 unsafe {
521 let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
522 let new = if set {
523 previous | libc::FD_CLOEXEC
524 } else {
525 previous & !libc::FD_CLOEXEC
526 };
527 if new != previous {
528 cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
529 }
530 Ok(())
531 }
532}
533
534fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
535 let status_flag = if set { libc::O_NONBLOCK } else { 0 };
536
537 unsafe {
538 cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
539 }
540
541 Ok(())
542}
543
/// Converts a C-style return code into an `io::Result`.
///
/// `-1` becomes `Err` carrying the last OS error; every other value is
/// passed through unchanged as `Ok`.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
551
552extern "C" fn sigusr1_handler(
553 _signum: c_int,
554 _info: *mut libc::siginfo_t,
555 _ptr: *mut libc::c_void,
556) {
557 }
559
560#[cfg(test)]
561mod test {
562 use super::Client as ClientImp;
563
564 use crate::{test::run_named_fifo_try_acquire_tests, Client};
565
566 use std::{
567 fs::File,
568 io::{self, Write},
569 os::unix::io::AsRawFd,
570 sync::Arc,
571 };
572
573 fn from_imp_client(imp: ClientImp) -> Client {
574 Client {
575 inner: Arc::new(imp),
576 }
577 }
578
579 fn new_client_from_fifo() -> (Client, String) {
580 let file = tempfile::NamedTempFile::new().unwrap();
581 let fifo_path = file.path().to_owned();
582 file.close().unwrap(); nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();
585
586 let arg = format!("fifo:{}", fifo_path.to_str().unwrap());
587
588 (
589 ClientImp::from_fifo(&arg)
590 .unwrap()
591 .map(from_imp_client)
592 .unwrap(),
593 arg,
594 )
595 }
596
597 fn new_client_from_pipe() -> (Client, String) {
598 let (read, write) = nix::unistd::pipe().unwrap();
599 let read = File::from(read);
600 let mut write = File::from(write);
601
602 write.write_all(b"1").unwrap();
603
604 let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());
605
606 (
607 unsafe { ClientImp::from_pipe(&arg, true) }
608 .unwrap()
609 .map(from_imp_client)
610 .unwrap(),
611 arg,
612 )
613 }
614
615 #[test]
616 fn test_try_acquire_named_fifo() {
617 run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
618 }
619
620 #[test]
621 fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
622 #[cfg(not(target_os = "linux"))]
623 assert_eq!(
624 new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
625 io::ErrorKind::Unsupported
626 );
627
628 #[cfg(target_os = "linux")]
629 {
630 let client = new_client_from_pipe().0;
631 client.acquire().unwrap().drop_without_releasing();
632 run_named_fifo_try_acquire_tests(&client);
633 }
634 }
635
636 #[test]
637 fn test_string_arg() {
638 let (client, arg) = new_client_from_fifo();
639 assert_eq!(client.inner.string_arg(), arg);
640
641 let (client, arg) = new_client_from_pipe();
642 assert_eq!(client.inner.string_arg(), arg);
643 }
644}