jobserver/
unix.rs

1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::Path;
10use std::process::Command;
11use std::ptr;
12use std::sync::{
13    atomic::{AtomicBool, Ordering},
14    Arc, Once,
15};
16use std::thread::{self, Builder, JoinHandle};
17use std::time::Duration;
18
/// Records which `--jobserver-auth` form this client was created from, so
/// that the same form is passed down to, and inherited by, sub-Make
/// processes.
///
/// See <https://github.com/rust-lang/jobserver-rs/issues/99> for details.
#[derive(Debug)]
enum ClientCreationArg {
    /// A pair of pipe descriptors, advertised as `R,W`.
    Fds { read: c_int, write: c_int },
    /// A named FIFO, advertised as `fifo:PATH`.
    Fifo(Box<Path>),
}
28
29#[derive(Debug)]
30pub struct Client {
31    read: File,
32    write: File,
33    creation_arg: ClientCreationArg,
34    /// It is set to `None` if the pipe is shared with other processes, so it
35    /// cannot support non-blocking mode.
36    ///
37    /// If it is set to `Some`, then it can only go from
38    /// `Some(false)` -> `Some(true)` but not the other way around,
39    /// since that could cause a race condition.
40    is_non_blocking: Option<AtomicBool>,
41}
42
/// A token taken from the jobserver.
///
/// Keeps the exact byte that was read, so that `Client::release` can write
/// the same byte back.
#[derive(Debug)]
pub struct Acquired {
    byte: u8,
}
47
48impl Client {
49    pub fn new(mut limit: usize) -> io::Result<Client> {
50        let client = unsafe { Client::mk()? };
51
52        // I don't think the character written here matters, but I could be
53        // wrong!
54        const BUFFER: [u8; 128] = [b'|'; 128];
55
56        let mut write = &client.write;
57
58        set_nonblocking(write.as_raw_fd(), true)?;
59
60        while limit > 0 {
61            let n = limit.min(BUFFER.len());
62
63            write.write_all(&BUFFER[..n])?;
64            limit -= n;
65        }
66
67        set_nonblocking(write.as_raw_fd(), false)?;
68
69        Ok(client)
70    }
71
72    unsafe fn mk() -> io::Result<Client> {
73        let mut pipes = [0; 2];
74
75        // Attempt atomically-create-with-cloexec if we can on Linux,
76        // detected by using the `syscall` function in `libc` to try to work
77        // with as many kernels/glibc implementations as possible.
78        #[cfg(target_os = "linux")]
79        {
80            static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
81            if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
82                match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
83                    -1 => {
84                        let err = io::Error::last_os_error();
85                        if err.raw_os_error() == Some(libc::ENOSYS) {
86                            PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
87                        } else {
88                            return Err(err);
89                        }
90                    }
91                    _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
92                }
93            }
94        }
95
96        cvt(libc::pipe(pipes.as_mut_ptr()))?;
97        drop(set_cloexec(pipes[0], true));
98        drop(set_cloexec(pipes[1], true));
99        Ok(Client::from_fds(pipes[0], pipes[1]))
100    }
101
102    pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
103        if let Some(client) = Self::from_fifo(s)? {
104            return Ok(client);
105        }
106        if let Some(client) = Self::from_pipe(s, check_pipe)? {
107            return Ok(client);
108        }
109        Err(FromEnvErrorInner::CannotParse(format!(
110            "expected `fifo:PATH` or `R,W`, found `{s}`"
111        )))
112    }
113
114    /// `--jobserver-auth=fifo:PATH`
115    fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
116        let mut parts = s.splitn(2, ':');
117        if parts.next().unwrap() != "fifo" {
118            return Ok(None);
119        }
120        let path_str = parts.next().ok_or_else(|| {
121            FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
122        })?;
123        let path = Path::new(path_str);
124
125        let open_file = || {
126            // Opening with read write is necessary, since opening with
127            // read-only or write-only could block the thread until another
128            // thread opens it with write-only or read-only (or RDWR)
129            // correspondingly.
130            OpenOptions::new()
131                .read(true)
132                .write(true)
133                .open(path)
134                .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
135        };
136
137        Ok(Some(Client {
138            read: open_file()?,
139            write: open_file()?,
140            creation_arg: ClientCreationArg::Fifo(path.into()),
141            is_non_blocking: Some(AtomicBool::new(false)),
142        }))
143    }
144
145    /// `--jobserver-auth=R,W`
146    unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
147        let mut parts = s.splitn(2, ',');
148        let read = parts.next().unwrap();
149        let write = match parts.next() {
150            Some(w) => w,
151            None => return Ok(None),
152        };
153        let read = read
154            .parse()
155            .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
156        let write = write
157            .parse()
158            .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
159
160        // If either or both of these file descriptors are negative,
161        // it means the jobserver is disabled for this process.
162        if read < 0 {
163            return Err(FromEnvErrorInner::NegativeFd(read));
164        }
165        if write < 0 {
166            return Err(FromEnvErrorInner::NegativeFd(write));
167        }
168
169        let creation_arg = ClientCreationArg::Fds { read, write };
170
171        // Ok so we've got two integers that look like file descriptors, but
172        // for extra sanity checking let's see if they actually look like
173        // valid files and instances of a pipe if feature enabled before we
174        // return the client.
175        //
176        // If we're called from `make` *without* the leading + on our rule
177        // then we'll have `MAKEFLAGS` env vars but won't actually have
178        // access to the file descriptors.
179        //
180        // `NotAPipe` is a worse error, return it if it's reported for any of the two fds.
181        match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
182            (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
183            (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
184            (read_err, write_err) => {
185                read_err?;
186                write_err?;
187
188                // Optimization: Try converting it to a fifo by using /dev/fd
189                //
190                // On linux, opening `/dev/fd/$fd` returns a fd with a new file description,
191                // so we can set `O_NONBLOCK` on it without affecting other processes.
192                //
193                // On macOS, opening `/dev/fd/$fd` seems to be the same as `File::try_clone`.
194                //
195                // I tested this on macOS 14 and Linux 6.5.13
196                #[cfg(target_os = "linux")]
197                if let (Ok(read), Ok(write)) = (
198                    File::open(format!("/dev/fd/{}", read)),
199                    OpenOptions::new()
200                        .write(true)
201                        .open(format!("/dev/fd/{}", write)),
202                ) {
203                    return Ok(Some(Client {
204                        read,
205                        write,
206                        creation_arg,
207                        is_non_blocking: Some(AtomicBool::new(false)),
208                    }));
209                }
210            }
211        }
212
213        Ok(Some(Client {
214            read: clone_fd_and_set_cloexec(read)?,
215            write: clone_fd_and_set_cloexec(write)?,
216            creation_arg,
217            is_non_blocking: None,
218        }))
219    }
220
221    unsafe fn from_fds(read: c_int, write: c_int) -> Client {
222        Client {
223            read: File::from_raw_fd(read),
224            write: File::from_raw_fd(write),
225            creation_arg: ClientCreationArg::Fds { read, write },
226            is_non_blocking: None,
227        }
228    }
229
230    pub fn acquire(&self) -> io::Result<Acquired> {
231        // Ignore interrupts and keep trying if that happens
232        loop {
233            if let Some(token) = self.acquire_allow_interrupts()? {
234                return Ok(token);
235            }
236        }
237    }
238
239    /// Block waiting for a token, returning `None` if we're interrupted with
240    /// EINTR.
241    fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
242        // We don't actually know if the file descriptor here is set in
243        // blocking or nonblocking mode. AFAIK all released versions of
244        // `make` use blocking fds for the jobserver, but the unreleased
245        // version of `make` doesn't. In the unreleased version jobserver
246        // fds are set to nonblocking and combined with `pselect`
247        // internally.
248        //
249        // Here we try to be compatible with both strategies. We optimistically
250        // try to read from the file descriptor which then may block, return
251        // a token or indicate that polling is needed.
252        // Blocking reads (if possible) allows the kernel to be more selective
253        // about which readers to wake up when a token is written to the pipe.
254        //
255        // We use `poll` here to block this thread waiting for read
256        // readiness, and then afterwards we perform the `read` itself. If
257        // the `read` returns that it would block then we start over and try
258        // again.
259        //
260        // Also note that we explicitly don't handle EINTR here. That's used
261        // to shut us down, so we otherwise punt all errors upwards.
262        unsafe {
263            let mut fd: libc::pollfd = mem::zeroed();
264            let mut read = &self.read;
265            fd.fd = read.as_raw_fd();
266            fd.events = libc::POLLIN;
267            loop {
268                let mut buf = [0];
269                match read.read(&mut buf) {
270                    Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
271                    Ok(_) => {
272                        return Err(io::Error::new(
273                            io::ErrorKind::UnexpectedEof,
274                            "early EOF on jobserver pipe",
275                        ));
276                    }
277                    Err(e) => match e.kind() {
278                        io::ErrorKind::WouldBlock => { /* fall through to polling */ }
279                        io::ErrorKind::Interrupted => return Ok(None),
280                        _ => return Err(e),
281                    },
282                }
283
284                loop {
285                    fd.revents = 0;
286                    if libc::poll(&mut fd, 1, -1) == -1 {
287                        let e = io::Error::last_os_error();
288                        return match e.kind() {
289                            io::ErrorKind::Interrupted => Ok(None),
290                            _ => Err(e),
291                        };
292                    }
293                    if fd.revents != 0 {
294                        break;
295                    }
296                }
297            }
298        }
299    }
300
301    pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
302        let mut buf = [0];
303        let mut fifo = &self.read;
304
305        if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
306            if !is_non_blocking.load(Ordering::Relaxed) {
307                set_nonblocking(fifo.as_raw_fd(), true)?;
308                is_non_blocking.store(true, Ordering::Relaxed);
309            }
310        } else {
311            return Err(io::ErrorKind::Unsupported.into());
312        }
313
314        loop {
315            match fifo.read(&mut buf) {
316                Ok(1) => break Ok(Some(Acquired { byte: buf[0] })),
317                Ok(_) => {
318                    break Err(io::Error::new(
319                        io::ErrorKind::UnexpectedEof,
320                        "early EOF on jobserver pipe",
321                    ))
322                }
323
324                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None),
325                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
326
327                Err(err) => break Err(err),
328            }
329        }
330    }
331
332    pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
333        // Note that the fd may be nonblocking but we're going to go ahead
334        // and assume that the writes here are always nonblocking (we can
335        // always quickly release a token). If that turns out to not be the
336        // case we'll get an error anyway!
337        let byte = data.map(|d| d.byte).unwrap_or(b'+');
338        match (&self.write).write(&[byte])? {
339            1 => Ok(()),
340            _ => Err(io::Error::new(
341                io::ErrorKind::Other,
342                "failed to write token back to jobserver",
343            )),
344        }
345    }
346
347    pub fn string_arg(&self) -> String {
348        match &self.creation_arg {
349            ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
350            ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
351        }
352    }
353
354    pub fn available(&self) -> io::Result<usize> {
355        let mut len = MaybeUninit::<c_int>::uninit();
356        cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
357        Ok(unsafe { len.assume_init() } as usize)
358    }
359
360    pub fn configure(&self, cmd: &mut Command) {
361        if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
362            // We `File::open`ed it when inheriting from environment,
363            // so no need to set cloexec for fifo.
364            return;
365        }
366        // Here we basically just want to say that in the child process
367        // we'll configure the read/write file descriptors to *not* be
368        // cloexec, so they're inherited across the exec and specified as
369        // integers through `string_arg` above.
370        let read = self.read.as_raw_fd();
371        let write = self.write.as_raw_fd();
372        unsafe {
373            cmd.pre_exec(move || {
374                set_cloexec(read, false)?;
375                set_cloexec(write, false)?;
376                Ok(())
377            });
378        }
379    }
380}
381
382#[derive(Debug)]
383pub struct Helper {
384    thread: JoinHandle<()>,
385    state: Arc<super::HelperState>,
386}
387
388pub(crate) fn spawn_helper(
389    client: crate::Client,
390    state: Arc<super::HelperState>,
391    mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
392) -> io::Result<Helper> {
393    static USR1_INIT: Once = Once::new();
394    let mut err = None;
395    USR1_INIT.call_once(|| unsafe {
396        let mut new: libc::sigaction = mem::zeroed();
397        #[cfg(target_os = "aix")]
398        {
399            new.sa_union.__su_sigaction = sigusr1_handler;
400        }
401        #[cfg(not(target_os = "aix"))]
402        {
403            new.sa_sigaction = sigusr1_handler as usize;
404        }
405        new.sa_flags = libc::SA_SIGINFO as _;
406        if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
407            err = Some(io::Error::last_os_error());
408        }
409    });
410
411    if let Some(e) = err.take() {
412        return Err(e);
413    }
414
415    let state2 = state.clone();
416    let thread = Builder::new().spawn(move || {
417        state2.for_each_request(|helper| loop {
418            match client.inner.acquire_allow_interrupts() {
419                Ok(Some(data)) => {
420                    break f(Ok(crate::Acquired {
421                        client: client.inner.clone(),
422                        data,
423                        disabled: false,
424                    }));
425                }
426                Err(e) => break f(Err(e)),
427                Ok(None) if helper.lock().producer_done => break,
428                Ok(None) => {}
429            }
430        });
431    })?;
432
433    Ok(Helper { thread, state })
434}
435
impl Helper {
    /// Stops and joins the helper thread, interrupting it with `SIGUSR1` if
    /// it is blocked.
    ///
    /// Expects the producer side to be finished already; this is only
    /// checked with `debug_assert!`. It sends up to 100 signals, waiting up
    /// to `dur` (10 ms) on `state.cvar` after each one. The thread is joined
    /// only once `consumer_done` is observed. Otherwise the `JoinHandle` is
    /// dropped and the thread is left detached.
    pub fn join(self) {
        let dur = Duration::from_millis(10);
        let mut state = self.state.lock();
        debug_assert!(state.producer_done);

        // We need to join our helper thread, and it could be blocked in one
        // of two locations. First is the wait for a request, but the
        // initial drop of `HelperState` will take care of that. Otherwise
        // it may be blocked in `client.acquire()`. We actually have no way
        // of interrupting that, so resort to `pthread_kill` as a fallback.
        // This signal should interrupt any blocking `read` call with
        // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
        //
        // Note that we don't do this forever though since there's a chance
        // of bugs, so only do this opportunistically to make a best effort
        // at clearing ourselves up.
        for _ in 0..100 {
            if state.consumer_done {
                break;
            }
            unsafe {
                // Ignore the return value here of `pthread_kill`,
                // apparently on OSX if you kill a dead thread it will
                // return an error, but on other platforms it may not. In
                // that sense we don't actually know if this will succeed or
                // not!
                libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
            }
            // Release the lock while waiting so the helper can update the
            // state. A poisoned lock is recovered rather than propagated.
            state = self
                .state
                .cvar
                .wait_timeout(state, dur)
                .unwrap_or_else(|e| e.into_inner())
                .0;
            thread::yield_now(); // we really want the other thread to run
        }

        // If we managed to actually see the consumer get done, then we can
        // definitely wait for the thread. Otherwise it's... off in the ether
        // I guess?
        if state.consumer_done {
            drop(self.thread.join());
        }
    }
}
482
483unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
484    match libc::fcntl(fd, libc::F_GETFD) {
485        -1 => Err(FromEnvErrorInner::CannotOpenFd(
486            fd,
487            io::Error::last_os_error(),
488        )),
489        _ => Ok(()),
490    }
491}
492
/// Validates a descriptor number inherited via `--jobserver-auth=R,W`.
///
/// When `check_pipe` is false, only checks that `fd` is open (see
/// `fcntl_check`). When it is true, also checks via `fstat` that `fd` is a
/// FIFO/pipe.
///
/// # Errors
///
/// * `CannotOpenFd`: `fcntl(F_GETFD)` fails, i.e. `fd` is not usable here.
/// * `NotAPipe(fd, Some(err))`: `fstat` fails but `fcntl` succeeds.
/// * `NotAPipe(fd, None)`: `fstat` succeeds but the mode is not a FIFO.
///
/// # Safety
///
/// Issues raw `fstat`/`fcntl` calls on an arbitrary descriptor number.
unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
    if check_pipe {
        let mut stat = mem::zeroed();
        if libc::fstat(fd, &mut stat) == -1 {
            // Capture errno before `fcntl_check` issues another syscall.
            let last_os_error = io::Error::last_os_error();
            // If the fd is not open at all, report that (CannotOpenFd)
            // instead of NotAPipe.
            fcntl_check(fd)?;
            Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
        } else {
            // On android arm and i686 mode_t is u16 and st_mode is u32,
            // this generates a type mismatch when S_IFIFO (declared as mode_t)
            // is used in operations with st_mode, so we use this workaround
            // to get the value of S_IFIFO with the same type of st_mode.
            #[allow(unused_assignments)]
            let mut s_ififo = stat.st_mode;
            s_ififo = libc::S_IFIFO as _;
            // NOTE(review): this tests whether the S_IFIFO bits are set,
            // rather than `(st_mode & S_IFMT) == S_IFIFO`. TODO confirm that
            // no other file type on supported platforms shares these bits.
            if stat.st_mode & s_ififo == s_ififo {
                return Ok(());
            }
            Err(FromEnvErrorInner::NotAPipe(fd, None))
        }
    } else {
        fcntl_check(fd)
    }
}
517
518fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
519    // Safety: fd is a valid fd dand it remains open until returns
520    unsafe { BorrowedFd::borrow_raw(fd) }
521        .try_clone_to_owned()
522        .map(File::from)
523        .map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
524}
525
526fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
527    unsafe {
528        let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
529        let new = if set {
530            previous | libc::FD_CLOEXEC
531        } else {
532            previous & !libc::FD_CLOEXEC
533        };
534        if new != previous {
535            cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
536        }
537        Ok(())
538    }
539}
540
541fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
542    let status_flag = if set { libc::O_NONBLOCK } else { 0 };
543
544    unsafe {
545        cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
546    }
547
548    Ok(())
549}
550
/// Converts a libc-style return value into an `io::Result`.
///
/// `-1` is the failure sentinel and becomes the current `errno` as an
/// `io::Error`. Every other value, including other negatives, is passed
/// through as `Ok`.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
558
559extern "C" fn sigusr1_handler(
560    _signum: c_int,
561    _info: *mut libc::siginfo_t,
562    _ptr: *mut libc::c_void,
563) {
564    // nothing to do
565}
566
#[cfg(test)]
mod test {
    use super::Client as ClientImp;

