jobserver/
unix.rs

1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::Path;
10use std::process::Command;
11use std::sync::{
12    atomic::{AtomicBool, Ordering},
13    Arc,
14};
15use std::thread::{self, Builder, JoinHandle};
16use std::time::Duration;
17
#[derive(Debug)]
/// This preserves the `--jobserver-auth` type at creation time,
/// so that the auth type is passed down to, and inherited by, sub-Make
/// processes correctly.
///
/// See <https://github.com/rust-lang/jobserver-rs/issues/99> for details.
enum ClientCreationArg {
    /// Created from a pair of file descriptor numbers (the `R,W` form).
    Fds { read: c_int, write: c_int },
    /// Created from a named pipe at the given path (the `fifo:PATH` form).
    Fifo(Box<Path>),
}
27
/// Unix jobserver client: a pair of file handles that tokens are read from
/// and written back to.
#[derive(Debug)]
pub struct Client {
    /// Handle that tokens are read from when acquiring.
    read: File,
    /// Handle that tokens are written to when releasing.
    write: File,
    /// Records how this client was created, so the same `--jobserver-auth`
    /// form can be reproduced for child processes.
    creation_arg: ClientCreationArg,
    /// It is set to `None` if the pipe is shared with other processes, so it
    /// cannot support non-blocking mode.
    ///
    /// If it is set to `Some`, then it can only go from
    /// `Some(false)` -> `Some(true)` but not the other way around,
    /// since that could cause a race condition.
    is_non_blocking: Option<AtomicBool>,
}
41
/// A token taken from the jobserver.
#[derive(Debug)]
pub struct Acquired {
    /// The byte that was read when the token was acquired; the same byte is
    /// written back when the token is released.
    byte: u8,
}
46
47impl Client {
48    pub fn new(mut limit: usize) -> io::Result<Client> {
49        let client = unsafe { Client::mk()? };
50
51        // I don't think the character written here matters, but I could be
52        // wrong!
53        const BUFFER: [u8; 128] = [b'|'; 128];
54
55        let mut write = &client.write;
56
57        set_nonblocking(write.as_raw_fd(), true)?;
58
59        while limit > 0 {
60            let n = limit.min(BUFFER.len());
61
62            write.write_all(&BUFFER[..n])?;
63            limit -= n;
64        }
65
66        set_nonblocking(write.as_raw_fd(), false)?;
67
68        Ok(client)
69    }
70
71    unsafe fn mk() -> io::Result<Client> {
72        let mut pipes = [0; 2];
73
74        // Atomically-create-with-cloexec on Linux.
75        #[cfg(target_os = "linux")]
76        if libc::pipe2(pipes.as_mut_ptr(), libc::O_CLOEXEC) == -1 {
77            return Err(io::Error::last_os_error());
78        }
79
80        #[cfg(not(target_os = "linux"))]
81        {
82            cvt(libc::pipe(pipes.as_mut_ptr()))?;
83            drop(set_cloexec(pipes[0], true));
84            drop(set_cloexec(pipes[1], true));
85        }
86
87        Ok(Client::from_fds(pipes[0], pipes[1]))
88    }
89
90    pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
91        if let Some(client) = Self::from_fifo(s)? {
92            return Ok(client);
93        }
94        if let Some(client) = Self::from_pipe(s, check_pipe)? {
95            return Ok(client);
96        }
97        Err(FromEnvErrorInner::CannotParse(format!(
98            "expected `fifo:PATH` or `R,W`, found `{s}`"
99        )))
100    }
101
102    /// `--jobserver-auth=fifo:PATH`
103    fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
104        let mut parts = s.splitn(2, ':');
105        if parts.next().unwrap() != "fifo" {
106            return Ok(None);
107        }
108        let path_str = parts.next().ok_or_else(|| {
109            FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
110        })?;
111        let path = Path::new(path_str);
112
113        let open_file = || {
114            // Opening with read write is necessary, since opening with
115            // read-only or write-only could block the thread until another
116            // thread opens it with write-only or read-only (or RDWR)
117            // correspondingly.
118            OpenOptions::new()
119                .read(true)
120                .write(true)
121                .open(path)
122                .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
123        };
124
125        Ok(Some(Client {
126            read: open_file()?,
127            write: open_file()?,
128            creation_arg: ClientCreationArg::Fifo(path.into()),
129            is_non_blocking: Some(AtomicBool::new(false)),
130        }))
131    }
132
    /// `--jobserver-auth=R,W`
    ///
    /// Parses `s` as two comma-separated file descriptor numbers and builds a
    /// client from them. Returns `Ok(None)` when `s` contains no comma, so the
    /// caller can report that no known form matched.
    ///
    /// When `check_pipe` is true, each descriptor is additionally required to
    /// look like a pipe (see `fd_check`); otherwise it only has to be open.
    ///
    /// # Errors
    ///
    /// * `CannotParse` when either side is not a valid integer.
    /// * `NegativeFd` when either descriptor is negative.
    /// * Whatever `fd_check` reports, with `NotAPipe` taking priority over
    ///   other errors.
    /// * `CannotOpenFd` when duplicating a descriptor fails.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must ensure that the numbers in `s`
    /// refer to descriptors this process is allowed to use — confirm against
    /// the callers.
    unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
        let mut parts = s.splitn(2, ',');
        let read = parts.next().unwrap();
        let write = match parts.next() {
            Some(w) => w,
            None => return Ok(None),
        };
        let read = read
            .parse()
            .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
        let write = write
            .parse()
            .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;

        // If either or both of these file descriptors are negative,
        // it means the jobserver is disabled for this process.
        if read < 0 {
            return Err(FromEnvErrorInner::NegativeFd(read));
        }
        if write < 0 {
            return Err(FromEnvErrorInner::NegativeFd(write));
        }

        // Remember the original descriptor numbers so `string_arg` can
        // reproduce the `R,W` form.
        let creation_arg = ClientCreationArg::Fds { read, write };

        // Ok so we've got two integers that look like file descriptors, but
        // for extra sanity checking let's see if they actually look like
        // valid files and instances of a pipe if feature enabled before we
        // return the client.
        //
        // If we're called from `make` *without* the leading + on our rule
        // then we'll have `MAKEFLAGS` env vars but won't actually have
        // access to the file descriptors.
        //
        // `NotAPipe` is a worse error, return it if it's reported for any of the two fds.
        match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
            (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
            (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
            (read_err, write_err) => {
                read_err?;
                write_err?;

                // Optimization: Try converting it to a fifo by using /dev/fd
                //
                // On linux, opening `/dev/fd/$fd` returns a fd with a new file description,
                // so we can set `O_NONBLOCK` on it without affecting other processes.
                //
                // On macOS, opening `/dev/fd/$fd` seems to be the same as `File::try_clone`.
                //
                // I tested this on macOS 14 and Linux 6.5.13
                #[cfg(target_os = "linux")]
                if let (Ok(read), Ok(write)) = (
                    File::open(format!("/dev/fd/{}", read)),
                    OpenOptions::new()
                        .write(true)
                        .open(format!("/dev/fd/{}", write)),
                ) {
                    return Ok(Some(Client {
                        read,
                        write,
                        creation_arg,
                        is_non_blocking: Some(AtomicBool::new(false)),
                    }));
                }
            }
        }

        // Fallback: duplicate the descriptors. The duplicates share the pipe
        // with other processes, so non-blocking mode is marked unsupported.
        Ok(Some(Client {
            read: clone_fd_and_set_cloexec(read)?,
            write: clone_fd_and_set_cloexec(write)?,
            creation_arg,
            is_non_blocking: None,
        }))
    }
