shadow_rs/host/descriptor/
pipe.rs

1use std::sync::Arc;
2
3use atomic_refcell::AtomicRefCell;
4use linux_api::errno::Errno;
5use linux_api::ioctls::IoctlRequest;
6use linux_api::stat::SFlag;
7use shadow_shim_helper_rs::syscall_types::ForeignPtr;
8
9use crate::cshadow as c;
10use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
11use crate::host::descriptor::shared_buf::{
12    BufferHandle, BufferSignals, BufferState, ReaderHandle, SharedBuf, WriterHandle,
13};
14use crate::host::descriptor::{FileMode, FileSignals, FileState, FileStatus};
15use crate::host::memory_manager::MemoryManager;
16use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
17use crate::host::syscall::types::{SyscallError, SyscallResult};
18use crate::utility::HostTreePointer;
19use crate::utility::callback_queue::CallbackQueue;
20
21pub struct Pipe {
22    buffer: Option<Arc<AtomicRefCell<SharedBuf>>>,
23    event_source: StateEventSource,
24    state: FileState,
25    mode: FileMode,
26    status: FileStatus,
27    write_mode: WriteMode,
28    buffer_event_handle: Option<BufferHandle>,
29    reader_handle: Option<ReaderHandle>,
30    writer_handle: Option<WriterHandle>,
31    // should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
32    // this file
33    has_open_file: bool,
34}
35
36impl Pipe {
37    /// Create a new [`Pipe`]. The new pipe must be initialized using [`Pipe::connect_to_buffer`]
38    /// before any of its methods are called.
39    pub fn new(mode: FileMode, status: FileStatus) -> Self {
40        Self {
41            buffer: None,
42            event_source: StateEventSource::new(),
43            state: FileState::ACTIVE,
44            mode,
45            status,
46            write_mode: WriteMode::Stream,
47            buffer_event_handle: None,
48            reader_handle: None,
49            writer_handle: None,
50            has_open_file: false,
51        }
52    }
53
54    pub fn status(&self) -> FileStatus {
55        self.status
56    }
57
58    pub fn set_status(&mut self, status: FileStatus) {
59        self.status = status;
60    }
61
62    pub fn mode(&self) -> FileMode {
63        self.mode
64    }
65
66    pub fn has_open_file(&self) -> bool {
67        self.has_open_file
68    }
69
70    pub fn supports_sa_restart(&self) -> bool {
71        true
72    }
73
74    pub fn set_has_open_file(&mut self, val: bool) {
75        self.has_open_file = val;
76    }
77
78    pub fn max_size(&self) -> usize {
79        self.buffer.as_ref().unwrap().borrow().max_len()
80    }
81
82    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
83        if self.state.contains(FileState::CLOSED) {
84            log::warn!("Attempting to close an already-closed pipe");
85        }
86
87        // drop the event listener handle so that we stop receiving new events
88        if let Some(h) = self.buffer_event_handle.take() {
89            h.stop_listening()
90        }
91
92        // if acting as a writer, inform the buffer that there is one fewer writers
93        if let Some(writer_handle) = self.writer_handle.take() {
94            self.buffer
95                .as_ref()
96                .unwrap()
97                .borrow_mut()
98                .remove_writer(writer_handle, cb_queue);
99        }
100
101        // if acting as a reader, inform the buffer that there is one fewer readers
102        if let Some(reader_handle) = self.reader_handle.take() {
103            self.buffer
104                .as_ref()
105                .unwrap()
106                .borrow_mut()
107                .remove_reader(reader_handle, cb_queue);
108        }
109
110        // no need to hold on to the buffer anymore
111        self.buffer = None;
112
113        // set the closed flag and remove the active, readable, and writable flags
114        self.update_state(
115            FileState::CLOSED | FileState::ACTIVE | FileState::READABLE | FileState::WRITABLE,
116            FileState::CLOSED,
117            FileSignals::empty(),
118            cb_queue,
119        );
120
121        Ok(())
122    }
123
124    pub fn readv(
125        &mut self,
126        iovs: &[IoVec],
127        offset: Option<libc::off_t>,
128        _flags: libc::c_int,
129        mem: &mut MemoryManager,
130        cb_queue: &mut CallbackQueue,
131    ) -> Result<libc::ssize_t, SyscallError> {
132        // pipes don't support seeking
133        if offset.is_some() {
134            return Err(linux_api::errno::Errno::ESPIPE.into());
135        }
136
137        // if the file is not open for reading, return EBADF
138        if !self.mode.contains(FileMode::READ) {
139            return Err(linux_api::errno::Errno::EBADF.into());
140        }
141
142        let num_bytes_to_read: libc::size_t = iovs.iter().map(|x| x.len).sum();
143
144        let mut writer = IoVecWriter::new(iovs, mem);
145
146        let (num_copied, _num_removed_from_buf) = self
147            .buffer
148            .as_ref()
149            .unwrap()
150            .borrow_mut()
151            .read(&mut writer, cb_queue)?;
152
153        // the read would block if all:
154        //  1. we could not read any bytes
155        //  2. we were asked to read >0 bytes
156        //  3. there are open descriptors that refer to the write end of the pipe
157        if num_copied == 0
158            && num_bytes_to_read != 0
159            && self.buffer.as_ref().unwrap().borrow().num_writers() > 0
160        {
161            Err(Errno::EWOULDBLOCK.into())
162        } else {
163            Ok(num_copied.try_into().unwrap())
164        }
165    }
166
167    pub fn writev(
168        &mut self,
169        iovs: &[IoVec],
170        offset: Option<libc::off_t>,
171        _flags: libc::c_int,
172        mem: &mut MemoryManager,
173        cb_queue: &mut CallbackQueue,
174    ) -> Result<libc::ssize_t, SyscallError> {
175        // pipes don't support seeking
176        if offset.is_some() {
177            return Err(linux_api::errno::Errno::ESPIPE.into());
178        }
179
180        // if the file is not open for writing, return EBADF
181        if !self.mode.contains(FileMode::WRITE) {
182            return Err(linux_api::errno::Errno::EBADF.into());
183        }
184
185        let mut buffer = self.buffer.as_ref().unwrap().borrow_mut();
186
187        if buffer.num_readers() == 0 {
188            return Err(linux_api::errno::Errno::EPIPE.into());
189        }
190
191        if self.write_mode == WriteMode::Packet && !self.status.contains(FileStatus::DIRECT) {
192            // switch to stream mode immediately, regardless of whether the buffer is empty or not
193            self.write_mode = WriteMode::Stream;
194        } else if self.write_mode == WriteMode::Stream && self.status.contains(FileStatus::DIRECT) {
195            // in linux, it seems that pipes only switch to packet mode when a new page is added to
196            // the buffer, so we simulate that behaviour for when the first page is added (when the
197            // buffer is empty)
198            if !buffer.has_data() {
199                self.write_mode = WriteMode::Packet;
200            }
201        }
202
203        let len: libc::size_t = iovs.iter().map(|x| x.len).sum();
204
205        let mut reader = IoVecReader::new(iovs, mem);
206
207        let num_copied = match self.write_mode {
208            WriteMode::Stream => buffer.write_stream(&mut reader, len, cb_queue)?,
209            WriteMode::Packet => {
210                let mut num_written = 0;
211
212                loop {
213                    // the number of remaining bytes to write
214                    let bytes_remaining = len - num_written;
215
216                    // if there are no more bytes to write (pipes don't support 0-length packets)
217                    if bytes_remaining == 0 {
218                        break num_written;
219                    }
220
221                    // split the packet up into PIPE_BUF-sized packets
222                    let bytes_to_write = std::cmp::min(bytes_remaining, libc::PIPE_BUF);
223
224                    if let Err(e) = buffer.write_packet(&mut reader, bytes_to_write, cb_queue) {
225                        // if we've already written bytes, return those instead of an error
226                        if num_written > 0 {
227                            break num_written;
228                        }
229                        return Err(e.into());
230                    }
231
232                    num_written += bytes_to_write;
233                }
234            }
235        };
236
237        Ok(num_copied.try_into().unwrap())
238    }
239
240    pub fn ioctl(
241        &mut self,
242        request: IoctlRequest,
243        _arg_ptr: ForeignPtr<()>,
244        _memory_manager: &mut MemoryManager,
245    ) -> SyscallResult {
246        log::warn!("We do not yet handle ioctl request {request:?} on pipes");
247        Err(Errno::EINVAL.into())
248    }
249
250    pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
251        warn_once_then_debug!("Not all fields of 'struct stat' are implemented for pipes");
252
253        Ok(linux_api::stat::stat {
254            // the device and inode are non-zero on linux, but shadow can't really give meaningful
255            // values here
256            st_dev: 0,
257            st_ino: 0,
258            // this may need to be >1 if shadow ever supports named pipes
259            st_nlink: 1,
260            // linux seems to use a mode of readable+writable for both ends of a pipe, but as a
261            // reminder this st_mode field is the mode of the pipe in the pipefs filesystem, not the
262            // mode of the pipe file (linux struct file) itself
263            st_mode: (SFlag::S_IFIFO | SFlag::S_IRUSR | SFlag::S_IWUSR).bits(),
264            // shadow pretends to run as root, although this gets messy since file-related syscalls
265            // that are passed through to linux have the uid/gid of the user running the simulation
266            st_uid: 0,
267            st_gid: 0,
268            l__pad0: 0,
269            st_rdev: 0,
270            // apparently the behaviour of this field depends on what unix you're running, but on
271            // linux it seems to always be 0
272            st_size: 0,
273            // TODO
274            st_blksize: 0,
275            st_blocks: 0,
276            st_atime: 0,
277            st_atime_nsec: 0,
278            st_mtime: 0,
279            st_mtime_nsec: 0,
280            st_ctime: 0,
281            st_ctime_nsec: 0,
282            l__unused: [0; 3],
283        })
284    }
285
286    pub fn connect_to_buffer(
287        arc: &Arc<AtomicRefCell<Self>>,
288        buffer: Arc<AtomicRefCell<SharedBuf>>,
289        cb_queue: &mut CallbackQueue,
290    ) {
291        let weak = Arc::downgrade(arc);
292        let pipe = &mut *arc.borrow_mut();
293
294        pipe.buffer = Some(buffer);
295
296        if pipe.mode.contains(FileMode::WRITE) {
297            pipe.writer_handle = Some(
298                pipe.buffer
299                    .as_ref()
300                    .unwrap()
301                    .borrow_mut()
302                    .add_writer(cb_queue),
303            );
304        }
305
306        if pipe.mode.contains(FileMode::READ) {
307            pipe.reader_handle = Some(
308                pipe.buffer
309                    .as_ref()
310                    .unwrap()
311                    .borrow_mut()
312                    .add_reader(cb_queue),
313            );
314        }
315
316        // buffer state changes that we want to receive events for
317        let mut monitoring_state = BufferState::empty();
318
319        // if the file is open for reading, watch for the buffer to become readable or have no
320        // writers
321        if pipe.mode.contains(FileMode::READ) {
322            monitoring_state.insert(BufferState::READABLE);
323            monitoring_state.insert(BufferState::NO_WRITERS);
324        }
325
326        // if the file is open for writing, watch for the buffer to become writable or have no
327        // readers
328        if pipe.mode.contains(FileMode::WRITE) {
329            monitoring_state.insert(BufferState::WRITABLE);
330            monitoring_state.insert(BufferState::NO_READERS);
331        }
332
333        // TODO: We have to monitor all of the buffer's signals that we might want to pass to the
334        // pipe file's listeners, but this adds extra overhead when the file doesn't have any
335        // listeners that are listening for some of these signals. For example if there's a noisy
336        // signal `BufferSignals::FOO` that we want to forward to file listeners as
337        // `FileSignals::FOO`, we must always subscribe to `BufferSignals::FOO` signals even if the
338        // pipe doesn't have any file listeners subscribed to `FileSignals::FOO`. We don't know if
339        // there are currently file listeners subscribed to this file signal, so we must always emit
340        // them. This means we might receive a lot of notifications for `BufferSignals::Foo` that we
341        // ultimitely throw away since no file listener wants them. It would be great if there was a
342        // nice way to optimize this somehow so that we only listen for `BufferSignals::FOO` if we
343        // have a listener for `FileSignals::FOO`.
344        let monitoring_signals = BufferSignals::BUFFER_GREW;
345
346        let handle = pipe.buffer.as_ref().unwrap().borrow_mut().add_listener(
347            monitoring_state,
348            monitoring_signals,
349            move |buffer_state, buffer_signals, cb_queue| {
350                // if the file hasn't been dropped
351                if let Some(pipe) = weak.upgrade() {
352                    let mut pipe = pipe.borrow_mut();
353
354                    // update the pipe file's state to align with the buffer's current state
355                    pipe.align_state_to_buffer(buffer_state, buffer_signals, cb_queue);
356                }
357            },
358        );
359
360        pipe.buffer_event_handle = Some(handle);
361
362        // update the pipe file's initial state to align with the buffer's current state
363        let buffer_state = pipe.buffer.as_ref().unwrap().borrow().state();
364        pipe.align_state_to_buffer(buffer_state, BufferSignals::empty(), cb_queue);
365    }
366
367    pub fn add_listener(
368        &mut self,
369        monitoring_state: FileState,
370        monitoring_signals: FileSignals,
371        filter: StateListenerFilter,
372        notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
373        + Send
374        + Sync
375        + 'static,
376    ) -> StateListenHandle {
377        self.event_source
378            .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
379    }
380
381    pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<c::StatusListener>) {
382        self.event_source.add_legacy_listener(ptr);
383    }
384
385    pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener) {
386        self.event_source.remove_legacy_listener(ptr);
387    }
388
389    pub fn state(&self) -> FileState {
390        self.state
391    }
392
393    /// Align the pipe's state to the buffer state. For example if the buffer is both `READABLE` and
394    /// `WRITABLE`, and the pipe is only open in `READ` mode, the pipe's `READABLE` state will be
395    /// set and the `WRITABLE` state will be unchanged. This method may also pass through signals
396    /// from the buffer to any of the file's listeners.
397    fn align_state_to_buffer(
398        &mut self,
399        buffer_state: BufferState,
400        buffer_signals: BufferSignals,
401        cb_queue: &mut CallbackQueue,
402    ) {
403        let mut mask = FileState::empty();
404        let mut file_state = FileState::empty();
405        let mut file_signals = FileSignals::empty();
406
407        // if the pipe is already closed, do nothing
408        if self.state.contains(FileState::CLOSED) {
409            return;
410        }
411
412        // only update the readable state if the file is open for reading
413        if self.mode.contains(FileMode::READ) {
414            mask.insert(FileState::READABLE);
415            // file is readable if the buffer is readable or there are no writers
416            if buffer_state.intersects(BufferState::READABLE | BufferState::NO_WRITERS) {
417                file_state.insert(FileState::READABLE);
418            }
419            if buffer_signals.intersects(BufferSignals::BUFFER_GREW) {
420                file_signals.insert(FileSignals::READ_BUFFER_GREW);
421            }
422        }
423
424        // only update the writable state if the file is open for writing
425        if self.mode.contains(FileMode::WRITE) {
426            mask.insert(FileState::WRITABLE);
427            // file is writable if the buffer is writable or there are no readers
428            if buffer_state.intersects(BufferState::WRITABLE | BufferState::NO_READERS) {
429                file_state.insert(FileState::WRITABLE);
430            }
431        }
432
433        // update the file's state
434        self.update_state(mask, file_state, file_signals, cb_queue);
435    }
436
437    fn update_state(
438        &mut self,
439        mask: FileState,
440        state: FileState,
441        signals: FileSignals,
442        cb_queue: &mut CallbackQueue,
443    ) {
444        let old_state = self.state;
445
446        // remove the masked flags, then copy the masked flags
447        self.state.remove(mask);
448        self.state.insert(state & mask);
449
450        self.handle_state_change(old_state, signals, cb_queue);
451    }
452
453    fn handle_state_change(
454        &mut self,
455        old_state: FileState,
456        signals: FileSignals,
457        cb_queue: &mut CallbackQueue,
458    ) {
459        let states_changed = self.state ^ old_state;
460
461        // if nothing changed
462        if states_changed.is_empty() && signals.is_empty() {
463            return;
464        }
465
466        self.event_source
467            .notify_listeners(self.state, states_changed, signals, cb_queue);
468    }
469}
470
/// How the data passed to `Pipe::writev` is added to the pipe's buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteMode {
    /// Bytes are written to the buffer as a continuous stream.
    Stream,
    /// Writes are split into packets of at most `PIPE_BUF` bytes. This mode is used while the
    /// file has the `FileStatus::DIRECT` status flag (linux's `O_DIRECT` "packet mode" for pipes).
    Packet,
}