    use crate::{test::run_named_fifo_try_acquire_tests, Client};

    // `io::ErrorKind` is only needed off Linux, so it is referenced by its
    // full path there; importing `io` here would be unused on Linux.
    use std::{fs::File, io::Write, os::unix::io::AsRawFd, sync::Arc};

    /// Wraps a platform-specific client in the crate's public `Client`.
    fn from_imp_client(imp: ClientImp) -> Client {
        Client {
            inner: Arc::new(imp),
        }
    }

    /// Creates a named FIFO at a fresh temporary path and a client parsed
    /// from `fifo:PATH`. Returns the client together with that argument.
    fn new_client_from_fifo() -> (Client, String) {
        let file = tempfile::NamedTempFile::new().unwrap();
        let fifo_path = file.path().to_owned();
        file.close().unwrap(); // Remove the NamedTempFile to create fifo

        nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();

        let arg = format!("fifo:{}", fifo_path.to_str().unwrap());

        (
            ClientImp::from_fifo(&arg)
                .unwrap()
                .map(from_imp_client)
                .unwrap(),
            arg,
        )
    }

    /// Creates an anonymous pipe holding one token and a client parsed from
    /// its `R,W` descriptor pair. Returns the client together with that
    /// argument.
    fn new_client_from_pipe() -> (Client, String) {
        let (read, write) = nix::unistd::pipe().unwrap();
        let read = File::from(read);
        let mut write = File::from(write);

        write.write_all(b"1").unwrap();

        let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());

        (
            unsafe { ClientImp::from_pipe(&arg, true) }
                .unwrap()
                .map(from_imp_client)
                .unwrap(),
            arg,
        )
    }

    #[test]
    fn test_try_acquire_named_fifo() {
        run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
    }

    #[test]
    fn test_try_acquire_anonymous_pipe_linux_specific_optimization() {
        #[cfg(not(target_os = "linux"))]
        assert_eq!(
            new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
            std::io::ErrorKind::Unsupported
        );

        #[cfg(target_os = "linux")]
        {
            let client = new_client_from_pipe().0;
            // Drain the single pre-loaded token so the shared tests start
            // from an empty jobserver.
            client.acquire().unwrap().drop_without_releasing();
            run_named_fifo_try_acquire_tests(&client);
        }
    }

    #[test]
    fn test_string_arg() {
        let (client, arg) = new_client_from_fifo();
        assert_eq!(client.inner.string_arg(), arg);

        let (client, arg) = new_client_from_pipe();
        assert_eq!(client.inner.string_arg(), arg);
    }
}