// shadow_rs/host/descriptor/epoll/mod.rs

1use std::collections::hash_map::Entry as HashMapEntry;
2use std::collections::{BinaryHeap, HashMap};
3use std::sync::{Arc, Weak};
4
5use atomic_refcell::AtomicRefCell;
6use linux_api::epoll::{EpollCtlOp, EpollEvents};
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use shadow_shim_helper_rs::syscall_types::ForeignPtr;
10
11use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
12use crate::host::descriptor::{File, FileMode, FileSignals, FileState, FileStatus, SyscallResult};
13use crate::host::memory_manager::MemoryManager;
14use crate::host::syscall::io::IoVec;
15use crate::host::syscall::types::SyscallError;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::{HostTreePointer, ObjectCounter};
18
19use self::entry::Entry;
20use self::key::{Key, PriorityKey};
21
22use super::socket::Socket;
23use super::socket::inet::InetSocket;
24
25// Private submodules to help us track the status of items we are monitoring.
26mod entry;
27mod key;
28
/// An epoll file: monitors a set of other files and reports which of them have
/// events ready, in the style of Linux epoll(7).
pub struct Epoll {
    // Event source used to notify listeners when this epoll file's own state changes.
    event_source: StateEventSource,
    // File status flags set on this epoll file.
    status: FileStatus,
    // Current file state bits (e.g. ACTIVE, READABLE, CLOSED) of this epoll file.
    state: FileState,
    // Should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
    // this file.
    has_open_file: bool,
    // A counter for sorting entries, to guarantee fairness and determinism when reporting events.
    // Because our ready set is a max heap, we initialize this counter to u64::MAX and count down as
    // we assign values so that entries whose events were last reported longest ago are prioritized.
    pri_counter: u64,
    // Stores entries for all descriptors we are currently monitoring for events.
    monitoring: HashMap<Key, Entry>,
    // Stores keys for entries with events that are ready to be reported.
    ready: BinaryHeap<PriorityKey>,
    // Instance counter tagged "Epoll" — presumably for object/leak accounting; see `ObjectCounter`.
    _counter: ObjectCounter,
}
46
47impl Epoll {
48    pub fn new() -> Arc<AtomicRefCell<Self>> {
49        let mut epoll = Self {
50            event_source: StateEventSource::new(),
51            status: FileStatus::empty(),
52            state: FileState::ACTIVE,
53            has_open_file: false,
54            pri_counter: u64::MAX,
55            monitoring: HashMap::new(),
56            ready: BinaryHeap::new(),
57            _counter: ObjectCounter::new("Epoll"),
58        };
59
60        CallbackQueue::queue_and_run_with_legacy(|cb_queue| epoll.refresh_state(cb_queue));
61
62        Arc::new(AtomicRefCell::new(epoll))
63    }
64
    /// Returns the file status flags currently set on this epoll file.
    pub fn status(&self) -> FileStatus {
        self.status
    }

    /// Replaces the file status flags.
    pub fn set_status(&mut self, status: FileStatus) {
        self.status = status;
    }

    /// Epoll files always report both read and write mode.
    pub fn mode(&self) -> FileMode {
        FileMode::READ | FileMode::WRITE
    }

    /// True when an `OpenFile` wrapper currently exists for this file.
    pub fn has_open_file(&self) -> bool {
        self.has_open_file
    }

    /// Whether syscalls on this file should be transparently restarted under
    /// `SA_RESTART`; always false for epoll.
    pub fn supports_sa_restart(&self) -> bool {
        // Epoll always returns EINTR if interrupted by a signal handler regardless of the use of
        // the SA_RESTART flag. See signal(7).
        false
    }

    /// Records whether an `OpenFile` wrapper exists for this file; used to
    /// ensure at most one such wrapper is ever created.
    pub fn set_has_open_file(&mut self, val: bool) {
        self.has_open_file = val;
    }
90
91    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
92        self.update_state(
93            /* mask= */ FileState::all(),
94            FileState::CLOSED,
95            FileSignals::empty(),
96            cb_queue,
97        );
98        Ok(())
99    }
100
    /// Vectored read entry point; epoll file descriptors are not readable, so
    /// this always fails with `EINVAL`.
    pub fn readv(
        &mut self,
        _iovs: &[IoVec],
        _offset: Option<libc::off_t>,
        _flags: libc::c_int,
        _mem: &mut MemoryManager,
        _cb_queue: &mut CallbackQueue,
    ) -> Result<libc::ssize_t, SyscallError> {
        // EpollFDs don't support reading.
        Err(Errno::EINVAL.into())
    }

    /// Vectored write entry point; epoll file descriptors are not writable, so
    /// this always fails with `EINVAL`.
    pub fn writev(
        &mut self,
        _iovs: &[IoVec],
        _offset: Option<libc::off_t>,
        _flags: libc::c_int,
        _mem: &mut MemoryManager,
        _cb_queue: &mut CallbackQueue,
    ) -> Result<libc::ssize_t, SyscallError> {
        // EpollFDs don't support writing.
        Err(Errno::EINVAL.into())
    }
