signal_hook/low_level/channel.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
//! A restricted channel to pass data from signal handler.
//!
//! When trying to communicate data from signal handler to the outside world, one can use an atomic
//! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for
//! larger data.
//!
//! This module provides a channel that can be used for that purpose. It is used by certain
//! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom
//! actions. In general, this is not a ready-made end-user API.
//!
//! # How does it work
//!
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the
//! full one. Outside of signal handler, it can take the value out of the full queue and return the
//! slot to the empty queue.
//!
//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
//! stored in an atomic variable.
//!
//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
//! filled).
//!
//! # Fallible allocation of a slot
//!
//! It is apparent that allocation of a new slot can fail (there's nothing in the empty slot). In
//! such case, there's no way to send the new value out of the handler (there's no way to safely
//! wait for a slot to appear, because the handler can be blocking the thread that is responsible
//! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds
//! of signals together if they are not consumed by application fast enough and there are no free
//! slots exactly because some are being filled, emptied or are full ‒ in particular, the whole
//! system will yield a signal.
//!
//! This assumes that separate signals don't share the same buffer and that there's only one reader
//! (using multiple readers is still safe, but it is possible that all slots would be inside the
//! readers, but already empty, so the above argument would not hold).
// TODO: Other sizes? Does anyone need more than 5 slots?
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU16, Ordering};
const SLOTS: usize = 5;
const BITS: u16 = 3;
const MASK: u16 = 0b111;
fn get(n: u16, idx: u16) -> u16 {
(n >> (BITS * idx)) & MASK
}
fn set(n: u16, idx: u16, v: u16) -> u16 {
let v = v << (BITS * idx);
let mask = MASK << (BITS * idx);
(n & !mask) | v
}
fn enqueue(q: &AtomicU16, val: u16) {
let mut current = q.load(Ordering::Relaxed);
loop {
let empty = (0..SLOTS as u16)
.find(|i| get(current, *i) == 0)
.expect("No empty slot available");
let modified = set(current, empty, val);
match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) {
Ok(_) => break,
Err(changed) => current = changed, // And retry with the changed value
}
}
}
fn dequeue(q: &AtomicU16) -> Option<u16> {
let mut current = q.load(Ordering::Relaxed);
loop {
let val = current & MASK;
// It's completely empty
if val == 0 {
break None;
}
let modified = current >> BITS;
match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) {
Ok(_) => break Some(val),
Err(changed) => current = changed,
}
}
}
/// A restricted async-signal-safe channel
///
/// This is a bit like the usual channel used for inter-thread communication, but with several
/// restrictions:
///
/// * There's a limited number of slots (currently 5).
/// * There's no way to wait for a place in it or for a value. If value is not available, `None` is
/// returned. If there's no space for a value, the value is silently dropped.
///
/// In exchange for that, all the operations on that channel are async-signal-safe. That means it
/// is possible to use it to communicate between a signal handler and the rest of the world with it
/// (specifically, it's designed to send information from the handler to the rest of the
/// application). The throwing out of values when full is in line with collating of the same type
/// in kernel (you should not use the same channel for multiple different signals).
///
/// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC
/// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal
/// at the same time). The channel is not responsible for wakeups.
///
/// While the channel is async-signal-safe, you still need to make sure *creating* of the values is
/// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc).
///
/// The code was *not* tuned for performance (signals are not expected to happen often).
pub struct Channel<T> {
storage: [UnsafeCell<Option<T>>; SLOTS],
empty: AtomicU16,
full: AtomicU16,
}
impl<T> Channel<T> {
/// Creates a new channel with nothing in it.
pub fn new() -> Self {
let storage = Default::default();
let me = Self {
storage,
empty: AtomicU16::new(0),
full: AtomicU16::new(0),
};
for i in 1..SLOTS + 1 {
enqueue(&me.empty, i as u16);
}
me
}
/// Inserts a value into the channel.
///
/// If the value doesn't fit, it is silently dropped. Never blocks.
pub fn send(&self, val: T) {
if let Some(empty_idx) = dequeue(&self.empty) {
unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
enqueue(&self.full, empty_idx);
}
}
/// Takes a value from the channel.
///
/// Or returns `None` if the channel is empty. Never blocks.
pub fn recv(&self) -> Option<T> {
dequeue(&self.full).map(|idx| {
let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
.take()
.expect("Full slot with nothing in it");
enqueue(&self.empty, idx);
result
})
}
}
impl<T> Default for Channel<T> {
fn default() -> Self {
Self::new()
}
}
unsafe impl<T: Send> Send for Channel<T> {}
// Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs
// on them.
unsafe impl<T: Send> Sync for Channel<T> {}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::thread;
use super::*;
#[test]
fn new_empty() {
let channel = Channel::<usize>::new();
assert!(channel.recv().is_none());
assert!(channel.recv().is_none());
}
#[test]
fn pass_value() {
let channel = Channel::new();
channel.send(42);
assert_eq!(42, channel.recv().unwrap());
assert!(channel.recv().is_none());
}
#[test]
fn multiple() {
let channel = Channel::new();
for i in 0..1000 {
channel.send(i);
assert_eq!(i, channel.recv().unwrap());
assert!(channel.recv().is_none());
}
}
#[test]
fn overflow() {
let channel = Channel::new();
for i in 0..10 {
channel.send(i);
}
for i in 0..5 {
assert_eq!(i, channel.recv().unwrap());
}
assert!(channel.recv().is_none());
}
#[test]
fn multi_thread() {
let channel = Arc::new(Channel::<usize>::new());
let sender = thread::spawn({
let channel = Arc::clone(&channel);
move || {
for i in 0..4 {
channel.send(i);
}
}
});
let mut results = Vec::new();
while results.len() < 4 {
results.extend(channel.recv());
}
assert_eq!(vec![0, 1, 2, 3], results);
sender.join().unwrap();
}
}