208
209    unsafe fn from_fds(read: c_int, write: c_int) -> Client {
210        Client {
211            read: File::from_raw_fd(read),
212            write: File::from_raw_fd(write),
213            creation_arg: ClientCreationArg::Fds { read, write },
214            is_non_blocking: None,
215        }
216    }
217
218    pub fn acquire(&self) -> io::Result<Acquired> {
219        // Ignore interrupts and keep trying if that happens
220        loop {
221            if let Some(token) = self.acquire_allow_interrupts()? {
222                return Ok(token);
223            }
224        }
225    }
226
    /// Block waiting for a token, returning `None` if we're interrupted with
    /// EINTR.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` when the read yields no byte, and otherwise
    /// propagates any `read`/`poll` error other than "would block" (which
    /// triggers polling) and "interrupted" (which yields `Ok(None)`).
    fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
        // We don't actually know if the file descriptor here is set in
        // blocking or nonblocking mode. AFAIK all released versions of
        // `make` use blocking fds for the jobserver, but the unreleased
        // version of `make` doesn't. In the unreleased version jobserver
        // fds are set to nonblocking and combined with `pselect`
        // internally.
        //
        // Here we try to be compatible with both strategies. We optimistically
        // try to read from the file descriptor which then may block, return
        // a token or indicate that polling is needed.
        // Blocking reads (if possible) allows the kernel to be more selective
        // about which readers to wake up when a token is written to the pipe.
        //
        // We use `poll` here to block this thread waiting for read
        // readiness, and then afterwards we perform the `read` itself. If
        // the `read` returns that it would block then we start over and try
        // again.
        //
        // Also note that we explicitly don't handle EINTR here. That's used
        // to shut us down, so we otherwise punt all errors upwards.
        unsafe {
            let mut fd: libc::pollfd = mem::zeroed();
            let mut read = &self.read;
            fd.fd = read.as_raw_fd();
            fd.events = libc::POLLIN;
            loop {
                // A token is a single byte.
                let mut buf = [0];
                match read.read(&mut buf) {
                    Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
                    Ok(_) => {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "early EOF on jobserver pipe",
                        ));
                    }
                    Err(e) => match e.kind() {
                        io::ErrorKind::WouldBlock => { /* fall through to polling */ }
                        io::ErrorKind::Interrupted => return Ok(None),
                        _ => return Err(e),
                    },
                }

                // Wait (with no timeout) until `poll` reports any event on
                // the read end, then go back and retry the read.
                loop {
                    fd.revents = 0;
                    if libc::poll(&mut fd, 1, -1) == -1 {
                        let e = io::Error::last_os_error();
                        return match e.kind() {
                            io::ErrorKind::Interrupted => Ok(None),
                            _ => Err(e),
                        };
                    }
                    if fd.revents != 0 {
                        break;
                    }
                }
            }
        }
    }