124
    /// Handles ioctl requests on the epoll fd; none are supported, so this
    /// always fails with `ENOTTY` (matching Linux, per the comment below).
    pub fn ioctl(
        &mut self,
        _request: IoctlRequest,
        _arg_ptr: ForeignPtr<()>,
        _mem: &mut MemoryManager,
    ) -> SyscallResult {
        // After checking the epoll man pages and the Linux source for eventpoll.c, we don't think
        // epoll descriptors support any ioctl operations.
        warn_once_then_trace!("Epoll does not support any ioctl requests.");
        // From ioctl(2): ENOTTY The specified request does not apply to the kind of object that the
        // file descriptor fd references. Verified that epoll descriptors return this on Linux.
        Err(Errno::ENOTTY.into())
    }

    /// Handles stat on the epoll fd; not implemented yet, fails with `EINVAL`.
    pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
        warn_once_then_debug!("We do not yet handle stat calls on epoll fds");
        Err(Errno::EINVAL.into())
    }
143
    /// Executes an epoll control operation (`EPOLL_CTL_ADD`/`MOD`/`DEL`) on the
    /// target file, then refreshes the entry's readiness, its file listener,
    /// and this epoll fd's own readability.
    ///
    /// NOTE(review): we believe this panics if `target_file` is this same epoll
    /// object, due to a recursive mutable borrow (not an explicit check+panic)
    /// — confirm against callers before relying on this.
    pub fn ctl(
        &mut self,
        op: EpollCtlOp,
        target_fd: i32,
        target_file: File,
        events: EpollEvents,
        data: u64,
        weak_self: Weak<AtomicRefCell<Epoll>>,
        cb_queue: &mut CallbackQueue,
    ) -> Result<(), Errno> {
        // Snapshot the target's state before the file is moved into the key.
        let state = target_file.borrow().state();
        let key = Key::new(target_fd, target_file);

        log::trace!("Epoll editing fd {target_fd} while in state {state:?}");

        match op {
            EpollCtlOp::EPOLL_CTL_ADD => {
                // Check if we're trying to add a file that's already been closed. Typically a file
                // that is referenced in the descriptor table should never be a closed file, but
                // Shadow's C TCP sockets do close themselves even if there are still file handles
                // (see `_tcp_endOfFileSignalled`), so we need to check this.
                //
                // TODO change this to an assertion when legacy tcp is removed.
                if state.contains(FileState::CLOSED) {
                    log::warn!("Attempted to add a closed file {target_fd} to epoll");
                    return Err(Errno::EBADF);
                }

                let mut entry = Entry::new(events, data, state);

                // Legacy TCP sockets get special handling inside the entry.
                // TODO remove when legacy tcp is removed.
                if matches!(
                    key.file(),
                    File::Socket(Socket::Inet(InetSocket::LegacyTcp(_)))
                ) {
                    entry.set_legacy();
                }

                // From epoll_ctl(2): Returns EEXIST when "op was EPOLL_CTL_ADD, and the supplied
                // file descriptor fd is already registered with this epoll instance."
                match self.monitoring.entry(key.clone()) {
                    HashMapEntry::Occupied(_) => return Err(Errno::EEXIST),
                    HashMapEntry::Vacant(x) => x.insert(entry),
                };
            }
            EpollCtlOp::EPOLL_CTL_MOD => {
                // From epoll_ctl(2): ENOENT when the fd is not registered here.
                let entry = self.monitoring.get_mut(&key).ok_or(Errno::ENOENT)?;
                entry.modify(events, data, state);
            }
            EpollCtlOp::EPOLL_CTL_DEL => {
                // Stop monitoring this entry. Dropping the entry will cause it to stop listening
                // for status changes on its inner `File` event source object.
                let entry = self.monitoring.remove(&key).ok_or(Errno::ENOENT)?;

                // If it has a priority, then we also remove it from the ready set.
                if let Some(pri) = entry.priority() {
                    self.ready.retain(|e| e.priority() != pri)
                }
            }
        };

        // Re-derive readiness for the (possibly new/changed/removed) entry,
        // (re)install its file listener, then refresh our own readability.
        self.refresh_ready(key.clone());
        self.refresh_listener(weak_self, key);
        self.refresh_state(cb_queue);

        Ok(())
    }
