shadow_rs/host/descriptor/
shared_buf.rs

1//! A buffer for files that need to share a buffer with other files. Example use-cases are pipes and
2//! unix sockets. This buffer supports notifying files when readers or writers are added or removed.
3
4use linux_api::errno::Errno;
5
6use crate::utility::byte_queue::ByteQueue;
7use crate::utility::callback_queue::{CallbackQueue, EventSource, Handle};
8
9pub struct SharedBuf {
10    queue: ByteQueue,
11    max_len: usize,
12    state: BufferState,
13    num_readers: u16,
14    num_writers: u16,
15    event_source: EventSource<(BufferState, BufferState, BufferSignals)>,
16}
17
18impl SharedBuf {
19    pub fn new(max_len: usize) -> Self {
20        assert_ne!(max_len, 0);
21        Self {
22            queue: ByteQueue::new(4096),
23            max_len,
24            state: BufferState::WRITABLE | BufferState::NO_READERS | BufferState::NO_WRITERS,
25            num_readers: 0,
26            num_writers: 0,
27            event_source: EventSource::new(),
28        }
29    }
30
31    pub fn has_data(&self) -> bool {
32        self.queue.has_chunks()
33    }
34
35    pub fn max_len(&self) -> usize {
36        self.max_len
37    }
38
39    pub fn space_available(&self) -> usize {
40        self.max_len - self.queue.num_bytes()
41    }
42
43    /// Register as a reader. The [`ReaderHandle`] must be returned to the buffer later with
44    /// [`remove_reader()`](Self::remove_reader).
45    pub fn add_reader(&mut self, cb_queue: &mut CallbackQueue) -> ReaderHandle {
46        self.num_readers += 1;
47        self.refresh_state(BufferSignals::empty(), cb_queue);
48        ReaderHandle {}
49    }
50
51    pub fn remove_reader(&mut self, handle: ReaderHandle, cb_queue: &mut CallbackQueue) {
52        self.num_readers -= 1;
53        // don't run the handle's drop impl
54        std::mem::forget(handle);
55        self.refresh_state(BufferSignals::empty(), cb_queue);
56    }
57
58    pub fn num_readers(&self) -> u16 {
59        self.num_readers
60    }
61
62    /// Register as a writer. The [`WriterHandle`] must be returned to the buffer later with
63    /// [`remove_writer()`](Self::remove_writer).
64    pub fn add_writer(&mut self, cb_queue: &mut CallbackQueue) -> WriterHandle {
65        self.num_writers += 1;
66        self.refresh_state(BufferSignals::empty(), cb_queue);
67        WriterHandle {}
68    }
69
70    pub fn remove_writer(&mut self, handle: WriterHandle, cb_queue: &mut CallbackQueue) {
71        self.num_writers -= 1;
72        // don't run the handle's drop impl
73        std::mem::forget(handle);
74        self.refresh_state(BufferSignals::empty(), cb_queue);
75    }
76
77    pub fn num_writers(&self) -> u16 {
78        self.num_writers
79    }
80
81    pub fn peek<W: std::io::Write>(&self, bytes: W) -> Result<(usize, usize), std::io::Error> {
82        let (num_copied, num_removed_from_buf) = match self.queue.peek(bytes)? {
83            Some((num_copied, num_removed_from_buf, _chunk_type)) => {
84                (num_copied, num_removed_from_buf)
85            }
86            None => (0, 0),
87        };
88        Ok((num_copied, num_removed_from_buf))
89    }
90
91    pub fn read<W: std::io::Write>(
92        &mut self,
93        bytes: W,
94        cb_queue: &mut CallbackQueue,
95    ) -> Result<(usize, usize), std::io::Error> {
96        let (num_copied, num_removed_from_buf) = match self.queue.pop(bytes)? {
97            Some((num_copied, num_removed_from_buf, _chunk_type)) => {
98                (num_copied, num_removed_from_buf)
99            }
100            None => (0, 0),
101        };
102        self.refresh_state(BufferSignals::empty(), cb_queue);
103
104        Ok((num_copied, num_removed_from_buf))
105    }
106
107    pub fn write_stream<R: std::io::Read>(
108        &mut self,
109        bytes: R,
110        len: usize,
111        cb_queue: &mut CallbackQueue,
112    ) -> Result<usize, std::io::Error> {
113        if len == 0 {
114            return Ok(0);
115        }
116
117        if self.space_available() == 0 {
118            return Err(Errno::EAGAIN.into());
119        }
120
121        let written = self
122            .queue
123            .push_stream(bytes.take(self.space_available().try_into().unwrap()))?;
124
125        let signals = if written > 0 {
126            BufferSignals::BUFFER_GREW
127        } else {
128            BufferSignals::empty()
129        };
130        self.refresh_state(signals, cb_queue);
131
132        Ok(written)
133    }
134
135    pub fn write_packet<R: std::io::Read>(
136        &mut self,
137        mut bytes: R,
138        len: usize,
139        cb_queue: &mut CallbackQueue,
140    ) -> Result<(), std::io::Error> {
141        if len > self.max_len() {
142            // the socket could never send this packet, even if the buffer was empty
143            return Err(Errno::EMSGSIZE.into());
144        }
145
146        if len > self.space_available() {
147            return Err(Errno::EAGAIN.into());
148        }
149
150        self.queue.push_packet(bytes.by_ref(), len)?;
151
152        self.refresh_state(BufferSignals::BUFFER_GREW, cb_queue);
153
154        Ok(())
155    }
156
157    pub fn add_listener(
158        &mut self,
159        monitoring_state: BufferState,
160        monitoring_signals: BufferSignals,
161        notify_fn: impl Fn(BufferState, BufferSignals, &mut CallbackQueue) + Send + Sync + 'static,
162    ) -> BufferHandle {
163        self.event_source
164            .add_listener(move |(state, changed, signals), cb_queue| {
165                // true if any of the bits we're monitoring have changed
166                let flipped = monitoring_state.intersects(changed);
167
168                // filter the signals to only the ones we're monitoring
169                let signals = signals.intersection(monitoring_signals);
170
171                if !flipped && signals.is_empty() {
172                    return;
173                }
174
175                (notify_fn)(state, signals, cb_queue)
176            })
177    }
178
179    pub fn state(&self) -> BufferState {
180        self.state
181    }
182
183    /// Refresh the shared buffer's state and optionally send any signals. These two functionalities
184    /// are combined into a single method since a state change and signals can be emitted as a
185    /// single event, improving performance.
186    fn refresh_state(&mut self, signals: BufferSignals, cb_queue: &mut CallbackQueue) {
187        let state_mask = BufferState::READABLE
188            | BufferState::WRITABLE
189            | BufferState::NO_READERS
190            | BufferState::NO_WRITERS;
191
192        let mut new_state = BufferState::empty();
193
194        new_state.set(BufferState::READABLE, self.has_data());
195        new_state.set(BufferState::WRITABLE, self.space_available() > 0);
196        new_state.set(BufferState::NO_READERS, self.num_readers() == 0);
197        new_state.set(BufferState::NO_WRITERS, self.num_writers() == 0);
198
199        self.update_state(state_mask, new_state, signals, cb_queue);
200    }
201
202    fn update_state(
203        &mut self,
204        mask: BufferState,
205        state: BufferState,
206        signals: BufferSignals,
207        cb_queue: &mut CallbackQueue,
208    ) {
209        let old_state = self.state;
210
211        // remove the masked flags, then copy the masked flags
212        self.state.remove(mask);
213        self.state.insert(state & mask);
214
215        self.handle_state_change(old_state, signals, cb_queue);
216    }
217
218    fn handle_state_change(
219        &mut self,
220        old_state: BufferState,
221        signals: BufferSignals,
222        cb_queue: &mut CallbackQueue,
223    ) {
224        let states_changed = self.state ^ old_state;
225
226        // if nothing changed
227        if states_changed.is_empty() && signals.is_empty() {
228            return;
229        }
230
231        self.event_source
232            .notify_listeners((self.state, states_changed, signals), cb_queue);
233    }
234}
235
236impl Drop for SharedBuf {
237    fn drop(&mut self) {
238        // don't show the following warning message if panicking
239        if std::thread::panicking() {
240            return;
241        }
242
243        // listeners waiting for `NO_READERS` or `NO_WRITERS` status changes will never be notified
244        if self.num_readers != 0 || self.num_writers != 0 {
245            // panic in debug builds since the backtrace will be helpful for debugging
246            debug_panic!(
247                "Dropping SharedBuf while it still has {} readers and {} writers.",
248                self.num_readers,
249                self.num_writers,
250            );
251        }
252    }
253}
254
255bitflags::bitflags! {
256    #[derive(Default, Copy, Clone, Debug)]
257    pub struct BufferState: u8 {
258        /// There is data waiting in the buffer.
259        const READABLE = 0b00000001;
260        /// There is available buffer space.
261        const WRITABLE = 0b00000010;
262        /// The buffer has no readers.
263        const NO_READERS = 0b00000100;
264        /// The buffer has no writers.
265        const NO_WRITERS = 0b00001000;
266    }
267}
268
269bitflags::bitflags! {
270    /// Buffer-related signals that listeners can watch for.
271    #[derive(Default, Copy, Clone, Debug)]
272    pub struct BufferSignals: u8 {
273        /// The buffer now has additional data available to read.
274        const BUFFER_GREW = 1 << 0;
275    }
276}
277
278pub type BufferHandle = Handle<(BufferState, BufferState, BufferSignals)>;
279
280/// A handle that signifies that the owner is acting as a reader for the buffer. The handle must be
281/// returned to the buffer later with [`SharedBuf::remove_reader()`].
282///
283/// Handles aren't linked to specific buffers, so make sure to only return the handle to the same
284/// buffer which you acquired the handle from.
285// do not implement copy or clone
286pub struct ReaderHandle;
287
288/// See [`ReaderHandle`].
289// do not implement copy or clone
290pub struct WriterHandle;
291
292impl Drop for ReaderHandle {
293    fn drop(&mut self) {
294        // don't show the following warning message if panicking
295        if std::thread::panicking() {
296            return;
297        }
298
299        // panic in debug builds since the backtrace will be helpful for debugging
300        debug_panic!(
301            "Dropping ReaderHandle without returning it to SharedBuf. \
302             This likely indicates a bug in Shadow."
303        );
304    }
305}
306
307impl Drop for WriterHandle {
308    fn drop(&mut self) {
309        // don't show the following warning message if panicking
310        if std::thread::panicking() {
311            return;
312        }
313
314        // panic in debug builds since the backtrace will be helpful for debugging
315        debug_panic!(
316            "Dropping WriterHandle without returning it to SharedBuf. \
317             This likely indicates a bug in Shadow."
318        );
319    }
320}