1use std::collections::hash_map::Entry as HashMapEntry;
2use std::collections::{BinaryHeap, HashMap};
3use std::sync::{Arc, Weak};
45use atomic_refcell::AtomicRefCell;
6use linux_api::epoll::{EpollCtlOp, EpollEvents};
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use shadow_shim_helper_rs::syscall_types::ForeignPtr;
1011use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
12use crate::host::descriptor::{File, FileMode, FileSignals, FileState, FileStatus, SyscallResult};
13use crate::host::memory_manager::MemoryManager;
14use crate::host::syscall::io::IoVec;
15use crate::host::syscall::types::SyscallError;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::{HostTreePointer, ObjectCounter};
1819use self::entry::Entry;
20use self::key::{Key, PriorityKey};
2122use super::socket::Socket;
23use super::socket::inet::InetSocket;
2425// Private submodules to help us track the status of items we are monitoring.
26mod entry;
27mod key;
/// An epoll object that monitors a set of files for state changes and tracks which of them have
/// events ready to be reported to the managed process.
pub struct Epoll {
    /// Source used to notify this epoll's own listeners when its state changes.
    event_source: StateEventSource,
    /// The file status flags (see `status`/`set_status`).
    status: FileStatus,
    /// The current state of this epoll file (e.g. `ACTIVE`, `READABLE`, `CLOSED`).
    state: FileState,
    /// Should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance
    /// for this file.
    has_open_file: bool,
    /// A counter for sorting entries, to guarantee fairness and determinism when reporting events.
    /// Because our ready set is a max heap, we initialize this counter to `u64::MAX` and count down
    /// as we assign values so that entries whose events were last reported longest ago are
    /// prioritized.
    pri_counter: u64,
    /// Stores entries for all descriptors we are currently monitoring for events.
    monitoring: HashMap<Key, Entry>,
    /// Stores keys for entries with events that are ready to be reported.
    ready: BinaryHeap<PriorityKey>,
    _counter: ObjectCounter,
}
4647impl Epoll {
48pub fn new() -> Arc<AtomicRefCell<Self>> {
49let mut epoll = Self {
50 event_source: StateEventSource::new(),
51 status: FileStatus::empty(),
52 state: FileState::ACTIVE,
53 has_open_file: false,
54 pri_counter: u64::MAX,
55 monitoring: HashMap::new(),
56 ready: BinaryHeap::new(),
57 _counter: ObjectCounter::new("Epoll"),
58 };
5960 CallbackQueue::queue_and_run_with_legacy(|cb_queue| epoll.refresh_state(cb_queue));
6162 Arc::new(AtomicRefCell::new(epoll))
63 }
6465pub fn status(&self) -> FileStatus {
66self.status
67 }
6869pub fn set_status(&mut self, status: FileStatus) {
70self.status = status;
71 }
7273pub fn mode(&self) -> FileMode {
74 FileMode::READ | FileMode::WRITE
75 }
7677pub fn has_open_file(&self) -> bool {
78self.has_open_file
79 }
8081pub fn supports_sa_restart(&self) -> bool {
82// Epoll always returns EINTR if interrupted by a signal handler regardless of the use of
83 // the SA_RESTART flag. See signal(7).
84false
85}
8687pub fn set_has_open_file(&mut self, val: bool) {
88self.has_open_file = val;
89 }
9091pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
92self.update_state(
93/* mask= */ FileState::all(),
94 FileState::CLOSED,
95 FileSignals::empty(),
96 cb_queue,
97 );
98Ok(())
99 }
100101pub fn readv(
102&mut self,
103 _iovs: &[IoVec],
104 _offset: Option<libc::off_t>,
105 _flags: libc::c_int,
106 _mem: &mut MemoryManager,
107 _cb_queue: &mut CallbackQueue,
108 ) -> Result<libc::ssize_t, SyscallError> {
109// EpollFDs don't support reading.
110Err(Errno::EINVAL.into())
111 }
112113pub fn writev(
114&mut self,
115 _iovs: &[IoVec],
116 _offset: Option<libc::off_t>,
117 _flags: libc::c_int,
118 _mem: &mut MemoryManager,
119 _cb_queue: &mut CallbackQueue,
120 ) -> Result<libc::ssize_t, SyscallError> {
121// EpollFDs don't support writing.
122Err(Errno::EINVAL.into())
123 }
124125pub fn ioctl(
126&mut self,
127 _request: IoctlRequest,
128 _arg_ptr: ForeignPtr<()>,
129 _mem: &mut MemoryManager,
130 ) -> SyscallResult {
131// After checking the epoll man pages and the Linux source for eventpoll.c, we don't think
132 // epoll descriptors support any ioctl operations.
133warn_once_then_trace!("Epoll does not support any ioctl requests.");
134// From ioctl(2): ENOTTY The specified request does not apply to the kind of object that the
135 // file descriptor fd references. Verified that epoll descriptors return this on Linux.
136Err(Errno::ENOTTY.into())
137 }
138139pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
140warn_once_then_debug!("We do not yet handle stat calls on epoll fds");
141Err(Errno::EINVAL.into())
142 }
143144/// Executes an epoll control operation on the target file.
145 ///
146 /// We think this panics if `target_file` is an instance of this epoll object due to recursive
147 /// mutable borrows (but it does not panic due to a check+panic).
148pub fn ctl(
149&mut self,
150 op: EpollCtlOp,
151 target_fd: i32,
152 target_file: File,
153 events: EpollEvents,
154 data: u64,
155 weak_self: Weak<AtomicRefCell<Epoll>>,
156 cb_queue: &mut CallbackQueue,
157 ) -> Result<(), Errno> {
158let state = target_file.borrow().state();
159let key = Key::new(target_fd, target_file);
160161log::trace!("Epoll editing fd {target_fd} while in state {state:?}");
162163match op {
164 EpollCtlOp::EPOLL_CTL_ADD => {
165// Check if we're trying to add a file that's already been closed. Typically a file
166 // that is referenced in the descriptor table should never be a closed file, but
167 // Shadow's C TCP sockets do close themselves even if there are still file handles
168 // (see `_tcp_endOfFileSignalled`), so we need to check this.
169 //
170 // TODO change this to an assertion when legacy tcp is removed.
171if state.contains(FileState::CLOSED) {
172log::warn!("Attempted to add a closed file {target_fd} to epoll");
173return Err(Errno::EBADF);
174 }
175176let mut entry = Entry::new(events, data, state);
177178// TODO remove when legacy tcp is removed.
179if matches!(
180 key.file(),
181 File::Socket(Socket::Inet(InetSocket::LegacyTcp(_)))
182 ) {
183 entry.set_legacy();
184 }
185186// From epoll_ctl(2): Returns EEXIST when "op was EPOLL_CTL_ADD, and the supplied
187 // file descriptor fd is already registered with this epoll instance."
188match self.monitoring.entry(key.clone()) {
189 HashMapEntry::Occupied(_) => return Err(Errno::EEXIST),
190 HashMapEntry::Vacant(x) => x.insert(entry),
191 };
192 }
193 EpollCtlOp::EPOLL_CTL_MOD => {
194let entry = self.monitoring.get_mut(&key).ok_or(Errno::ENOENT)?;
195 entry.modify(events, data, state);
196 }
197 EpollCtlOp::EPOLL_CTL_DEL => {
198// Stop monitoring this entry. Dropping the entry will cause it to stop listening
199 // for status changes on its inner `File` event source object.
200let entry = self.monitoring.remove(&key).ok_or(Errno::ENOENT)?;
201202// If it has a priority, then we also remove it from the ready set.
203if let Some(pri) = entry.priority() {
204self.ready.retain(|e| e.priority() != pri)
205 }
206 }
207 };
208209self.refresh_ready(key.clone());
210self.refresh_listener(weak_self, key);
211self.refresh_state(cb_queue);
212213Ok(())
214 }
215216pub fn add_listener(
217&mut self,
218 monitoring_state: FileState,
219 monitoring_signals: FileSignals,
220 filter: StateListenerFilter,
221 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
222 + Send
223 + Sync
224 + 'static,
225 ) -> StateListenHandle {
226self.event_source
227 .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
228 }
229230pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<crate::cshadow::StatusListener>) {
231self.event_source.add_legacy_listener(ptr);
232 }
233234pub fn remove_legacy_listener(&mut self, ptr: *mut crate::cshadow::StatusListener) {
235self.event_source.remove_legacy_listener(ptr);
236 }
237238pub fn state(&self) -> FileState {
239self.state
240 }
241242fn refresh_state(&mut self, cb_queue: &mut CallbackQueue) {
243let readable = if self.has_ready_events() {
244 FileState::READABLE
245 } else {
246 Default::default()
247 };
248self.update_state(
249/* mask= */ FileState::READABLE,
250 readable,
251 FileSignals::empty(),
252 cb_queue,
253 );
254 }
255256fn update_state(
257&mut self,
258 mask: FileState,
259 state: FileState,
260 signals: FileSignals,
261 cb_queue: &mut CallbackQueue,
262 ) {
263let old_state = self.state;
264265// Remove the masked flags, then copy the masked flags.
266self.state.remove(mask);
267self.state.insert(state & mask);
268269self.handle_state_change(old_state, signals, cb_queue);
270 }
271272fn handle_state_change(
273&mut self,
274 old_state: FileState,
275 signals: FileSignals,
276 cb_queue: &mut CallbackQueue,
277 ) {
278let states_changed = self.state ^ old_state;
279280// If something changed, notify our listeners.
281if !states_changed.is_empty() || !signals.is_empty() {
282self.event_source
283 .notify_listeners(self.state, states_changed, signals, cb_queue);
284 }
285 }
286287fn refresh_listener(&mut self, weak_self: Weak<AtomicRefCell<Epoll>>, key: Key) {
288let Some(entry) = self.monitoring.get_mut(&key) else {
289return;
290 };
291292// Check what state and what signals we need to listen for this entry.
293 // We always listen for closed so we know when to stop monitoring the entry.
294let listen_state = entry.get_listener_state().union(FileState::CLOSED);
295let listen_signals = entry.get_listener_signals();
296let filter = StateListenerFilter::Always;
297298// Set up a callback so we get informed when the file changes.
299let file = key.file().clone();
300let handle = file.borrow_mut().add_listener(
301 listen_state,
302 listen_signals,
303 filter,
304move |state, changed, signals, cb_queue| {
305if let Some(epoll) = weak_self.upgrade() {
306 epoll
307 .borrow_mut()
308 .notify_entry(&key, state, changed, signals, cb_queue);
309 }
310 },
311 );
312 entry.set_listener_handle(Some(handle));
313 }
314315/// The file listener callback for when a monitored entry file status changes.
316fn notify_entry(
317&mut self,
318 key: &Key,
319 state: FileState,
320 changed: FileState,
321 signals: FileSignals,
322 cb_queue: &mut CallbackQueue,
323 ) {
324// Notify entry of file state change if we're still monitoring it.
325match self.monitoring.get_mut(&key.clone()) {
326Some(entry) => entry.notify(state, changed, signals),
327None => return,
328 };
329330// Update our ready set, which removes the key if the file closed.
331self.refresh_ready(key.clone());
332333// Also stop monitoring if the file was closed.
334if state.contains(FileState::CLOSED) {
335self.monitoring.remove(key);
336 }
337338// Update the readability of the epoll descriptor.
339self.refresh_state(cb_queue);
340 }
341342/// Ensures that the entry is in the ready set if it should be, or not if it shouldn't be.
343fn refresh_ready(&mut self, key: Key) {
344let Some(entry) = self.monitoring.get_mut(&key.clone()) else {
345return;
346 };
347348// The entry will not be ready if the file closed.
349if entry.has_ready_events() {
350if entry.priority().is_none() {
351// It's ready but not in the ready set yet.
352let pri = self.pri_counter;
353self.pri_counter -= 1;
354self.ready.push(PriorityKey::new(pri, key));
355 entry.set_priority(Some(pri));
356 }
357 } else if let Some(pri) = entry.priority() {
358// It's not ready anymore but it's in the ready set, so remove it.
359self.ready.retain(|e| e.priority() != pri);
360 entry.set_priority(None);
361 }
362 }
363364pub fn has_ready_events(&self) -> bool {
365 !self.ready.is_empty()
366 }
367368pub fn collect_ready_events(
369&mut self,
370 cb_queue: &mut CallbackQueue,
371 max_events: u32,
372 ) -> Vec<(EpollEvents, u64)> {
373let mut events = vec![];
374let mut keep = vec![];
375376while !self.ready.is_empty() && events.len() < max_events as usize {
377// Get the next ready entry.
378let pri_key = self.ready.pop().unwrap();
379let key = Key::from(pri_key);
380let entry = self.monitoring.get_mut(&key).unwrap();
381382// Just removed from the ready set, keep the priority consistent.
383entry.set_priority(None);
384385// It was ready so it should have events.
386debug_assert!(entry.has_ready_events());
387388// Store the events we should report to the managed process.
389events.push(entry.collect_ready_events().unwrap());
390391// It might still be ready even after we report.
392if entry.has_ready_events() {
393// It's ready again. Assign a new priority to ensure fairness with other entries.
394let pri = self.pri_counter;
395self.pri_counter -= 1;
396let pri_key = PriorityKey::new(pri, key);
397398// Use temp vec so we don't report the same entry twice in the same round.
399keep.push(pri_key);
400401// The entry will be in the ready set, keep its priority consistent.
402entry.set_priority(Some(pri));
403 }
404 }
405406// Add everything that is still ready back to the ready set.
407self.ready.extend(keep);
408409// We've mutated the ready list; we may need to trigger callbacks.
410self.refresh_state(cb_queue);
411412// The events to be returned to the managed process.
413events
414 }
415}