215
    /// Registers a listener to be notified when this epoll file's own state or
    /// signals change; returns a handle that removes the listener when dropped.
    pub fn add_listener(
        &mut self,
        monitoring_state: FileState,
        monitoring_signals: FileSignals,
        filter: StateListenerFilter,
        notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
        + Send
        + Sync
        + 'static,
    ) -> StateListenHandle {
        self.event_source
            .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
    }

    /// Registers a legacy C status listener on this file's event source.
    pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<crate::cshadow::StatusListener>) {
        self.event_source.add_legacy_listener(ptr);
    }

    /// Removes a previously registered legacy C status listener.
    pub fn remove_legacy_listener(&mut self, ptr: *mut crate::cshadow::StatusListener) {
        self.event_source.remove_legacy_listener(ptr);
    }

    /// Returns the current file state bits of this epoll file.
    pub fn state(&self) -> FileState {
        self.state
    }
241
242    fn refresh_state(&mut self, cb_queue: &mut CallbackQueue) {
243        let readable = if self.has_ready_events() {
244            FileState::READABLE
245        } else {
246            Default::default()
247        };
248        self.update_state(
249            /* mask= */ FileState::READABLE,
250            readable,
251            FileSignals::empty(),
252            cb_queue,
253        );
254    }
255
256    fn update_state(
257        &mut self,
258        mask: FileState,
259        state: FileState,
260        signals: FileSignals,
261        cb_queue: &mut CallbackQueue,
262    ) {
263        let old_state = self.state;
264
265        // Remove the masked flags, then copy the masked flags.
266        self.state.remove(mask);
267        self.state.insert(state & mask);
268
269        self.handle_state_change(old_state, signals, cb_queue);
270    }
271
272    fn handle_state_change(
273        &mut self,
274        old_state: FileState,
275        signals: FileSignals,
276        cb_queue: &mut CallbackQueue,
277    ) {
278        let states_changed = self.state ^ old_state;
279
280        // If something changed, notify our listeners.
281        if !states_changed.is_empty() || !signals.is_empty() {
282            self.event_source
283                .notify_listeners(self.state, states_changed, signals, cb_queue);
284        }
285    }
286
287    fn refresh_listener(&mut self, weak_self: Weak<AtomicRefCell<Epoll>>, key: Key) {
288        let Some(entry) = self.monitoring.get_mut(&key) else {
289            return;
290        };
291
292        // Check what state and what signals we need to listen for this entry.
293        // We always listen for closed so we know when to stop monitoring the entry.
294        let listen_state = entry.get_listener_state().union(FileState::CLOSED);
295        let listen_signals = entry.get_listener_signals();
296        let filter = StateListenerFilter::Always;
297
298        // Set up a callback so we get informed when the file changes.
299        let file = key.file().clone();
300        let handle = file.borrow_mut().add_listener(
301            listen_state,
302            listen_signals,
303            filter,
304            move |state, changed, signals, cb_queue| {
305                if let Some(epoll) = weak_self.upgrade() {
306                    epoll
307                        .borrow_mut()
308                        .notify_entry(&key, state, changed, signals, cb_queue);
309                }
310            },
311        );
312        entry.set_listener_handle(Some(handle));
313    }
314
315    /// The file listener callback for when a monitored entry file status changes.
316    fn notify_entry(
317        &mut self,
318        key: &Key,
319        state: FileState,
320        changed: FileState,
321        signals: FileSignals,
322        cb_queue: &mut CallbackQueue,
323    ) {
324        // Notify entry of file state change if we're still monitoring it.
325        match self.monitoring.get_mut(&key.clone()) {
326            Some(entry) => entry.notify(state, changed, signals),
327            None => return,
328        };
329
330        // Update our ready set, which removes the key if the file closed.
331        self.refresh_ready(key.clone());
332
333        // Also stop monitoring if the file was closed.
334        if state.contains(FileState::CLOSED) {
335            self.monitoring.remove(key);
336        }
337
338        // Update the readability of the epoll descriptor.
339        self.refresh_state(cb_queue);
340    }
341
342    /// Ensures that the entry is in the ready set if it should be, or not if it shouldn't be.
343    fn refresh_ready(&mut self, key: Key) {
344        let Some(entry) = self.monitoring.get_mut(&key.clone()) else {
345            return;
346        };
347
348        // The entry will not be ready if the file closed.
349        if entry.has_ready_events() {
350            if entry.priority().is_none() {
351                // It's ready but not in the ready set yet.
352                let pri = self.pri_counter;
353                self.pri_counter -= 1;
354                self.ready.push(PriorityKey::new(pri, key));
355                entry.set_priority(Some(pri));
356            }
357        } else if let Some(pri) = entry.priority() {
358            // It's not ready anymore but it's in the ready set, so remove it.
359            self.ready.retain(|e| e.priority() != pri);
360            entry.set_priority(None);
361        }
362    }
363
    /// True when at least one monitored entry has events ready to report.
    pub fn has_ready_events(&self) -> bool {
        !self.ready.is_empty()
    }
