1use std::sync::Arc;
23use atomic_refcell::AtomicRefCell;
4use linux_api::errno::Errno;
5use linux_api::ioctls::IoctlRequest;
6use linux_api::stat::SFlag;
7use shadow_shim_helper_rs::syscall_types::ForeignPtr;
89use crate::cshadow as c;
10use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
11use crate::host::descriptor::shared_buf::{
12 BufferHandle, BufferSignals, BufferState, ReaderHandle, SharedBuf, WriterHandle,
13};
14use crate::host::descriptor::{FileMode, FileSignals, FileState, FileStatus};
15use crate::host::memory_manager::MemoryManager;
16use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
17use crate::host::syscall::types::{SyscallError, SyscallResult};
18use crate::utility::HostTreePointer;
19use crate::utility::callback_queue::CallbackQueue;
2021pub struct Pipe {
22 buffer: Option<Arc<AtomicRefCell<SharedBuf>>>,
23 event_source: StateEventSource,
24 state: FileState,
25 mode: FileMode,
26 status: FileStatus,
27 write_mode: WriteMode,
28 buffer_event_handle: Option<BufferHandle>,
29 reader_handle: Option<ReaderHandle>,
30 writer_handle: Option<WriterHandle>,
31// should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
32 // this file
33has_open_file: bool,
34}
3536impl Pipe {
37/// Create a new [`Pipe`]. The new pipe must be initialized using [`Pipe::connect_to_buffer`]
38 /// before any of its methods are called.
39pub fn new(mode: FileMode, status: FileStatus) -> Self {
40Self {
41 buffer: None,
42 event_source: StateEventSource::new(),
43 state: FileState::ACTIVE,
44 mode,
45 status,
46 write_mode: WriteMode::Stream,
47 buffer_event_handle: None,
48 reader_handle: None,
49 writer_handle: None,
50 has_open_file: false,
51 }
52 }
5354pub fn status(&self) -> FileStatus {
55self.status
56 }
5758pub fn set_status(&mut self, status: FileStatus) {
59self.status = status;
60 }
6162pub fn mode(&self) -> FileMode {
63self.mode
64 }
6566pub fn has_open_file(&self) -> bool {
67self.has_open_file
68 }
6970pub fn supports_sa_restart(&self) -> bool {
71true
72}
7374pub fn set_has_open_file(&mut self, val: bool) {
75self.has_open_file = val;
76 }
7778pub fn max_size(&self) -> usize {
79self.buffer.as_ref().unwrap().borrow().max_len()
80 }
8182pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
83if self.state.contains(FileState::CLOSED) {
84log::warn!("Attempting to close an already-closed pipe");
85 }
8687// drop the event listener handle so that we stop receiving new events
88if let Some(h) = self.buffer_event_handle.take() {
89 h.stop_listening()
90 }
9192// if acting as a writer, inform the buffer that there is one fewer writers
93if let Some(writer_handle) = self.writer_handle.take() {
94self.buffer
95 .as_ref()
96 .unwrap()
97 .borrow_mut()
98 .remove_writer(writer_handle, cb_queue);
99 }
100101// if acting as a reader, inform the buffer that there is one fewer readers
102if let Some(reader_handle) = self.reader_handle.take() {
103self.buffer
104 .as_ref()
105 .unwrap()
106 .borrow_mut()
107 .remove_reader(reader_handle, cb_queue);
108 }
109110// no need to hold on to the buffer anymore
111self.buffer = None;
112113// set the closed flag and remove the active, readable, and writable flags
114self.update_state(
115 FileState::CLOSED | FileState::ACTIVE | FileState::READABLE | FileState::WRITABLE,
116 FileState::CLOSED,
117 FileSignals::empty(),
118 cb_queue,
119 );
120121Ok(())
122 }
123124pub fn readv(
125&mut self,
126 iovs: &[IoVec],
127 offset: Option<libc::off_t>,
128 _flags: libc::c_int,
129 mem: &mut MemoryManager,
130 cb_queue: &mut CallbackQueue,
131 ) -> Result<libc::ssize_t, SyscallError> {
132// pipes don't support seeking
133if offset.is_some() {
134return Err(linux_api::errno::Errno::ESPIPE.into());
135 }
136137// if the file is not open for reading, return EBADF
138if !self.mode.contains(FileMode::READ) {
139return Err(linux_api::errno::Errno::EBADF.into());
140 }
141142let num_bytes_to_read: libc::size_t = iovs.iter().map(|x| x.len).sum();
143144let mut writer = IoVecWriter::new(iovs, mem);
145146let (num_copied, _num_removed_from_buf) = self
147.buffer
148 .as_ref()
149 .unwrap()
150 .borrow_mut()
151 .read(&mut writer, cb_queue)?;
152153// the read would block if all:
154 // 1. we could not read any bytes
155 // 2. we were asked to read >0 bytes
156 // 3. there are open descriptors that refer to the write end of the pipe
157if num_copied == 0
158&& num_bytes_to_read != 0
159&& self.buffer.as_ref().unwrap().borrow().num_writers() > 0
160{
161Err(Errno::EWOULDBLOCK.into())
162 } else {
163Ok(num_copied.try_into().unwrap())
164 }
165 }
166167pub fn writev(
168&mut self,
169 iovs: &[IoVec],
170 offset: Option<libc::off_t>,
171 _flags: libc::c_int,
172 mem: &mut MemoryManager,
173 cb_queue: &mut CallbackQueue,
174 ) -> Result<libc::ssize_t, SyscallError> {
175// pipes don't support seeking
176if offset.is_some() {
177return Err(linux_api::errno::Errno::ESPIPE.into());
178 }
179180// if the file is not open for writing, return EBADF
181if !self.mode.contains(FileMode::WRITE) {
182return Err(linux_api::errno::Errno::EBADF.into());
183 }
184185let mut buffer = self.buffer.as_ref().unwrap().borrow_mut();
186187if buffer.num_readers() == 0 {
188return Err(linux_api::errno::Errno::EPIPE.into());
189 }
190191if self.write_mode == WriteMode::Packet && !self.status.contains(FileStatus::DIRECT) {
192// switch to stream mode immediately, regardless of whether the buffer is empty or not
193self.write_mode = WriteMode::Stream;
194 } else if self.write_mode == WriteMode::Stream && self.status.contains(FileStatus::DIRECT) {
195// in linux, it seems that pipes only switch to packet mode when a new page is added to
196 // the buffer, so we simulate that behaviour for when the first page is added (when the
197 // buffer is empty)
198if !buffer.has_data() {
199self.write_mode = WriteMode::Packet;
200 }
201 }
202203let len: libc::size_t = iovs.iter().map(|x| x.len).sum();
204205let mut reader = IoVecReader::new(iovs, mem);
206207let num_copied = match self.write_mode {
208 WriteMode::Stream => buffer.write_stream(&mut reader, len, cb_queue)?,
209 WriteMode::Packet => {
210let mut num_written = 0;
211212loop {
213// the number of remaining bytes to write
214let bytes_remaining = len - num_written;
215216// if there are no more bytes to write (pipes don't support 0-length packets)
217if bytes_remaining == 0 {
218break num_written;
219 }
220221// split the packet up into PIPE_BUF-sized packets
222let bytes_to_write = std::cmp::min(bytes_remaining, libc::PIPE_BUF);
223224if let Err(e) = buffer.write_packet(&mut reader, bytes_to_write, cb_queue) {
225// if we've already written bytes, return those instead of an error
226if num_written > 0 {
227break num_written;
228 }
229return Err(e.into());
230 }
231232 num_written += bytes_to_write;
233 }
234 }
235 };
236237Ok(num_copied.try_into().unwrap())
238 }
239240pub fn ioctl(
241&mut self,
242 request: IoctlRequest,
243 _arg_ptr: ForeignPtr<()>,
244 _memory_manager: &mut MemoryManager,
245 ) -> SyscallResult {
246log::warn!("We do not yet handle ioctl request {request:?} on pipes");
247Err(Errno::EINVAL.into())
248 }
249250pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
251warn_once_then_debug!("Not all fields of 'struct stat' are implemented for pipes");
252253Ok(linux_api::stat::stat {
254// the device and inode are non-zero on linux, but shadow can't really give meaningful
255 // values here
256st_dev: 0,
257 st_ino: 0,
258// this may need to be >1 if shadow ever supports named pipes
259st_nlink: 1,
260// linux seems to use a mode of readable+writable for both ends of a pipe, but as a
261 // reminder this st_mode field is the mode of the pipe in the pipefs filesystem, not the
262 // mode of the pipe file (linux struct file) itself
263st_mode: (SFlag::S_IFIFO | SFlag::S_IRUSR | SFlag::S_IWUSR).bits(),
264// shadow pretends to run as root, although this gets messy since file-related syscalls
265 // that are passed through to linux have the uid/gid of the user running the simulation
266st_uid: 0,
267 st_gid: 0,
268 l__pad0: 0,
269 st_rdev: 0,
270// apparently the behaviour of this field depends on what unix you're running, but on
271 // linux it seems to always be 0
272st_size: 0,
273// TODO
274st_blksize: 0,
275 st_blocks: 0,
276 st_atime: 0,
277 st_atime_nsec: 0,
278 st_mtime: 0,
279 st_mtime_nsec: 0,
280 st_ctime: 0,
281 st_ctime_nsec: 0,
282 l__unused: [0; 3],
283 })
284 }
285286pub fn connect_to_buffer(
287 arc: &Arc<AtomicRefCell<Self>>,
288 buffer: Arc<AtomicRefCell<SharedBuf>>,
289 cb_queue: &mut CallbackQueue,
290 ) {
291let weak = Arc::downgrade(arc);
292let pipe = &mut *arc.borrow_mut();
293294 pipe.buffer = Some(buffer);
295296if pipe.mode.contains(FileMode::WRITE) {
297 pipe.writer_handle = Some(
298 pipe.buffer
299 .as_ref()
300 .unwrap()
301 .borrow_mut()
302 .add_writer(cb_queue),
303 );
304 }
305306if pipe.mode.contains(FileMode::READ) {
307 pipe.reader_handle = Some(
308 pipe.buffer
309 .as_ref()
310 .unwrap()
311 .borrow_mut()
312 .add_reader(cb_queue),
313 );
314 }
315316// buffer state changes that we want to receive events for
317let mut monitoring_state = BufferState::empty();
318319// if the file is open for reading, watch for the buffer to become readable or have no
320 // writers
321if pipe.mode.contains(FileMode::READ) {
322 monitoring_state.insert(BufferState::READABLE);
323 monitoring_state.insert(BufferState::NO_WRITERS);
324 }
325326// if the file is open for writing, watch for the buffer to become writable or have no
327 // readers
328if pipe.mode.contains(FileMode::WRITE) {
329 monitoring_state.insert(BufferState::WRITABLE);
330 monitoring_state.insert(BufferState::NO_READERS);
331 }
332333// TODO: We have to monitor all of the buffer's signals that we might want to pass to the
334 // pipe file's listeners, but this adds extra overhead when the file doesn't have any
335 // listeners that are listening for some of these signals. For example if there's a noisy
336 // signal `BufferSignals::FOO` that we want to forward to file listeners as
337 // `FileSignals::FOO`, we must always subscribe to `BufferSignals::FOO` signals even if the
338 // pipe doesn't have any file listeners subscribed to `FileSignals::FOO`. We don't know if
339 // there are currently file listeners subscribed to this file signal, so we must always emit
340 // them. This means we might receive a lot of notifications for `BufferSignals::Foo` that we
341 // ultimitely throw away since no file listener wants them. It would be great if there was a
342 // nice way to optimize this somehow so that we only listen for `BufferSignals::FOO` if we
343 // have a listener for `FileSignals::FOO`.
344let monitoring_signals = BufferSignals::BUFFER_GREW;
345346let handle = pipe.buffer.as_ref().unwrap().borrow_mut().add_listener(
347 monitoring_state,
348 monitoring_signals,
349move |buffer_state, buffer_signals, cb_queue| {
350// if the file hasn't been dropped
351if let Some(pipe) = weak.upgrade() {
352let mut pipe = pipe.borrow_mut();
353354// update the pipe file's state to align with the buffer's current state
355pipe.align_state_to_buffer(buffer_state, buffer_signals, cb_queue);
356 }
357 },
358 );
359360 pipe.buffer_event_handle = Some(handle);
361362// update the pipe file's initial state to align with the buffer's current state
363let buffer_state = pipe.buffer.as_ref().unwrap().borrow().state();
364 pipe.align_state_to_buffer(buffer_state, BufferSignals::empty(), cb_queue);
365 }
366367pub fn add_listener(
368&mut self,
369 monitoring_state: FileState,
370 monitoring_signals: FileSignals,
371 filter: StateListenerFilter,
372 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
373 + Send
374 + Sync
375 + 'static,
376 ) -> StateListenHandle {
377self.event_source
378 .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
379 }
380381pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<c::StatusListener>) {
382self.event_source.add_legacy_listener(ptr);
383 }
384385pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener) {
386self.event_source.remove_legacy_listener(ptr);
387 }
388389pub fn state(&self) -> FileState {
390self.state
391 }
392393/// Align the pipe's state to the buffer state. For example if the buffer is both `READABLE` and
394 /// `WRITABLE`, and the pipe is only open in `READ` mode, the pipe's `READABLE` state will be
395 /// set and the `WRITABLE` state will be unchanged. This method may also pass through signals
396 /// from the buffer to any of the file's listeners.
397fn align_state_to_buffer(
398&mut self,
399 buffer_state: BufferState,
400 buffer_signals: BufferSignals,
401 cb_queue: &mut CallbackQueue,
402 ) {
403let mut mask = FileState::empty();
404let mut file_state = FileState::empty();
405let mut file_signals = FileSignals::empty();
406407// if the pipe is already closed, do nothing
408if self.state.contains(FileState::CLOSED) {
409return;
410 }
411412// only update the readable state if the file is open for reading
413if self.mode.contains(FileMode::READ) {
414 mask.insert(FileState::READABLE);
415// file is readable if the buffer is readable or there are no writers
416if buffer_state.intersects(BufferState::READABLE | BufferState::NO_WRITERS) {
417 file_state.insert(FileState::READABLE);
418 }
419if buffer_signals.intersects(BufferSignals::BUFFER_GREW) {
420 file_signals.insert(FileSignals::READ_BUFFER_GREW);
421 }
422 }
423424// only update the writable state if the file is open for writing
425if self.mode.contains(FileMode::WRITE) {
426 mask.insert(FileState::WRITABLE);
427// file is writable if the buffer is writable or there are no readers
428if buffer_state.intersects(BufferState::WRITABLE | BufferState::NO_READERS) {
429 file_state.insert(FileState::WRITABLE);
430 }
431 }
432433// update the file's state
434self.update_state(mask, file_state, file_signals, cb_queue);
435 }
436437fn update_state(
438&mut self,
439 mask: FileState,
440 state: FileState,
441 signals: FileSignals,
442 cb_queue: &mut CallbackQueue,
443 ) {
444let old_state = self.state;
445446// remove the masked flags, then copy the masked flags
447self.state.remove(mask);
448self.state.insert(state & mask);
449450self.handle_state_change(old_state, signals, cb_queue);
451 }
452453fn handle_state_change(
454&mut self,
455 old_state: FileState,
456 signals: FileSignals,
457 cb_queue: &mut CallbackQueue,
458 ) {
459let states_changed = self.state ^ old_state;
460461// if nothing changed
462if states_changed.is_empty() && signals.is_empty() {
463return;
464 }
465466self.event_source
467 .notify_listeners(self.state, states_changed, signals, cb_queue);
468 }
469}
/// How written data is added to the pipe's buffer.
#[derive(Debug, PartialEq, Eq)]
enum WriteMode {
    /// Data is written to the buffer as a stream of bytes.
    Stream,
    /// Data is written to the buffer as packets.
    Packet,
}