shadow_rs/host/descriptor/epoll/
mod.rs

use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::{BinaryHeap, HashMap};
use std::sync::{Arc, Weak};

use atomic_refcell::AtomicRefCell;
use linux_api::epoll::{EpollCtlOp, EpollEvents};
use linux_api::errno::Errno;
use linux_api::ioctls::IoctlRequest;
use shadow_shim_helper_rs::syscall_types::ForeignPtr;

use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
use crate::host::descriptor::{File, FileMode, FileSignals, FileState, FileStatus, SyscallResult};
use crate::host::memory_manager::MemoryManager;
use crate::host::syscall::io::IoVec;
use crate::host::syscall::types::SyscallError;
use crate::utility::callback_queue::CallbackQueue;
use crate::utility::{HostTreePointer, ObjectCounter};

use self::entry::Entry;
use self::key::{Key, PriorityKey};

use super::socket::Socket;
use super::socket::inet::InetSocket;

// Private submodules to help us track the status of items we are monitoring.
mod entry;
mod key;
28
29pub struct Epoll {
30    event_source: StateEventSource,
31    status: FileStatus,
32    state: FileState,
33    // Should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
34    // this file.
35    has_open_file: bool,
36    // A counter for sorting entries, to guarantee fairness and determinism when reporting events.
37    // Because our ready set is a max heap, we initialize this counter to u64::MAX and count down as
38    // we assign values so that entries whose events were last reported longest ago are prioritized.
39    pri_counter: u64,
40    // Stores entries for all descriptors we are currently monitoring for events.
41    monitoring: HashMap<Key, Entry>,
42    // Stores keys for entries with events that are ready to be reported.
43    ready: BinaryHeap<PriorityKey>,
44    _counter: ObjectCounter,
45}
46
47impl Epoll {
48    pub fn new() -> Arc<AtomicRefCell<Self>> {
49        let mut epoll = Self {
50            event_source: StateEventSource::new(),
51            status: FileStatus::empty(),
52            state: FileState::ACTIVE,
53            has_open_file: false,
54            pri_counter: u64::MAX,
55            monitoring: HashMap::new(),
56            ready: BinaryHeap::new(),
57            _counter: ObjectCounter::new("Epoll"),
58        };
59
60        CallbackQueue::queue_and_run_with_legacy(|cb_queue| epoll.refresh_state(cb_queue));
61
62        Arc::new(AtomicRefCell::new(epoll))
63    }
64
65    pub fn status(&self) -> FileStatus {
66        self.status
67    }
68
69    pub fn set_status(&mut self, status: FileStatus) {
70        self.status = status;
71    }
72
73    pub fn mode(&self) -> FileMode {
74        FileMode::READ | FileMode::WRITE
75    }
76
77    pub fn has_open_file(&self) -> bool {
78        self.has_open_file
79    }
80
81    pub fn supports_sa_restart(&self) -> bool {
82        // Epoll always returns EINTR if interrupted by a signal handler regardless of the use of
83        // the SA_RESTART flag. See signal(7).
84        false
85    }
86
87    pub fn set_has_open_file(&mut self, val: bool) {
88        self.has_open_file = val;
89    }
90
91    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
92        self.update_state(
93            /* mask= */ FileState::all(),
94            FileState::CLOSED,
95            FileSignals::empty(),
96            cb_queue,
97        );
98        Ok(())
99    }
100
101    pub fn readv(
102        &mut self,
103        _iovs: &[IoVec],
104        _offset: Option<libc::off_t>,
105        _flags: libc::c_int,
106        _mem: &mut MemoryManager,
107        _cb_queue: &mut CallbackQueue,
108    ) -> Result<libc::ssize_t, SyscallError> {
109        // EpollFDs don't support reading.
110        Err(Errno::EINVAL.into())
111    }
112
113    pub fn writev(
114        &mut self,
115        _iovs: &[IoVec],
116        _offset: Option<libc::off_t>,
117        _flags: libc::c_int,
118        _mem: &mut MemoryManager,
119        _cb_queue: &mut CallbackQueue,
120    ) -> Result<libc::ssize_t, SyscallError> {
121        // EpollFDs don't support writing.
122        Err(Errno::EINVAL.into())
123    }
124
125    pub fn ioctl(
126        &mut self,
127        _request: IoctlRequest,
128        _arg_ptr: ForeignPtr<()>,
129        _mem: &mut MemoryManager,
130    ) -> SyscallResult {
131        // After checking the epoll man pages and the Linux source for eventpoll.c, we don't think
132        // epoll descriptors support any ioctl operations.
133        warn_once_then_trace!("Epoll does not support any ioctl requests.");
134        // From ioctl(2): ENOTTY The specified request does not apply to the kind of object that the
135        // file descriptor fd references. Verified that epoll descriptors return this on Linux.
136        Err(Errno::ENOTTY.into())
137    }
138
139    pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
140        warn_once_then_debug!("We do not yet handle stat calls on epoll fds");
141        Err(Errno::EINVAL.into())
142    }
143
144    /// Executes an epoll control operation on the target file.
145    ///
146    /// We think this panics if `target_file` is an instance of this epoll object due to recursive
147    /// mutable borrows (but it does not panic due to a check+panic).
148    pub fn ctl(
149        &mut self,
150        op: EpollCtlOp,
151        target_fd: i32,
152        target_file: File,
153        events: EpollEvents,
154        data: u64,
155        weak_self: Weak<AtomicRefCell<Epoll>>,
156        cb_queue: &mut CallbackQueue,
157    ) -> Result<(), Errno> {
158        let state = target_file.borrow().state();
159        let key = Key::new(target_fd, target_file);
160
161        log::trace!("Epoll editing fd {target_fd} while in state {state:?}");
162
163        match op {
164            EpollCtlOp::EPOLL_CTL_ADD => {
165                // Check if we're trying to add a file that's already been closed. Typically a file
166                // that is referenced in the descriptor table should never be a closed file, but
167                // Shadow's C TCP sockets do close themselves even if there are still file handles
168                // (see `_tcp_endOfFileSignalled`), so we need to check this.
169                //
170                // TODO change this to an assertion when legacy tcp is removed.
171                if state.contains(FileState::CLOSED) {
172                    log::warn!("Attempted to add a closed file {target_fd} to epoll");
173                    return Err(Errno::EBADF);
174                }
175
176                let mut entry = Entry::new(events, data, state);
177
178                // TODO remove when legacy tcp is removed.
179                if matches!(
180                    key.file(),
181                    File::Socket(Socket::Inet(InetSocket::LegacyTcp(_)))
182                ) {
183                    entry.set_legacy();
184                }
185
186                // From epoll_ctl(2): Returns EEXIST when "op was EPOLL_CTL_ADD, and the supplied
187                // file descriptor fd is already registered with this epoll instance."
188                match self.monitoring.entry(key.clone()) {
189                    HashMapEntry::Occupied(_) => return Err(Errno::EEXIST),
190                    HashMapEntry::Vacant(x) => x.insert(entry),
191                };
192            }
193            EpollCtlOp::EPOLL_CTL_MOD => {
194                let entry = self.monitoring.get_mut(&key).ok_or(Errno::ENOENT)?;
195                entry.modify(events, data, state);
196            }
197            EpollCtlOp::EPOLL_CTL_DEL => {
198                // Stop monitoring this entry. Dropping the entry will cause it to stop listening
199                // for status changes on its inner `File` event source object.
200                let entry = self.monitoring.remove(&key).ok_or(Errno::ENOENT)?;
201
202                // If it has a priority, then we also remove it from the ready set.
203                if let Some(pri) = entry.priority() {
204                    self.ready.retain(|e| e.priority() != pri)
205                }
206            }
207        };
208
209        self.refresh_ready(key.clone());
210        self.refresh_listener(weak_self, key);
211        self.refresh_state(cb_queue);
212
213        Ok(())
214    }
215
216    pub fn add_listener(
217        &mut self,
218        monitoring_state: FileState,
219        monitoring_signals: FileSignals,
220        filter: StateListenerFilter,
221        notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
222        + Send
223        + Sync
224        + 'static,
225    ) -> StateListenHandle {
226        self.event_source
227            .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
228    }
229
230    pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<crate::cshadow::StatusListener>) {
231        self.event_source.add_legacy_listener(ptr);
232    }
233
234    pub fn remove_legacy_listener(&mut self, ptr: *mut crate::cshadow::StatusListener) {
235        self.event_source.remove_legacy_listener(ptr);
236    }
237
238    pub fn state(&self) -> FileState {
239        self.state
240    }
241
242    fn refresh_state(&mut self, cb_queue: &mut CallbackQueue) {
243        let readable = self
244            .has_ready_events()
245            .then_some(FileState::READABLE)
246            .unwrap_or_default();
247        self.update_state(
248            /* mask= */ FileState::READABLE,
249            readable,
250            FileSignals::empty(),
251            cb_queue,
252        );
253    }
254
255    fn update_state(
256        &mut self,
257        mask: FileState,
258        state: FileState,
259        signals: FileSignals,
260        cb_queue: &mut CallbackQueue,
261    ) {
262        let old_state = self.state;
263
264        // Remove the masked flags, then copy the masked flags.
265        self.state.remove(mask);
266        self.state.insert(state & mask);
267
268        self.handle_state_change(old_state, signals, cb_queue);
269    }
270
271    fn handle_state_change(
272        &mut self,
273        old_state: FileState,
274        signals: FileSignals,
275        cb_queue: &mut CallbackQueue,
276    ) {
277        let states_changed = self.state ^ old_state;
278
279        // If something changed, notify our listeners.
280        if !states_changed.is_empty() || !signals.is_empty() {
281            self.event_source
282                .notify_listeners(self.state, states_changed, signals, cb_queue);
283        }
284    }
285
286    fn refresh_listener(&mut self, weak_self: Weak<AtomicRefCell<Epoll>>, key: Key) {
287        let Some(entry) = self.monitoring.get_mut(&key) else {
288            return;
289        };
290
291        // Check what state and what signals we need to listen for this entry.
292        // We always listen for closed so we know when to stop monitoring the entry.
293        let listen_state = entry.get_listener_state().union(FileState::CLOSED);
294        let listen_signals = entry.get_listener_signals();
295        let filter = StateListenerFilter::Always;
296
297        // Set up a callback so we get informed when the file changes.
298        let file = key.file().clone();
299        let handle = file.borrow_mut().add_listener(
300            listen_state,
301            listen_signals,
302            filter,
303            move |state, changed, signals, cb_queue| {
304                if let Some(epoll) = weak_self.upgrade() {
305                    epoll
306                        .borrow_mut()
307                        .notify_entry(&key, state, changed, signals, cb_queue);
308                }
309            },
310        );
311        entry.set_listener_handle(Some(handle));
312    }
313
314    /// The file listener callback for when a monitored entry file status changes.
315    fn notify_entry(
316        &mut self,
317        key: &Key,
318        state: FileState,
319        changed: FileState,
320        signals: FileSignals,
321        cb_queue: &mut CallbackQueue,
322    ) {
323        // Notify entry of file state change if we're still monitoring it.
324        match self.monitoring.get_mut(&key.clone()) {
325            Some(entry) => entry.notify(state, changed, signals),
326            None => return,
327        };
328
329        // Update our ready set, which removes the key if the file closed.
330        self.refresh_ready(key.clone());
331
332        // Also stop monitoring if the file was closed.
333        if state.contains(FileState::CLOSED) {
334            self.monitoring.remove(key);
335        }
336
337        // Update the readability of the epoll descriptor.
338        self.refresh_state(cb_queue);
339    }
340
341    /// Ensures that the entry is in the ready set if it should be, or not if it shouldn't be.
342    fn refresh_ready(&mut self, key: Key) {
343        let Some(entry) = self.monitoring.get_mut(&key.clone()) else {
344            return;
345        };
346
347        // The entry will not be ready if the file closed.
348        if entry.has_ready_events() {
349            if entry.priority().is_none() {
350                // It's ready but not in the ready set yet.
351                let pri = self.pri_counter;
352                self.pri_counter -= 1;
353                self.ready.push(PriorityKey::new(pri, key));
354                entry.set_priority(Some(pri));
355            }
356        } else if let Some(pri) = entry.priority() {
357            // It's not ready anymore but it's in the ready set, so remove it.
358            self.ready.retain(|e| e.priority() != pri);
359            entry.set_priority(None);
360        }
361    }
362
363    pub fn has_ready_events(&self) -> bool {
364        !self.ready.is_empty()
365    }
366
367    pub fn collect_ready_events(
368        &mut self,
369        cb_queue: &mut CallbackQueue,
370        max_events: u32,
371    ) -> Vec<(EpollEvents, u64)> {
372        let mut events = vec![];
373        let mut keep = vec![];
374
375        while !self.ready.is_empty() && events.len() < max_events as usize {
376            // Get the next ready entry.
377            let pri_key = self.ready.pop().unwrap();
378            let key = Key::from(pri_key);
379            let entry = self.monitoring.get_mut(&key).unwrap();
380
381            // Just removed from the ready set, keep the priority consistent.
382            entry.set_priority(None);
383
384            // It was ready so it should have events.
385            debug_assert!(entry.has_ready_events());
386
387            // Store the events we should report to the managed process.
388            events.push(entry.collect_ready_events().unwrap());
389
390            // It might still be ready even after we report.
391            if entry.has_ready_events() {
392                // It's ready again. Assign a new priority to ensure fairness with other entries.
393                let pri = self.pri_counter;
394                self.pri_counter -= 1;
395                let pri_key = PriorityKey::new(pri, key);
396
397                // Use temp vec so we don't report the same entry twice in the same round.
398                keep.push(pri_key);
399
400                // The entry will be in the ready set, keep its priority consistent.
401                entry.set_priority(Some(pri));
402            }
403        }
404
405        // Add everything that is still ready back to the ready set.
406        self.ready.extend(keep);
407
408        // We've mutated the ready list; we may need to trigger callbacks.
409        self.refresh_state(cb_queue);
410
411        // The events to be returned to the managed process.
412        events
413    }
414}