shadow_rs/host/descriptor/
shared_buf.rs
1use linux_api::errno::Errno;
5
6use crate::utility::byte_queue::ByteQueue;
7use crate::utility::callback_queue::{CallbackQueue, EventSource, Handle};
8
9pub struct SharedBuf {
10 queue: ByteQueue,
11 max_len: usize,
12 state: BufferState,
13 num_readers: u16,
14 num_writers: u16,
15 event_source: EventSource<(BufferState, BufferState, BufferSignals)>,
16}
17
18impl SharedBuf {
19 pub fn new(max_len: usize) -> Self {
20 assert_ne!(max_len, 0);
21 Self {
22 queue: ByteQueue::new(4096),
23 max_len,
24 state: BufferState::WRITABLE | BufferState::NO_READERS | BufferState::NO_WRITERS,
25 num_readers: 0,
26 num_writers: 0,
27 event_source: EventSource::new(),
28 }
29 }
30
31 pub fn has_data(&self) -> bool {
32 self.queue.has_chunks()
33 }
34
35 pub fn max_len(&self) -> usize {
36 self.max_len
37 }
38
39 pub fn space_available(&self) -> usize {
40 self.max_len - self.queue.num_bytes()
41 }
42
43 pub fn add_reader(&mut self, cb_queue: &mut CallbackQueue) -> ReaderHandle {
46 self.num_readers += 1;
47 self.refresh_state(BufferSignals::empty(), cb_queue);
48 ReaderHandle {}
49 }
50
51 pub fn remove_reader(&mut self, handle: ReaderHandle, cb_queue: &mut CallbackQueue) {
52 self.num_readers -= 1;
53 std::mem::forget(handle);
55 self.refresh_state(BufferSignals::empty(), cb_queue);
56 }
57
58 pub fn num_readers(&self) -> u16 {
59 self.num_readers
60 }
61
62 pub fn add_writer(&mut self, cb_queue: &mut CallbackQueue) -> WriterHandle {
65 self.num_writers += 1;
66 self.refresh_state(BufferSignals::empty(), cb_queue);
67 WriterHandle {}
68 }
69
70 pub fn remove_writer(&mut self, handle: WriterHandle, cb_queue: &mut CallbackQueue) {
71 self.num_writers -= 1;
72 std::mem::forget(handle);
74 self.refresh_state(BufferSignals::empty(), cb_queue);
75 }
76
77 pub fn num_writers(&self) -> u16 {
78 self.num_writers
79 }
80
81 pub fn peek<W: std::io::Write>(&self, bytes: W) -> Result<(usize, usize), std::io::Error> {
82 let (num_copied, num_removed_from_buf) = match self.queue.peek(bytes)? {
83 Some((num_copied, num_removed_from_buf, _chunk_type)) => {
84 (num_copied, num_removed_from_buf)
85 }
86 None => (0, 0),
87 };
88 Ok((num_copied, num_removed_from_buf))
89 }
90
91 pub fn read<W: std::io::Write>(
92 &mut self,
93 bytes: W,
94 cb_queue: &mut CallbackQueue,
95 ) -> Result<(usize, usize), std::io::Error> {
96 let (num_copied, num_removed_from_buf) = match self.queue.pop(bytes)? {
97 Some((num_copied, num_removed_from_buf, _chunk_type)) => {
98 (num_copied, num_removed_from_buf)
99 }
100 None => (0, 0),
101 };
102 self.refresh_state(BufferSignals::empty(), cb_queue);
103
104 Ok((num_copied, num_removed_from_buf))
105 }
106
107 pub fn write_stream<R: std::io::Read>(
108 &mut self,
109 bytes: R,
110 len: usize,
111 cb_queue: &mut CallbackQueue,
112 ) -> Result<usize, std::io::Error> {
113 if len == 0 {
114 return Ok(0);
115 }
116
117 if self.space_available() == 0 {
118 return Err(Errno::EAGAIN.into());
119 }
120
121 let written = self
122 .queue
123 .push_stream(bytes.take(self.space_available().try_into().unwrap()))?;
124
125 let signals = if written > 0 {
126 BufferSignals::BUFFER_GREW
127 } else {
128 BufferSignals::empty()
129 };
130 self.refresh_state(signals, cb_queue);
131
132 Ok(written)
133 }
134
135 pub fn write_packet<R: std::io::Read>(
136 &mut self,
137 mut bytes: R,
138 len: usize,
139 cb_queue: &mut CallbackQueue,
140 ) -> Result<(), std::io::Error> {
141 if len > self.max_len() {
142 return Err(Errno::EMSGSIZE.into());
144 }
145
146 if len > self.space_available() {
147 return Err(Errno::EAGAIN.into());
148 }
149
150 self.queue.push_packet(bytes.by_ref(), len)?;
151
152 self.refresh_state(BufferSignals::BUFFER_GREW, cb_queue);
153
154 Ok(())
155 }
156
157 pub fn add_listener(
158 &mut self,
159 monitoring_state: BufferState,
160 monitoring_signals: BufferSignals,
161 notify_fn: impl Fn(BufferState, BufferSignals, &mut CallbackQueue) + Send + Sync + 'static,
162 ) -> BufferHandle {
163 self.event_source
164 .add_listener(move |(state, changed, signals), cb_queue| {
165 let flipped = monitoring_state.intersects(changed);
167
168 let signals = signals.intersection(monitoring_signals);
170
171 if !flipped && signals.is_empty() {
172 return;
173 }
174
175 (notify_fn)(state, signals, cb_queue)
176 })
177 }
178
179 pub fn state(&self) -> BufferState {
180 self.state
181 }
182
183 fn refresh_state(&mut self, signals: BufferSignals, cb_queue: &mut CallbackQueue) {
187 let state_mask = BufferState::READABLE
188 | BufferState::WRITABLE
189 | BufferState::NO_READERS
190 | BufferState::NO_WRITERS;
191
192 let mut new_state = BufferState::empty();
193
194 new_state.set(BufferState::READABLE, self.has_data());
195 new_state.set(BufferState::WRITABLE, self.space_available() > 0);
196 new_state.set(BufferState::NO_READERS, self.num_readers() == 0);
197 new_state.set(BufferState::NO_WRITERS, self.num_writers() == 0);
198
199 self.update_state(state_mask, new_state, signals, cb_queue);
200 }
201
202 fn update_state(
203 &mut self,
204 mask: BufferState,
205 state: BufferState,
206 signals: BufferSignals,
207 cb_queue: &mut CallbackQueue,
208 ) {
209 let old_state = self.state;
210
211 self.state.remove(mask);
213 self.state.insert(state & mask);
214
215 self.handle_state_change(old_state, signals, cb_queue);
216 }
217
218 fn handle_state_change(
219 &mut self,
220 old_state: BufferState,
221 signals: BufferSignals,
222 cb_queue: &mut CallbackQueue,
223 ) {
224 let states_changed = self.state ^ old_state;
225
226 if states_changed.is_empty() && signals.is_empty() {
228 return;
229 }
230
231 self.event_source
232 .notify_listeners((self.state, states_changed, signals), cb_queue);
233 }
234}
235
236impl Drop for SharedBuf {
237 fn drop(&mut self) {
238 if std::thread::panicking() {
240 return;
241 }
242
243 if self.num_readers != 0 || self.num_writers != 0 {
245 debug_panic!(
247 "Dropping SharedBuf while it still has {} readers and {} writers.",
248 self.num_readers,
249 self.num_writers,
250 );
251 }
252 }
253}
254
255bitflags::bitflags! {
256 #[derive(Default, Copy, Clone, Debug)]
257 pub struct BufferState: u8 {
258 const READABLE = 0b00000001;
260 const WRITABLE = 0b00000010;
262 const NO_READERS = 0b00000100;
264 const NO_WRITERS = 0b00001000;
266 }
267}
268
269bitflags::bitflags! {
270 #[derive(Default, Copy, Clone, Debug)]
272 pub struct BufferSignals: u8 {
273 const BUFFER_GREW = 1 << 0;
275 }
276}
277
278pub type BufferHandle = Handle<(BufferState, BufferState, BufferSignals)>;
279
280pub struct ReaderHandle;
287
288pub struct WriterHandle;
291
292impl Drop for ReaderHandle {
293 fn drop(&mut self) {
294 if std::thread::panicking() {
296 return;
297 }
298
299 debug_panic!(
301 "Dropping ReaderHandle without returning it to SharedBuf. \
302 This likely indicates a bug in Shadow."
303 );
304 }
305}
306
307impl Drop for WriterHandle {
308 fn drop(&mut self) {
309 if std::thread::panicking() {
311 return;
312 }
313
314 debug_panic!(
316 "Dropping WriterHandle without returning it to SharedBuf. \
317 This likely indicates a bug in Shadow."
318 );
319 }
320}