288
289    pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
290        let mut buf = [0];
291        let mut fifo = &self.read;
292
293        if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
294            if !is_non_blocking.load(Ordering::Relaxed) {
295                set_nonblocking(fifo.as_raw_fd(), true)?;
296                is_non_blocking.store(true, Ordering::Relaxed);
297            }
298        } else {
299            return Err(io::ErrorKind::Unsupported.into());
300        }
301
302        loop {
303            match fifo.read(&mut buf) {
304                Ok(1) => break Ok(Some(Acquired { byte: buf[0] })),
305                Ok(_) => {
306                    break Err(io::Error::new(
307                        io::ErrorKind::UnexpectedEof,
308                        "early EOF on jobserver pipe",
309                    ))
310                }
311
312                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None),
313                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
314
315                Err(err) => break Err(err),
316            }
317        }
318    }
319
320    pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
321        // Note that the fd may be nonblocking but we're going to go ahead
322        // and assume that the writes here are always nonblocking (we can
323        // always quickly release a token). If that turns out to not be the
324        // case we'll get an error anyway!
325        let byte = data.map(|d| d.byte).unwrap_or(b'+');
326        match (&self.write).write(&[byte])? {
327            1 => Ok(()),
328            _ => Err(io::Error::new(
329                io::ErrorKind::Other,
330                "failed to write token back to jobserver",
331            )),
332        }
333    }
334
335    pub fn string_arg(&self) -> String {
336        match &self.creation_arg {
337            ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
338            ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
339        }
340    }
341
342    pub fn available(&self) -> io::Result<usize> {
343        let mut len = MaybeUninit::<c_int>::uninit();
344        cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
345        Ok(unsafe { len.assume_init() } as usize)
346    }
347
348    pub fn configure(&self, cmd: &mut Command) {
349        if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
350            // We `File::open`ed it when inheriting from environment,
351            // so no need to set cloexec for fifo.
352            return;
353        }
354        // Here we basically just want to say that in the child process
355        // we'll configure the read/write file descriptors to *not* be
356        // cloexec, so they're inherited across the exec and specified as
357        // integers through `string_arg` above.
358        let read = self.read.as_raw_fd();
359        let write = self.write.as_raw_fd();
360        unsafe {
361            cmd.pre_exec(move || {
362                set_cloexec(read, false)?;
363                set_cloexec(write, false)?;
364                Ok(())
365            });
366        }
367    }
368}
369
/// Handle to the background thread started by `spawn_helper`.
#[derive(Debug)]
pub struct Helper {
    /// The helper thread; joined in `Helper::join` once it is seen to be done.
    thread: JoinHandle<()>,
    /// State shared with the helper thread.
    state: Arc<super::HelperState>,
}
375
/// Spawns the helper thread that acquires tokens on behalf of `f`.
///
/// Outside of miri, a no-op `SIGUSR1` handler is installed once per process
/// before the thread is started. The thread then serves requests from
/// `state`: for each one it keeps calling `acquire_allow_interrupts` until it
/// gets a token or an error (either is passed to `f`), or until it is
/// interrupted while `producer_done` is set.
///
/// # Errors
///
/// Returns an error if installing the signal handler fails on the call that
/// performs the installation, or if the thread cannot be spawned.
pub(crate) fn spawn_helper(
    client: crate::Client,
    state: Arc<super::HelperState>,
    mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
) -> io::Result<Helper> {
    #[cfg(not(miri))]
    {
        use std::sync::Once;

        // Install the handler at most once for the whole process.
        static USR1_INIT: Once = Once::new();
        let mut err = None;
        USR1_INIT.call_once(|| unsafe {
            let mut new: libc::sigaction = mem::zeroed();
            new.sa_sigaction = sigusr1_handler as usize;
            new.sa_flags = libc::SA_SIGINFO as _;
            if libc::sigaction(libc::SIGUSR1, &new, std::ptr::null_mut()) != 0 {
                err = Some(io::Error::last_os_error());
            }
        });

        if let Some(e) = err.take() {
            return Err(e);
        }
    }

    let state2 = state.clone();
    let thread = Builder::new().spawn(move || {
        // NOTE(review): `for_each_request` presumably runs the closure once
        // per pending request — confirm against `HelperState`.
        state2.for_each_request(|helper| loop {
            match client.inner.acquire_allow_interrupts() {
                Ok(Some(data)) => {
                    break f(Ok(crate::Acquired {
                        client: client.inner.clone(),
                        data,
                        disabled: false,
                    }));
                }
                Err(e) => break f(Err(e)),
                // Interrupted: stop only if the producer side is finished,
                // otherwise retry the acquire.
                Ok(None) if helper.lock().producer_done => break,
                Ok(None) => {}
            }
        });
    })?;

    Ok(Helper { thread, state })
}
421
impl Helper {
    /// Makes a best-effort attempt to stop and join the helper thread.
    ///
    /// The thread is signalled with `SIGUSR1` (except under miri) up to 100
    /// times, waiting up to 10ms on the condition variable after each
    /// attempt, until `consumer_done` is observed. The thread is joined only
    /// if `consumer_done` was observed; otherwise it is left running.
    pub fn join(self) {
        let dur = Duration::from_millis(10);
        let mut state = self.state.lock();
        debug_assert!(state.producer_done);

        // We need to join our helper thread, and it could be blocked in one
        // of two locations. First is the wait for a request, but the
        // initial drop of `HelperState` will take care of that. Otherwise
        // it may be blocked in `client.acquire()`. We actually have no way
        // of interrupting that, so resort to `pthread_kill` as a fallback.
        // This signal should interrupt any blocking `read` call with
        // `io::ErrorKind::Interrupted` and cause the thread to cleanly exit.
        //
        // Note that we don't do this forever though since there's a chance
        // of bugs, so only do this opportunistically to make a best effort
        // at clearing ourselves up.
        for _ in 0..100 {
            if state.consumer_done {
                break;
            }
            #[cfg(not(miri))]
            unsafe {
                // Ignore the return value here of `pthread_kill`,
                // apparently on OSX if you kill a dead thread it will
                // return an error, but on other platforms it may not. In
                // that sense we don't actually know if this will succeed or
                // not!
                libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
            }
            // Wait for the helper to make progress; a poisoned lock is
            // recovered rather than treated as fatal.
            state = self
                .state
                .cvar
                .wait_timeout(state, dur)
                .unwrap_or_else(|e| e.into_inner())
                .0;
            thread::yield_now(); // we really want the other thread to run
        }

        // If we managed to actually see the consumer get done, then we can
        // definitely wait for the thread. Otherwise it's... off in the ether
        // I guess?
        if state.consumer_done {
            drop(self.thread.join());
        }
    }
}
469
470unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
471    match libc::fcntl(fd, libc::F_GETFD) {
472        -1 => Err(FromEnvErrorInner::CannotOpenFd(
473            fd,
474            io::Error::last_os_error(),
475        )),
476        _ => Ok(()),
477    }
478}
479
480unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
481    if check_pipe {
482        let mut stat = mem::zeroed();
483        if libc::fstat(fd, &mut stat) == -1 {
484            let last_os_error = io::Error::last_os_error();
485            fcntl_check(fd)?;
486            Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
487        } else {
488            // On android arm and i686 mode_t is u16 and st_mode is u32,
489            // this generates a type mismatch when S_IFIFO (declared as mode_t)
490            // is used in operations with st_mode, so we use this workaround
491            // to get the value of S_IFIFO with the same type of st_mode.
492            #[allow(unused_assignments)]
493            let mut s_ififo = stat.st_mode;
494            s_ififo = libc::S_IFIFO as _;
495            if stat.st_mode & s_ififo == s_ififo {
496                return Ok(());
497            }
498            Err(FromEnvErrorInner::NotAPipe(fd, None))
499        }
500    } else {
501        fcntl_check(fd)
502    }
503}
504
505fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
506    // Safety: fd is a valid fd and it remains open until returns
507    unsafe { BorrowedFd::borrow_raw(fd) }
508        .try_clone_to_owned()
509        .map(File::from)
510        .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
511}
512
513fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
514    unsafe {
515        let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
516        let new = if set {
517            previous | libc::FD_CLOEXEC
518        } else {
519            previous & !libc::FD_CLOEXEC
520        };
521        if new != previous {
522            cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
523        }
524        Ok(())
525    }
526}
527
528fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
529    let status_flag = if set { libc::O_NONBLOCK } else { 0 };
530
531    unsafe {
532        cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
533    }
534
535    Ok(())
536}
537
/// Converts a C-style return value into an `io::Result`.
///
/// A value of `-1` becomes `Err` carrying the last OS error; any other value
/// is passed through unchanged in `Ok`.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
545
/// `SIGUSR1` handler installed by `spawn_helper`; intentionally does nothing.
///
/// NOTE(review): presumably having a handler installed is what lets the
/// signal interrupt a blocking call in the helper thread instead of taking
/// the default action — confirm.
#[cfg(not(miri))]
extern "C" fn sigusr1_handler(
    _signum: c_int,
    _info: *mut libc::siginfo_t,
    _ptr: *mut libc::c_void,
) {
    // nothing to do
}
554
/// Tests for the unix client: `try_acquire` on named fifos and anonymous
/// pipes, and round-tripping of `string_arg`.
#[cfg(test)]
mod test {
    use super::Client as ClientImp;