367
    /// Pops up to `max_events` ready entries and collects the `(events, data)`
    /// pairs to report to the managed process.
    ///
    /// Entries that are still ready after being reported are re-queued with a
    /// fresh, lower priority (via a temporary vec so they can't be reported
    /// twice in one call), which keeps reporting fair across entries.
    pub fn collect_ready_events(
        &mut self,
        cb_queue: &mut CallbackQueue,
        max_events: u32,
    ) -> Vec<(EpollEvents, u64)> {
        let mut events = vec![];
        let mut keep = vec![];

        while !self.ready.is_empty() && events.len() < max_events as usize {
            // Get the next ready entry.
            let pri_key = self.ready.pop().unwrap();
            let key = Key::from(pri_key);
            let entry = self.monitoring.get_mut(&key).unwrap();

            // Just removed from the ready set, keep the priority consistent.
            entry.set_priority(None);

            // It was ready so it should have events.
            debug_assert!(entry.has_ready_events());

            // Store the events we should report to the managed process.
            events.push(entry.collect_ready_events().unwrap());

            // It might still be ready even after we report.
            if entry.has_ready_events() {
                // It's ready again. Assign a new priority to ensure fairness with other entries.
                let pri = self.pri_counter;
                self.pri_counter -= 1;
                let pri_key = PriorityKey::new(pri, key);

                // Use temp vec so we don't report the same entry twice in the same round.
                keep.push(pri_key);

                // The entry will be in the ready set, keep its priority consistent.
                entry.set_priority(Some(pri));
            }
        }

        // Add everything that is still ready back to the ready set.
        self.ready.extend(keep);

        // We've mutated the ready list; we may need to trigger callbacks.
        self.refresh_state(cb_queue);

        // The events to be returned to the managed process.
        events
    }
415}