1use std::collections::hash_map::Entry as HashMapEntry;
2use std::collections::{BinaryHeap, HashMap};
3use std::sync::{Arc, Weak};
45use atomic_refcell::AtomicRefCell;
6use linux_api::epoll::{EpollCtlOp, EpollEvents};
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use shadow_shim_helper_rs::syscall_types::ForeignPtr;
1011use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
12use crate::host::descriptor::{File, FileMode, FileSignals, FileState, FileStatus, SyscallResult};
13use crate::host::memory_manager::MemoryManager;
14use crate::host::syscall::io::IoVec;
15use crate::host::syscall::types::SyscallError;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::{HostTreePointer, ObjectCounter};
1819use self::entry::Entry;
20use self::key::{Key, PriorityKey};
2122use super::socket::Socket;
23use super::socket::inet::InetSocket;
2425// Private submodules to help us track the status of items we are monitoring.
26mod entry;
27mod key;
2829pub struct Epoll {
30 event_source: StateEventSource,
31 status: FileStatus,
32 state: FileState,
33// Should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
34 // this file.
35has_open_file: bool,
36// A counter for sorting entries, to guarantee fairness and determinism when reporting events.
37 // Because our ready set is a max heap, we initialize this counter to u64::MAX and count down as
38 // we assign values so that entries whose events were last reported longest ago are prioritized.
39pri_counter: u64,
40// Stores entries for all descriptors we are currently monitoring for events.
41monitoring: HashMap<Key, Entry>,
42// Stores keys for entries with events that are ready to be reported.
43ready: BinaryHeap<PriorityKey>,
44 _counter: ObjectCounter,
45}
4647impl Epoll {
48pub fn new() -> Arc<AtomicRefCell<Self>> {
49let mut epoll = Self {
50 event_source: StateEventSource::new(),
51 status: FileStatus::empty(),
52 state: FileState::ACTIVE,
53 has_open_file: false,
54 pri_counter: u64::MAX,
55 monitoring: HashMap::new(),
56 ready: BinaryHeap::new(),
57 _counter: ObjectCounter::new("Epoll"),
58 };
5960 CallbackQueue::queue_and_run_with_legacy(|cb_queue| epoll.refresh_state(cb_queue));
6162 Arc::new(AtomicRefCell::new(epoll))
63 }
6465pub fn status(&self) -> FileStatus {
66self.status
67 }
6869pub fn set_status(&mut self, status: FileStatus) {
70self.status = status;
71 }
7273pub fn mode(&self) -> FileMode {
74 FileMode::READ | FileMode::WRITE
75 }
7677pub fn has_open_file(&self) -> bool {
78self.has_open_file
79 }
8081pub fn supports_sa_restart(&self) -> bool {
82// Epoll always returns EINTR if interrupted by a signal handler regardless of the use of
83 // the SA_RESTART flag. See signal(7).
84false
85}
8687pub fn set_has_open_file(&mut self, val: bool) {
88self.has_open_file = val;
89 }
9091pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
92self.update_state(
93/* mask= */ FileState::all(),
94 FileState::CLOSED,
95 FileSignals::empty(),
96 cb_queue,
97 );
98Ok(())
99 }
100101pub fn readv(
102&mut self,
103 _iovs: &[IoVec],
104 _offset: Option<libc::off_t>,
105 _flags: libc::c_int,
106 _mem: &mut MemoryManager,
107 _cb_queue: &mut CallbackQueue,
108 ) -> Result<libc::ssize_t, SyscallError> {
109// EpollFDs don't support reading.
110Err(Errno::EINVAL.into())
111 }
112113pub fn writev(
114&mut self,
115 _iovs: &[IoVec],
116 _offset: Option<libc::off_t>,
117 _flags: libc::c_int,
118 _mem: &mut MemoryManager,
119 _cb_queue: &mut CallbackQueue,
120 ) -> Result<libc::ssize_t, SyscallError> {
121// EpollFDs don't support writing.
122Err(Errno::EINVAL.into())
123 }
124125pub fn ioctl(
126&mut self,
127 _request: IoctlRequest,
128 _arg_ptr: ForeignPtr<()>,
129 _mem: &mut MemoryManager,
130 ) -> SyscallResult {
131// After checking the epoll man pages and the Linux source for eventpoll.c, we don't think
132 // epoll descriptors support any ioctl operations.
133warn_once_then_trace!("Epoll does not support any ioctl requests.");
134// From ioctl(2): ENOTTY The specified request does not apply to the kind of object that the
135 // file descriptor fd references. Verified that epoll descriptors return this on Linux.
136Err(Errno::ENOTTY.into())
137 }
138139pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
140warn_once_then_debug!("We do not yet handle stat calls on epoll fds");
141Err(Errno::EINVAL.into())
142 }
143144/// Executes an epoll control operation on the target file.
145 ///
146 /// We think this panics if `target_file` is an instance of this epoll object due to recursive
147 /// mutable borrows (but it does not panic due to a check+panic).
148pub fn ctl(
149&mut self,
150 op: EpollCtlOp,
151 target_fd: i32,
152 target_file: File,
153 events: EpollEvents,
154 data: u64,
155 weak_self: Weak<AtomicRefCell<Epoll>>,
156 cb_queue: &mut CallbackQueue,
157 ) -> Result<(), Errno> {
158let state = target_file.borrow().state();
159let key = Key::new(target_fd, target_file);
160161log::trace!("Epoll editing fd {target_fd} while in state {state:?}");
162163match op {
164 EpollCtlOp::EPOLL_CTL_ADD => {
165// Check if we're trying to add a file that's already been closed. Typically a file
166 // that is referenced in the descriptor table should never be a closed file, but
167 // Shadow's C TCP sockets do close themselves even if there are still file handles
168 // (see `_tcp_endOfFileSignalled`), so we need to check this.
169 //
170 // TODO change this to an assertion when legacy tcp is removed.
171if state.contains(FileState::CLOSED) {
172log::warn!("Attempted to add a closed file {target_fd} to epoll");
173return Err(Errno::EBADF);
174 }
175176let mut entry = Entry::new(events, data, state);
177178// TODO remove when legacy tcp is removed.
179if matches!(
180 key.file(),
181 File::Socket(Socket::Inet(InetSocket::LegacyTcp(_)))
182 ) {
183 entry.set_legacy();
184 }
185186// From epoll_ctl(2): Returns EEXIST when "op was EPOLL_CTL_ADD, and the supplied
187 // file descriptor fd is already registered with this epoll instance."
188match self.monitoring.entry(key.clone()) {
189 HashMapEntry::Occupied(_) => return Err(Errno::EEXIST),
190 HashMapEntry::Vacant(x) => x.insert(entry),
191 };
192 }
193 EpollCtlOp::EPOLL_CTL_MOD => {
194let entry = self.monitoring.get_mut(&key).ok_or(Errno::ENOENT)?;
195 entry.modify(events, data, state);
196 }
197 EpollCtlOp::EPOLL_CTL_DEL => {
198// Stop monitoring this entry. Dropping the entry will cause it to stop listening
199 // for status changes on its inner `File` event source object.
200let entry = self.monitoring.remove(&key).ok_or(Errno::ENOENT)?;
201202// If it has a priority, then we also remove it from the ready set.
203if let Some(pri) = entry.priority() {
204self.ready.retain(|e| e.priority() != pri)
205 }
206 }
207 };
208209self.refresh_ready(key.clone());
210self.refresh_listener(weak_self, key);
211self.refresh_state(cb_queue);
212213Ok(())
214 }
215216pub fn add_listener(
217&mut self,
218 monitoring_state: FileState,
219 monitoring_signals: FileSignals,
220 filter: StateListenerFilter,
221 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
222 + Send
223 + Sync
224 + 'static,
225 ) -> StateListenHandle {
226self.event_source
227 .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
228 }
229230pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<crate::cshadow::StatusListener>) {
231self.event_source.add_legacy_listener(ptr);
232 }
233234pub fn remove_legacy_listener(&mut self, ptr: *mut crate::cshadow::StatusListener) {
235self.event_source.remove_legacy_listener(ptr);
236 }
237238pub fn state(&self) -> FileState {
239self.state
240 }
241242fn refresh_state(&mut self, cb_queue: &mut CallbackQueue) {
243let readable = self
244.has_ready_events()
245 .then_some(FileState::READABLE)
246 .unwrap_or_default();
247self.update_state(
248/* mask= */ FileState::READABLE,
249 readable,
250 FileSignals::empty(),
251 cb_queue,
252 );
253 }
254255fn update_state(
256&mut self,
257 mask: FileState,
258 state: FileState,
259 signals: FileSignals,
260 cb_queue: &mut CallbackQueue,
261 ) {
262let old_state = self.state;
263264// Remove the masked flags, then copy the masked flags.
265self.state.remove(mask);
266self.state.insert(state & mask);
267268self.handle_state_change(old_state, signals, cb_queue);
269 }
270271fn handle_state_change(
272&mut self,
273 old_state: FileState,
274 signals: FileSignals,
275 cb_queue: &mut CallbackQueue,
276 ) {
277let states_changed = self.state ^ old_state;
278279// If something changed, notify our listeners.
280if !states_changed.is_empty() || !signals.is_empty() {
281self.event_source
282 .notify_listeners(self.state, states_changed, signals, cb_queue);
283 }
284 }
285286fn refresh_listener(&mut self, weak_self: Weak<AtomicRefCell<Epoll>>, key: Key) {
287let Some(entry) = self.monitoring.get_mut(&key) else {
288return;
289 };
290291// Check what state and what signals we need to listen for this entry.
292 // We always listen for closed so we know when to stop monitoring the entry.
293let listen_state = entry.get_listener_state().union(FileState::CLOSED);
294let listen_signals = entry.get_listener_signals();
295let filter = StateListenerFilter::Always;
296297// Set up a callback so we get informed when the file changes.
298let file = key.file().clone();
299let handle = file.borrow_mut().add_listener(
300 listen_state,
301 listen_signals,
302 filter,
303move |state, changed, signals, cb_queue| {
304if let Some(epoll) = weak_self.upgrade() {
305 epoll
306 .borrow_mut()
307 .notify_entry(&key, state, changed, signals, cb_queue);
308 }
309 },
310 );
311 entry.set_listener_handle(Some(handle));
312 }
313314/// The file listener callback for when a monitored entry file status changes.
315fn notify_entry(
316&mut self,
317 key: &Key,
318 state: FileState,
319 changed: FileState,
320 signals: FileSignals,
321 cb_queue: &mut CallbackQueue,
322 ) {
323// Notify entry of file state change if we're still monitoring it.
324match self.monitoring.get_mut(&key.clone()) {
325Some(entry) => entry.notify(state, changed, signals),
326None => return,
327 };
328329// Update our ready set, which removes the key if the file closed.
330self.refresh_ready(key.clone());
331332// Also stop monitoring if the file was closed.
333if state.contains(FileState::CLOSED) {
334self.monitoring.remove(key);
335 }
336337// Update the readability of the epoll descriptor.
338self.refresh_state(cb_queue);
339 }
340341/// Ensures that the entry is in the ready set if it should be, or not if it shouldn't be.
342fn refresh_ready(&mut self, key: Key) {
343let Some(entry) = self.monitoring.get_mut(&key.clone()) else {
344return;
345 };
346347// The entry will not be ready if the file closed.
348if entry.has_ready_events() {
349if entry.priority().is_none() {
350// It's ready but not in the ready set yet.
351let pri = self.pri_counter;
352self.pri_counter -= 1;
353self.ready.push(PriorityKey::new(pri, key));
354 entry.set_priority(Some(pri));
355 }
356 } else if let Some(pri) = entry.priority() {
357// It's not ready anymore but it's in the ready set, so remove it.
358self.ready.retain(|e| e.priority() != pri);
359 entry.set_priority(None);
360 }
361 }
362363pub fn has_ready_events(&self) -> bool {
364 !self.ready.is_empty()
365 }
366367pub fn collect_ready_events(
368&mut self,
369 cb_queue: &mut CallbackQueue,
370 max_events: u32,
371 ) -> Vec<(EpollEvents, u64)> {
372let mut events = vec![];
373let mut keep = vec![];
374375while !self.ready.is_empty() && events.len() < max_events as usize {
376// Get the next ready entry.
377let pri_key = self.ready.pop().unwrap();
378let key = Key::from(pri_key);
379let entry = self.monitoring.get_mut(&key).unwrap();
380381// Just removed from the ready set, keep the priority consistent.
382entry.set_priority(None);
383384// It was ready so it should have events.
385debug_assert!(entry.has_ready_events());
386387// Store the events we should report to the managed process.
388events.push(entry.collect_ready_events().unwrap());
389390// It might still be ready even after we report.
391if entry.has_ready_events() {
392// It's ready again. Assign a new priority to ensure fairness with other entries.
393let pri = self.pri_counter;
394self.pri_counter -= 1;
395let pri_key = PriorityKey::new(pri, key);
396397// Use temp vec so we don't report the same entry twice in the same round.
398keep.push(pri_key);
399400// The entry will be in the ready set, keep its priority consistent.
401entry.set_priority(Some(pri));
402 }
403 }
404405// Add everything that is still ready back to the ready set.
406self.ready.extend(keep);
407408// We've mutated the ready list; we may need to trigger callbacks.
409self.refresh_state(cb_queue);
410411// The events to be returned to the managed process.
412events
413 }
414}