    use crate::{test::run_named_fifo_try_acquire_tests, Client};

    use std::{fs::File, io::Write, os::unix::io::AsRawFd, sync::Arc};

    /// Wraps the platform client in the public `Client` type.
    fn from_imp_client(imp: ClientImp) -> Client {
        Client {
            inner: Arc::new(imp),
        }
    }

    /// Creates a fifo at a fresh temporary path and opens a client on it.
    /// Returns the client together with the `fifo:PATH` argument used.
    fn new_client_from_fifo() -> (Client, String) {
        let file = tempfile::NamedTempFile::new().unwrap();
        let fifo_path = file.path().to_owned();
        file.close().unwrap(); // Remove the NamedTempFile to create fifo

        nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();

        let arg = format!("fifo:{}", fifo_path.to_str().unwrap());

        (
            ClientImp::from_fifo(&arg)
                .unwrap()
                .map(from_imp_client)
                .unwrap(),
            arg,
        )
    }

    /// Creates an anonymous pipe holding one token and opens a client on it.
    /// Returns the client together with the `R,W` argument used.
    fn new_client_from_pipe() -> (Client, String) {
        let (read, write) = nix::unistd::pipe().unwrap();
        let read = File::from(read);
        let mut write = File::from(write);

        // Pre-load a single token.
        write.write_all(b"1").unwrap();

        let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());

        (
            unsafe { ClientImp::from_pipe(&arg, true) }
                .unwrap()
                .map(from_imp_client)
                .unwrap(),
            arg,
        )
    }

    #[test]
    fn test_try_acquire_named_fifo() {
        run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
    }

    #[test]
    fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
        // Outside Linux, a pipe-based client reports `try_acquire` as
        // unsupported.
        #[cfg(not(target_os = "linux"))]
        assert_eq!(
            new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
            std::io::ErrorKind::Unsupported
        );

        // On Linux, take the pre-loaded token first and then run the same
        // `try_acquire` tests as for a named fifo.
        #[cfg(target_os = "linux")]
        {
            let client = new_client_from_pipe().0;
            client.acquire().unwrap().drop_without_releasing();
            run_named_fifo_try_acquire_tests(&client);
        }
    }

    #[test]
    fn test_string_arg() {
        // `string_arg` must reproduce the argument the client was built from.
        let (client, arg) = new_client_from_fifo();
        assert_eq!(client.inner.string_arg(), arg);

        let (client, arg) = new_client_from_pipe();
        assert_eq!(client.inner.string_arg(), arg);
    }
}
634}