shadow_rs/host/descriptor/
pipe.rs1use std::sync::Arc;
2
3use atomic_refcell::AtomicRefCell;
4use linux_api::errno::Errno;
5use linux_api::ioctls::IoctlRequest;
6use linux_api::stat::SFlag;
7use shadow_shim_helper_rs::syscall_types::ForeignPtr;
8
9use crate::cshadow as c;
10use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
11use crate::host::descriptor::shared_buf::{
12 BufferHandle, BufferSignals, BufferState, ReaderHandle, SharedBuf, WriterHandle,
13};
14use crate::host::descriptor::{FileMode, FileSignals, FileState, FileStatus};
15use crate::host::memory_manager::MemoryManager;
16use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
17use crate::host::syscall::types::{SyscallError, SyscallResult};
18use crate::utility::HostTreePointer;
19use crate::utility::callback_queue::CallbackQueue;
20
21pub struct Pipe {
22 buffer: Option<Arc<AtomicRefCell<SharedBuf>>>,
23 event_source: StateEventSource,
24 state: FileState,
25 mode: FileMode,
26 status: FileStatus,
27 write_mode: WriteMode,
28 buffer_event_handle: Option<BufferHandle>,
29 reader_handle: Option<ReaderHandle>,
30 writer_handle: Option<WriterHandle>,
31 has_open_file: bool,
34}
35
36impl Pipe {
37 pub fn new(mode: FileMode, status: FileStatus) -> Self {
40 Self {
41 buffer: None,
42 event_source: StateEventSource::new(),
43 state: FileState::ACTIVE,
44 mode,
45 status,
46 write_mode: WriteMode::Stream,
47 buffer_event_handle: None,
48 reader_handle: None,
49 writer_handle: None,
50 has_open_file: false,
51 }
52 }
53
54 pub fn status(&self) -> FileStatus {
55 self.status
56 }
57
58 pub fn set_status(&mut self, status: FileStatus) {
59 self.status = status;
60 }
61
62 pub fn mode(&self) -> FileMode {
63 self.mode
64 }
65
66 pub fn has_open_file(&self) -> bool {
67 self.has_open_file
68 }
69
70 pub fn supports_sa_restart(&self) -> bool {
71 true
72 }
73
74 pub fn set_has_open_file(&mut self, val: bool) {
75 self.has_open_file = val;
76 }
77
78 pub fn max_size(&self) -> usize {
79 self.buffer.as_ref().unwrap().borrow().max_len()
80 }
81
82 pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
83 if self.state.contains(FileState::CLOSED) {
84 log::warn!("Attempting to close an already-closed pipe");
85 }
86
87 if let Some(h) = self.buffer_event_handle.take() {
89 h.stop_listening()
90 }
91
92 if let Some(writer_handle) = self.writer_handle.take() {
94 self.buffer
95 .as_ref()
96 .unwrap()
97 .borrow_mut()
98 .remove_writer(writer_handle, cb_queue);
99 }
100
101 if let Some(reader_handle) = self.reader_handle.take() {
103 self.buffer
104 .as_ref()
105 .unwrap()
106 .borrow_mut()
107 .remove_reader(reader_handle, cb_queue);
108 }
109
110 self.buffer = None;
112
113 self.update_state(
115 FileState::CLOSED | FileState::ACTIVE | FileState::READABLE | FileState::WRITABLE,
116 FileState::CLOSED,
117 FileSignals::empty(),
118 cb_queue,
119 );
120
121 Ok(())
122 }
123
124 pub fn readv(
125 &mut self,
126 iovs: &[IoVec],
127 offset: Option<libc::off_t>,
128 _flags: libc::c_int,
129 mem: &mut MemoryManager,
130 cb_queue: &mut CallbackQueue,
131 ) -> Result<libc::ssize_t, SyscallError> {
132 if offset.is_some() {
134 return Err(linux_api::errno::Errno::ESPIPE.into());
135 }
136
137 if !self.mode.contains(FileMode::READ) {
139 return Err(linux_api::errno::Errno::EBADF.into());
140 }
141
142 let num_bytes_to_read: libc::size_t = iovs.iter().map(|x| x.len).sum();
143
144 let mut writer = IoVecWriter::new(iovs, mem);
145
146 let (num_copied, _num_removed_from_buf) = self
147 .buffer
148 .as_ref()
149 .unwrap()
150 .borrow_mut()
151 .read(&mut writer, cb_queue)?;
152
153 if num_copied == 0
158 && num_bytes_to_read != 0
159 && self.buffer.as_ref().unwrap().borrow().num_writers() > 0
160 {
161 Err(Errno::EWOULDBLOCK.into())
162 } else {
163 Ok(num_copied.try_into().unwrap())
164 }
165 }
166
167 pub fn writev(
168 &mut self,
169 iovs: &[IoVec],
170 offset: Option<libc::off_t>,
171 _flags: libc::c_int,
172 mem: &mut MemoryManager,
173 cb_queue: &mut CallbackQueue,
174 ) -> Result<libc::ssize_t, SyscallError> {
175 if offset.is_some() {
177 return Err(linux_api::errno::Errno::ESPIPE.into());
178 }
179
180 if !self.mode.contains(FileMode::WRITE) {
182 return Err(linux_api::errno::Errno::EBADF.into());
183 }
184
185 let mut buffer = self.buffer.as_ref().unwrap().borrow_mut();
186
187 if buffer.num_readers() == 0 {
188 return Err(linux_api::errno::Errno::EPIPE.into());
189 }
190
191 if self.write_mode == WriteMode::Packet && !self.status.contains(FileStatus::DIRECT) {
192 self.write_mode = WriteMode::Stream;
194 } else if self.write_mode == WriteMode::Stream && self.status.contains(FileStatus::DIRECT) {
195 if !buffer.has_data() {
199 self.write_mode = WriteMode::Packet;
200 }
201 }
202
203 let len: libc::size_t = iovs.iter().map(|x| x.len).sum();
204
205 let mut reader = IoVecReader::new(iovs, mem);
206
207 let num_copied = match self.write_mode {
208 WriteMode::Stream => buffer.write_stream(&mut reader, len, cb_queue)?,
209 WriteMode::Packet => {
210 let mut num_written = 0;
211
212 loop {
213 let bytes_remaining = len - num_written;
215
216 if bytes_remaining == 0 {
218 break num_written;
219 }
220
221 let bytes_to_write = std::cmp::min(bytes_remaining, libc::PIPE_BUF);
223
224 if let Err(e) = buffer.write_packet(&mut reader, bytes_to_write, cb_queue) {
225 if num_written > 0 {
227 break num_written;
228 }
229 return Err(e.into());
230 }
231
232 num_written += bytes_to_write;
233 }
234 }
235 };
236
237 Ok(num_copied.try_into().unwrap())
238 }
239
240 pub fn ioctl(
241 &mut self,
242 request: IoctlRequest,
243 _arg_ptr: ForeignPtr<()>,
244 _memory_manager: &mut MemoryManager,
245 ) -> SyscallResult {
246 log::warn!("We do not yet handle ioctl request {request:?} on pipes");
247 Err(Errno::EINVAL.into())
248 }
249
250 pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
251 warn_once_then_debug!("Not all fields of 'struct stat' are implemented for pipes");
252
253 Ok(linux_api::stat::stat {
254 st_dev: 0,
257 st_ino: 0,
258 st_nlink: 1,
260 st_mode: (SFlag::S_IFIFO | SFlag::S_IRUSR | SFlag::S_IWUSR).bits(),
264 st_uid: 0,
267 st_gid: 0,
268 l__pad0: 0,
269 st_rdev: 0,
270 st_size: 0,
273 st_blksize: 0,
275 st_blocks: 0,
276 st_atime: 0,
277 st_atime_nsec: 0,
278 st_mtime: 0,
279 st_mtime_nsec: 0,
280 st_ctime: 0,
281 st_ctime_nsec: 0,
282 l__unused: [0; 3],
283 })
284 }
285
286 pub fn connect_to_buffer(
287 arc: &Arc<AtomicRefCell<Self>>,
288 buffer: Arc<AtomicRefCell<SharedBuf>>,
289 cb_queue: &mut CallbackQueue,
290 ) {
291 let weak = Arc::downgrade(arc);
292 let pipe = &mut *arc.borrow_mut();
293
294 pipe.buffer = Some(buffer);
295
296 if pipe.mode.contains(FileMode::WRITE) {
297 pipe.writer_handle = Some(
298 pipe.buffer
299 .as_ref()
300 .unwrap()
301 .borrow_mut()
302 .add_writer(cb_queue),
303 );
304 }
305
306 if pipe.mode.contains(FileMode::READ) {
307 pipe.reader_handle = Some(
308 pipe.buffer
309 .as_ref()
310 .unwrap()
311 .borrow_mut()
312 .add_reader(cb_queue),
313 );
314 }
315
316 let mut monitoring_state = BufferState::empty();
318
319 if pipe.mode.contains(FileMode::READ) {
322 monitoring_state.insert(BufferState::READABLE);
323 monitoring_state.insert(BufferState::NO_WRITERS);
324 }
325
326 if pipe.mode.contains(FileMode::WRITE) {
329 monitoring_state.insert(BufferState::WRITABLE);
330 monitoring_state.insert(BufferState::NO_READERS);
331 }
332
333 let monitoring_signals = BufferSignals::BUFFER_GREW;
345
346 let handle = pipe.buffer.as_ref().unwrap().borrow_mut().add_listener(
347 monitoring_state,
348 monitoring_signals,
349 move |buffer_state, buffer_signals, cb_queue| {
350 if let Some(pipe) = weak.upgrade() {
352 let mut pipe = pipe.borrow_mut();
353
354 pipe.align_state_to_buffer(buffer_state, buffer_signals, cb_queue);
356 }
357 },
358 );
359
360 pipe.buffer_event_handle = Some(handle);
361
362 let buffer_state = pipe.buffer.as_ref().unwrap().borrow().state();
364 pipe.align_state_to_buffer(buffer_state, BufferSignals::empty(), cb_queue);
365 }
366
367 pub fn add_listener(
368 &mut self,
369 monitoring_state: FileState,
370 monitoring_signals: FileSignals,
371 filter: StateListenerFilter,
372 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
373 + Send
374 + Sync
375 + 'static,
376 ) -> StateListenHandle {
377 self.event_source
378 .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
379 }
380
381 pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<c::StatusListener>) {
382 self.event_source.add_legacy_listener(ptr);
383 }
384
385 pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener) {
386 self.event_source.remove_legacy_listener(ptr);
387 }
388
389 pub fn state(&self) -> FileState {
390 self.state
391 }
392
393 fn align_state_to_buffer(
398 &mut self,
399 buffer_state: BufferState,
400 buffer_signals: BufferSignals,
401 cb_queue: &mut CallbackQueue,
402 ) {
403 let mut mask = FileState::empty();
404 let mut file_state = FileState::empty();
405 let mut file_signals = FileSignals::empty();
406
407 if self.state.contains(FileState::CLOSED) {
409 return;
410 }
411
412 if self.mode.contains(FileMode::READ) {
414 mask.insert(FileState::READABLE);
415 if buffer_state.intersects(BufferState::READABLE | BufferState::NO_WRITERS) {
417 file_state.insert(FileState::READABLE);
418 }
419 if buffer_signals.intersects(BufferSignals::BUFFER_GREW) {
420 file_signals.insert(FileSignals::READ_BUFFER_GREW);
421 }
422 }
423
424 if self.mode.contains(FileMode::WRITE) {
426 mask.insert(FileState::WRITABLE);
427 if buffer_state.intersects(BufferState::WRITABLE | BufferState::NO_READERS) {
429 file_state.insert(FileState::WRITABLE);
430 }
431 }
432
433 self.update_state(mask, file_state, file_signals, cb_queue);
435 }
436
437 fn update_state(
438 &mut self,
439 mask: FileState,
440 state: FileState,
441 signals: FileSignals,
442 cb_queue: &mut CallbackQueue,
443 ) {
444 let old_state = self.state;
445
446 self.state.remove(mask);
448 self.state.insert(state & mask);
449
450 self.handle_state_change(old_state, signals, cb_queue);
451 }
452
453 fn handle_state_change(
454 &mut self,
455 old_state: FileState,
456 signals: FileSignals,
457 cb_queue: &mut CallbackQueue,
458 ) {
459 let states_changed = self.state ^ old_state;
460
461 if states_changed.is_empty() && signals.is_empty() {
463 return;
464 }
465
466 self.event_source
467 .notify_listeners(self.state, states_changed, signals, cb_queue);
468 }
469}
470
/// How `Pipe::writev()` hands data to the shared buffer.
#[derive(Debug, PartialEq, Eq)]
enum WriteMode {
    /// The request is written as one byte stream via the buffer's `write_stream()`.
    Stream,
    /// The request is split into packets of at most `PIPE_BUF` bytes, each written via the
    /// buffer's `write_packet()`. Selected while `FileStatus::DIRECT` is set.
    Packet,
}