signal_hook/iterator/
backend.rs

1//! A backend module for implementing the iterator like
2//! [`iterator`][crate::iterator] module and the asynchronous
3//! adapter crates.
4//!
5//! This module contains generic types which abstract over the concrete
//! IO type for the self-pipe. The motivation for having this abstraction
//! is the adapter crates for different asynchronous runtimes. The runtimes
//! provide their own wrappers for [`std::os::unix::net::UnixStream`]
//! which should be used as the internal self-pipe. But large parts of the
//! remaining functionality don't depend directly on the IO type and can
//! be reused.
12//!
13//! See also the [`SignalDelivery::with_pipe`] method for more information
14//! about requirements the IO types have to fulfill.
15//!
16//! As a regular user you shouldn't need to use the types in this module.
17//! Use the [`Signals`][crate::iterator::Signals] struct or one of the types
18//! contained in the adapter libraries instead.
19
20use std::borrow::{Borrow, BorrowMut};
21use std::fmt::{Debug, Formatter, Result as FmtResult};
22use std::io::Error;
23use std::mem::MaybeUninit;
24use std::os::unix::io::AsRawFd;
25use std::ptr;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28
29use libc::{self, c_int};
30
31use super::exfiltrator::Exfiltrator;
32use crate::low_level::pipe::{self, WakeMethod};
33use crate::SigId;
34
/// Maximal signal number we support.
///
/// This is an exclusive upper bound: it is the length of both the per-signal slot array and
/// the table of registration IDs, so only signal numbers strictly below it can be registered.
const MAX_SIGNUM: usize = 128;
37
38trait SelfPipeWrite: Debug + Send + Sync {
39    fn wake_readers(&self);
40}
41
42impl<W: AsRawFd + Debug + Send + Sync> SelfPipeWrite for W {
43    fn wake_readers(&self) {
44        pipe::wake(self.as_raw_fd(), WakeMethod::Send);
45    }
46}
47
48#[derive(Debug)]
49struct DeliveryState {
50    closed: AtomicBool,
51    registered_signal_ids: Mutex<Vec<Option<SigId>>>,
52}
53
54impl DeliveryState {
55    fn new() -> Self {
56        let ids = (0..MAX_SIGNUM).map(|_| None).collect();
57        Self {
58            closed: AtomicBool::new(false),
59            registered_signal_ids: Mutex::new(ids),
60        }
61    }
62}
63
64impl Drop for DeliveryState {
65    fn drop(&mut self) {
66        let lock = self.registered_signal_ids.lock().unwrap();
67        for id in lock.iter().filter_map(|s| *s) {
68            crate::low_level::unregister(id);
69        }
70    }
71}
72
73struct PendingSignals<E: Exfiltrator> {
74    exfiltrator: E,
75    slots: [E::Storage; MAX_SIGNUM],
76}
77
78impl<E: Exfiltrator> PendingSignals<E> {
79    fn new(exfiltrator: E) -> Self {
80        // Unfortunately, Default is not implemented for long arrays :-(
81        //
82        // Note that if the default impl panics, the already existing instances are leaked.
83        let mut slots = MaybeUninit::<[E::Storage; MAX_SIGNUM]>::uninit();
84        for i in 0..MAX_SIGNUM {
85            unsafe {
86                let slot: *mut E::Storage = slots.as_mut_ptr() as *mut _;
87                let slot = slot.add(i);
88                ptr::write(slot, E::Storage::default());
89            }
90        }
91
92        Self {
93            exfiltrator,
94            slots: unsafe { slots.assume_init() },
95        }
96    }
97}
98
/// An internal trait to hide adding new signals into a Handle behind a dynamic dispatch.
///
/// [`Handle`] stores the implementor as `Arc<dyn AddSignal>`, so it does not have to be
/// generic over the exfiltrator type.
trait AddSignal: Debug + Send + Sync {
    /// Registers an action for `signal` and returns the ID of that registration.
    ///
    /// `write` is the write end of the self-pipe; the implementation in this file wakes it
    /// from inside the registered action. The receiver is taken as `Arc<Self>` so the action
    /// can keep the implementor alive.
    fn add_signal(
        self: Arc<Self>,
        write: Arc<dyn SelfPipeWrite>,
        signal: c_int,
    ) -> Result<SigId, Error>;
}
107
108// Implemented manually because 1.36.0 doesn't yet support Debug for [X; BIG_NUMBER].
109impl<E: Exfiltrator> Debug for PendingSignals<E> {
110    fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
111        fmt.debug_struct("PendingSignals")
112            .field("exfiltrator", &self.exfiltrator)
113            // While the array does not, the slice does implement Debug
114            .field("slots", &&self.slots[..])
115            .finish()
116    }
117}
118
119impl<E: Exfiltrator> AddSignal for PendingSignals<E> {
120    fn add_signal(
121        self: Arc<Self>,
122        write: Arc<dyn SelfPipeWrite>,
123        signal: c_int,
124    ) -> Result<SigId, Error> {
125        assert!(signal >= 0);
126        assert!(
127            (signal as usize) < MAX_SIGNUM,
128            "Signal number {} too large. If your OS really supports such signal, file a bug",
129            signal,
130        );
131        assert!(
132            self.exfiltrator.supports_signal(signal),
133            "Signal {} not supported by exfiltrator {:?}",
134            signal,
135            self.exfiltrator,
136        );
137        self.exfiltrator.init(&self.slots[signal as usize], signal);
138
139        let action = move |act: &_| {
140            let slot = &self.slots[signal as usize];
141            let ex = &self.exfiltrator;
142            ex.store(slot, signal, act);
143            write.wake_readers();
144        };
145        let id = unsafe { signal_hook_registry::register_sigaction(signal, action) }?;
146        Ok(id)
147    }
148}
149
150/// A struct to control an instance of an associated type
151/// (like for example [`Signals`][super::Signals]).
152///
153/// It allows to register more signal handlers and to shutdown the signal
154/// delivery. You can [`clone`][Handle::clone] this type which isn't a
155/// very expensive operation. The cloned instances can be shared between
156/// multiple threads.
157#[derive(Debug, Clone)]
158pub struct Handle {
159    pending: Arc<dyn AddSignal>,
160    write: Arc<dyn SelfPipeWrite>,
161    delivery_state: Arc<DeliveryState>,
162}
163
164impl Handle {
165    fn new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self
166    where
167        W: 'static + SelfPipeWrite,
168    {
169        Self {
170            pending,
171            write: Arc::new(write),
172            delivery_state: Arc::new(DeliveryState::new()),
173        }
174    }
175
176    /// Registers another signal to the set watched by the associated instance.
177    ///
178    /// # Notes
179    ///
180    /// * This is safe to call concurrently from whatever thread.
181    /// * This is *not* safe to call from within a signal handler.
182    /// * If the signal number was already registered previously, this is a no-op.
183    /// * If this errors, the original set of signals is left intact.
184    ///
185    /// # Panics
186    ///
187    /// * If the given signal is [forbidden][crate::FORBIDDEN].
188    /// * If the signal number is negative or larger than internal limit. The limit should be
189    ///   larger than any supported signal the OS supports.
190    /// * If the relevant [`Exfiltrator`] does not support this particular signal. The default
191    ///   [`SignalOnly`] one supports all signals.
192    pub fn add_signal(&self, signal: c_int) -> Result<(), Error> {
193        let mut lock = self.delivery_state.registered_signal_ids.lock().unwrap();
194        // Already registered, ignoring
195        if lock[signal as usize].is_some() {
196            return Ok(());
197        }
198
199        let id = Arc::clone(&self.pending).add_signal(Arc::clone(&self.write), signal)?;
200
201        lock[signal as usize] = Some(id);
202
203        Ok(())
204    }
205
206    /// Closes the associated instance.
207    ///
208    /// This is meant to signalize termination of the signal delivery process.
209    /// After calling close:
210    ///
211    /// * [`is_closed`][Handle::is_closed] will return true.
212    /// * All currently blocking operations of associated instances
213    ///   are interrupted and terminate.
214    /// * Any further operations will not block.
215    /// * Further signals may or may not be returned from the iterators. However, if any are
216    ///   returned, these are real signals that happened.
217    ///
218    /// The goal is to be able to shut down any background thread that handles only the signals.
219    pub fn close(&self) {
220        self.delivery_state.closed.store(true, Ordering::SeqCst);
221        self.write.wake_readers();
222    }
223
224    /// Is it closed?
225    ///
226    /// See [`close`][Handle::close].
227    pub fn is_closed(&self) -> bool {
228        self.delivery_state.closed.load(Ordering::SeqCst)
229    }
230}
231
232/// A struct for delivering received signals to the main program flow.
233/// The self-pipe IO type is generic. See the
234/// [`with_pipe`][SignalDelivery::with_pipe] method for requirements
235/// for the IO type.
236#[derive(Debug)]
237pub struct SignalDelivery<R, E: Exfiltrator> {
238    read: R,
239    handle: Handle,
240    pending: Arc<PendingSignals<E>>,
241}
242
243impl<R, E: Exfiltrator> SignalDelivery<R, E>
244where
245    R: 'static + AsRawFd + Send + Sync,
246{
247    /// Creates the `SignalDelivery` structure.
248    ///
249    /// The read and write arguments must be the ends of a suitable pipe type. These are used
250    /// for communication between the signal handler and main program flow.
251    ///
252    /// Registers all the signals listed. The same restrictions (panics, errors) apply as with
253    /// [`add_signal`][Handle::add_signal].
254    ///
255    /// # Requirements for the pipe type
256    ///
257    /// * Must support [`send`](https://man7.org/linux/man-pages/man2/send.2.html) for
258    ///   asynchronously writing bytes to the write end
259    /// * Must support [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) for
260    ///   reading bytes from the read end
261    ///
262    /// So UnixStream is a good choice for this.
263    pub fn with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error>
264    where
265        I: IntoIterator<Item = S>,
266        S: Borrow<c_int>,
267        W: 'static + AsRawFd + Debug + Send + Sync,
268    {
269        let pending = Arc::new(PendingSignals::new(exfiltrator));
270        let pending_add_signal = Arc::clone(&pending);
271        let handle = Handle::new(write, pending_add_signal);
272        let me = Self {
273            read,
274            handle,
275            pending,
276        };
277        for sig in signals {
278            me.handle.add_signal(*sig.borrow())?;
279        }
280        Ok(me)
281    }
282
283    /// Get a reference to the read end of the self pipe
284    ///
285    /// You may use this method to register the underlying file descriptor
286    /// with an eventing system (e. g. epoll) to get notified if there are
287    /// bytes in the pipe. If the event system reports the file descriptor
288    /// ready for reading you can then call [`pending`][SignalDelivery::pending]
289    /// to get the arrived signals.
290    pub fn get_read(&self) -> &R {
291        &self.read
292    }
293
294    /// Get a mutable reference to the read end of the self pipe
295    ///
296    /// See the [`get_read`][SignalDelivery::get_read] method for some additional
297    /// information.
298    pub fn get_read_mut(&mut self) -> &mut R {
299        &mut self.read
300    }
301
302    /// Drains all data from the internal self-pipe. This method will never block.
303    fn flush(&mut self) {
304        const SIZE: usize = 1024;
305        let mut buff = [0u8; SIZE];
306
307        unsafe {
308            // Draining the data in the self pipe. We ignore all errors on purpose. This
309            // should not be something like closed file descriptor. It could EAGAIN, but
310            // that's OK in case we say MSG_DONTWAIT. If it's EINTR, then it's OK too,
311            // it'll only create a spurious wakeup.
312            #[cfg(target_os = "aix")]
313            let nowait_flag = libc::MSG_NONBLOCK;
314            #[cfg(not(target_os = "aix"))]
315            let nowait_flag = libc::MSG_DONTWAIT;
316            while libc::recv(
317                self.read.as_raw_fd(),
318                buff.as_mut_ptr() as *mut libc::c_void,
319                SIZE,
320                nowait_flag,
321            ) > 0
322            {}
323        }
324    }
325
326    /// Returns an iterator of already received signals.
327    ///
328    /// This returns an iterator over all the signal numbers of the signals received since last
329    /// time they were read (out of the set registered by this `SignalDelivery` instance). Note
330    /// that they are returned in arbitrary order and a signal number is returned only once even
331    /// if it was received multiple times.
332    ///
333    /// This method returns immediately (does not block) and may produce an empty iterator if
334    /// there are no signals ready.
335    pub fn pending(&mut self) -> Pending<E> {
336        self.flush();
337        Pending::new(Arc::clone(&self.pending))
338    }
339
340    /// Checks the reading end of the self pipe for available signals.
341    ///
342    /// If there are no signals available or this instance was already closed it returns
343    /// [`Option::None`]. If there are some signals it returns a [`Pending`]
344    /// instance wrapped inside a [`Option::Some`]. However, due to implementation details,
345    /// this still can produce an empty iterator.
346    ///
347    /// This method doesn't check the reading end by itself but uses the passed in callback.
348    /// This method blocks if and only if the callback blocks trying to read some bytes.
349    pub fn poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error>
350    where
351        F: FnMut(&mut R) -> Result<bool, Error>,
352    {
353        if self.handle.is_closed() {
354            return Ok(None);
355        }
356
357        match has_signals(self.get_read_mut()) {
358            Ok(false) => Ok(None),
359            Ok(true) => Ok(Some(self.pending())),
360            Err(err) => Err(err),
361        }
362    }
363
364    /// Get a [`Handle`] for this `SignalDelivery` instance.
365    ///
366    /// This can be used to add further signals or close the whole
367    /// signal delivery mechanism.
368    pub fn handle(&self) -> Handle {
369        self.handle.clone()
370    }
371}
372
373/// The iterator of one batch of signals.
374///
375/// This is returned by the [`pending`][SignalDelivery::pending] method.
376#[derive(Debug)]
377pub struct Pending<E: Exfiltrator> {
378    pending: Arc<PendingSignals<E>>,
379    position: usize,
380}
381
382impl<E: Exfiltrator> Pending<E> {
383    fn new(pending: Arc<PendingSignals<E>>) -> Self {
384        Self {
385            pending,
386            position: 0,
387        }
388    }
389}
390
391impl<E: Exfiltrator> Iterator for Pending<E> {
392    type Item = E::Output;
393
394    fn next(&mut self) -> Option<E::Output> {
395        while self.position < self.pending.slots.len() {
396            let sig = self.position;
397            let slot = &self.pending.slots[sig];
398            let result = self.pending.exfiltrator.load(slot, sig as c_int);
399            if result.is_some() {
400                return result;
401            } else {
402                self.position += 1;
403            }
404        }
405
406        None
407    }
408}
409
/// Possible results of the [`poll_signal`][SignalIterator::poll_signal] function.
///
/// The type is `Debug` whenever the signal output `O` is, like the other public types of
/// this module.
#[derive(Debug)]
pub enum PollResult<O> {
    /// A signal arrived
    Signal(O),
    /// There are no signals yet but there may arrive some in the future
    Pending,
    /// The iterator was closed. There won't be any signals reported from now on.
    Closed,
    /// An error happened during polling for arrived signals.
    Err(Error),
}
421
422/// An infinite iterator of received signals.
423pub struct SignalIterator<SD, E: Exfiltrator> {
424    signals: SD,
425    iter: Pending<E>,
426}
427
428impl<SD, E: Exfiltrator> SignalIterator<SD, E> {
429    /// Create a new infinite iterator for signals registered with the passed
430    /// in [`SignalDelivery`] instance.
431    pub fn new<R>(mut signals: SD) -> Self
432    where
433        SD: BorrowMut<SignalDelivery<R, E>>,
434        R: 'static + AsRawFd + Send + Sync,
435    {
436        let iter = signals.borrow_mut().pending();
437        Self { signals, iter }
438    }
439
440    /// Return a signal if there is one or tell the caller that there is none at the moment.
441    ///
442    /// You have to pass in a callback which checks the underlying reading end of the pipe if
443    /// there may be any pending signals. This callback may or may not block. If the callback
444    /// returns [`true`] this method will try to fetch the next signal and return it as a
445    /// [`PollResult::Signal`]. If the callback returns [`false`] the method will return
446    /// [`PollResult::Pending`] and assume it will be called again at a later point in time.
447    /// The callback may be called any number of times by this function.
448    ///
449    /// If the iterator was closed by the [`close`][Handle::close] method of the associated
450    /// [`Handle`] this method will return [`PollResult::Closed`].
451    pub fn poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output>
452    where
453        SD: BorrowMut<SignalDelivery<R, E>>,
454        R: 'static + AsRawFd + Send + Sync,
455        F: FnMut(&mut R) -> Result<bool, Error>,
456    {
457        // The loop is necessary because it is possible that a signal was already consumed
458        // by a previous pending iterator due to the asynchronous nature of signals and
459        // always moving to the end of the iterator before calling has_more.
460        while !self.signals.borrow_mut().handle.is_closed() {
461            if let Some(result) = self.iter.next() {
462                return PollResult::Signal(result);
463            }
464
465            match self.signals.borrow_mut().poll_pending(has_signals) {
466                Ok(Some(pending)) => self.iter = pending,
467                Ok(None) => return PollResult::Pending,
468                Err(err) => return PollResult::Err(err),
469            }
470        }
471
472        PollResult::Closed
473    }
474
475    /// Get a shareable [`Handle`] for this instance.
476    ///
477    /// This can be used to add further signals or terminate the whole
478    /// signal iteration using the [`close`][Handle::close] method.
479    pub fn handle<R>(&self) -> Handle
480    where
481        SD: Borrow<SignalDelivery<R, E>>,
482        R: 'static + AsRawFd + Send + Sync,
483    {
484        self.signals.borrow().handle()
485    }
486}
487
/// A signal iterator which consumes a [`SignalDelivery`] instance and takes
/// ownership of it.
///
/// This is [`SignalIterator`] with the delivery stored by value.
pub type OwningSignalIterator<R, E> = SignalIterator<SignalDelivery<R, E>, E>;

/// A signal iterator which takes a mutable reference to a [`SignalDelivery`]
/// instance.
///
/// This is [`SignalIterator`] with the delivery borrowed for the lifetime `'a`.
pub type RefSignalIterator<'a, R, E> = SignalIterator<&'a mut SignalDelivery<R, E>, E>;