signal_hook/low_level/
channel.rs

1//! A restricted channel to pass data from a signal handler.
2//!
3//! When trying to communicate data from a signal handler to the outside world, one can use an
4//! atomic variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work
5//! for larger data.
6//!
7//! This module provides a channel that can be used for that purpose. It is used by certain
8//! [exfiltrators][crate::iterator::exfiltrator], but can be used as a building block for custom
9//! actions. In general, this is not a ready-made end-user API.
10//!
11//! # How does it work
12//!
13//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
14//! slots). A signal handler takes a slot out of the empty queue, fills it and passes it into the
15//! full queue. Outside of the signal handler, the value can be taken out of the full queue and the
16//! slot returned to the empty queue.
17//!
18//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
19//! stored in an atomic variable.
20//!
21//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
22//! filled).
23//!
24//! # Fallible allocation of a slot
25//!
26//! It is apparent that allocation of a new slot can fail (there's nothing in the empty queue). In
27//! such a case, there's no way to send the new value out of the handler (there's no way to safely
28//! wait for a slot to appear, because the handler can be blocking the thread that is responsible
29//! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds
30//! of signals together if they are not consumed by the application fast enough. Moreover, there
31//! are no free slots exactly because some are being filled, emptied or are full ‒ in particular,
32//! the whole system will still yield a signal.
33//!
34//! This assumes that separate signals don't share the same buffer and that there's only one reader
35//! (using multiple readers is still safe, but it is possible that all slots would be inside the
36//! readers, but already empty, so the above argument would not hold).
37
38// TODO: Other sizes? Does anyone need more than 5 slots?
39
40use std::cell::UnsafeCell;
41use std::sync::atomic::{AtomicU16, Ordering};
42
/// Number of value slots held by a channel (and thus the capacity of each index queue).
const SLOTS: usize = 5;
/// Width, in bits, of one queue entry inside the packed `u16`.
///
/// Entries hold the 1-based slot ids `1..=SLOTS`; `0` marks a vacant queue position. With 5
/// entries of 3 bits each, a whole queue occupies 15 bits.
const BITS: u16 = 3;
/// Bit mask selecting the lowest queue entry (the `BITS` least significant bits).
const MASK: u16 = (1 << BITS) - 1;
46
47fn get(n: u16, idx: u16) -> u16 {
48    (n >> (BITS * idx)) & MASK
49}
50
51fn set(n: u16, idx: u16, v: u16) -> u16 {
52    let v = v << (BITS * idx);
53    let mask = MASK << (BITS * idx);
54    (n & !mask) | v
55}
56
57fn enqueue(q: &AtomicU16, val: u16) {
58    let mut current = q.load(Ordering::Relaxed);
59    loop {
60        let empty = (0..SLOTS as u16)
61            .find(|i| get(current, *i) == 0)
62            .expect("No empty slot available");
63        let modified = set(current, empty, val);
64        match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) {
65            Ok(_) => break,
66            Err(changed) => current = changed, // And retry with the changed value
67        }
68    }
69}
70
71fn dequeue(q: &AtomicU16) -> Option<u16> {
72    let mut current = q.load(Ordering::Relaxed);
73    loop {
74        let val = current & MASK;
75        // It's completely empty
76        if val == 0 {
77            break None;
78        }
79        let modified = current >> BITS;
80        match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) {
81            Ok(_) => break Some(val),
82            Err(changed) => current = changed,
83        }
84    }
85}
86
87/// A restricted async-signal-safe channel
88///
89/// This is a bit like the usual channel used for inter-thread communication, but with several
90/// restrictions:
91///
92/// * There's a limited number of slots (currently 5).
93/// * There's no way to wait for a place in it or for a value. If value is not available, `None` is
94///   returned. If there's no space for a value, the value is silently dropped.
95///
96/// In exchange for that, all the operations on that channel are async-signal-safe. That means it
97/// is possible to use it to communicate between a signal handler and the rest of the world with it
98/// (specifically, it's designed to send information from the handler to the rest of the
99/// application). The throwing out of values when full is in line with collating of the same type
100/// in kernel (you should not use the same channel for multiple different signals).
101///
102/// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC
103/// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal
104/// at the same time). The channel is not responsible for wakeups.
105///
106/// While the channel is async-signal-safe, you still need to make sure *creating* of the values is
107/// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc).
108///
109/// The code was *not* tuned for performance (signals are not expected to happen often).
110pub struct Channel<T> {
111    storage: [UnsafeCell<Option<T>>; SLOTS],
112    empty: AtomicU16,
113    full: AtomicU16,
114}
115
116impl<T> Channel<T> {
117    /// Creates a new channel with nothing in it.
118    pub fn new() -> Self {
119        let storage = Default::default();
120        let me = Self {
121            storage,
122            empty: AtomicU16::new(0),
123            full: AtomicU16::new(0),
124        };
125
126        for i in 1..SLOTS + 1 {
127            enqueue(&me.empty, i as u16);
128        }
129
130        me
131    }
132
133    /// Inserts a value into the channel.
134    ///
135    /// If the value doesn't fit, it is silently dropped. Never blocks.
136    pub fn send(&self, val: T) {
137        if let Some(empty_idx) = dequeue(&self.empty) {
138            unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
139            enqueue(&self.full, empty_idx);
140        }
141    }
142
143    /// Takes a value from the channel.
144    ///
145    /// Or returns `None` if the channel is empty. Never blocks.
146    pub fn recv(&self) -> Option<T> {
147        dequeue(&self.full).map(|idx| {
148            let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
149                .take()
150                .expect("Full slot with nothing in it");
151            enqueue(&self.empty, idx);
152            result
153        })
154    }
155}
156
157impl<T> Default for Channel<T> {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163unsafe impl<T: Send> Send for Channel<T> {}
164
165// Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs
166// on them.
167unsafe impl<T: Send> Sync for Channel<T> {}
168
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    /// A fresh channel yields nothing, no matter how often it is asked.
    #[test]
    fn new_empty() {
        let channel: Channel<usize> = Channel::new();
        for _ in 0..2 {
            assert!(channel.recv().is_none());
        }
    }

    /// A single value goes through and leaves the channel empty again.
    #[test]
    fn pass_value() {
        let channel = Channel::new();
        channel.send(42);
        assert_eq!(Some(42), channel.recv());
        assert_eq!(None, channel.recv());
    }

    /// Slots are recycled properly over many send/receive rounds.
    #[test]
    fn multiple() {
        let channel = Channel::new();
        (0..1000).for_each(|i| {
            channel.send(i);
            assert_eq!(Some(i), channel.recv());
            assert_eq!(None, channel.recv());
        });
    }

    /// Only the first five values survive when more are sent than there are slots.
    #[test]
    fn overflow() {
        let channel = Channel::new();
        (0..10).for_each(|i| channel.send(i));
        (0..5).for_each(|i| assert_eq!(Some(i), channel.recv()));
        assert_eq!(None, channel.recv());
    }

    /// Values sent from another thread arrive in order.
    #[test]
    fn multi_thread() {
        let channel = Arc::new(Channel::<usize>::new());

        let producer = {
            let channel = Arc::clone(&channel);
            thread::spawn(move || (0..4).for_each(|i| channel.send(i)))
        };

        // Busy-poll; the channel offers no way to wait for a value.
        let mut received = Vec::new();
        while received.len() < 4 {
            if let Some(value) = channel.recv() {
                received.push(value);
            }
        }

        assert_eq!(vec![0, 1, 2, 3], received);

        producer.join().unwrap();
    }
}