shadow_rs/host/descriptor/epoll/
mod.rs1use std::collections::hash_map::Entry as HashMapEntry;
2use std::collections::{BinaryHeap, HashMap};
3use std::sync::{Arc, Weak};
4
5use atomic_refcell::AtomicRefCell;
6use linux_api::epoll::{EpollCtlOp, EpollEvents};
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use shadow_shim_helper_rs::syscall_types::ForeignPtr;
10
11use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
12use crate::host::descriptor::{File, FileMode, FileSignals, FileState, FileStatus, SyscallResult};
13use crate::host::memory_manager::MemoryManager;
14use crate::host::syscall::io::IoVec;
15use crate::host::syscall::types::SyscallError;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::{HostTreePointer, ObjectCounter};
18
19use self::entry::Entry;
20use self::key::{Key, PriorityKey};
21
22use super::socket::Socket;
23use super::socket::inet::InetSocket;
24
25mod entry;
27mod key;
28
29pub struct Epoll {
30 event_source: StateEventSource,
31 status: FileStatus,
32 state: FileState,
33 has_open_file: bool,
36 pri_counter: u64,
40 monitoring: HashMap<Key, Entry>,
42 ready: BinaryHeap<PriorityKey>,
44 _counter: ObjectCounter,
45}
46
47impl Epoll {
48 pub fn new() -> Arc<AtomicRefCell<Self>> {
49 let mut epoll = Self {
50 event_source: StateEventSource::new(),
51 status: FileStatus::empty(),
52 state: FileState::ACTIVE,
53 has_open_file: false,
54 pri_counter: u64::MAX,
55 monitoring: HashMap::new(),
56 ready: BinaryHeap::new(),
57 _counter: ObjectCounter::new("Epoll"),
58 };
59
60 CallbackQueue::queue_and_run_with_legacy(|cb_queue| epoll.refresh_state(cb_queue));
61
62 Arc::new(AtomicRefCell::new(epoll))
63 }
64
65 pub fn status(&self) -> FileStatus {
66 self.status
67 }
68
69 pub fn set_status(&mut self, status: FileStatus) {
70 self.status = status;
71 }
72
73 pub fn mode(&self) -> FileMode {
74 FileMode::READ | FileMode::WRITE
75 }
76
77 pub fn has_open_file(&self) -> bool {
78 self.has_open_file
79 }
80
81 pub fn supports_sa_restart(&self) -> bool {
82 false
85 }
86
87 pub fn set_has_open_file(&mut self, val: bool) {
88 self.has_open_file = val;
89 }
90
91 pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
92 self.update_state(
93 FileState::all(),
94 FileState::CLOSED,
95 FileSignals::empty(),
96 cb_queue,
97 );
98 Ok(())
99 }
100
101 pub fn readv(
102 &mut self,
103 _iovs: &[IoVec],
104 _offset: Option<libc::off_t>,
105 _flags: libc::c_int,
106 _mem: &mut MemoryManager,
107 _cb_queue: &mut CallbackQueue,
108 ) -> Result<libc::ssize_t, SyscallError> {
109 Err(Errno::EINVAL.into())
111 }
112
113 pub fn writev(
114 &mut self,
115 _iovs: &[IoVec],
116 _offset: Option<libc::off_t>,
117 _flags: libc::c_int,
118 _mem: &mut MemoryManager,
119 _cb_queue: &mut CallbackQueue,
120 ) -> Result<libc::ssize_t, SyscallError> {
121 Err(Errno::EINVAL.into())
123 }
124
125 pub fn ioctl(
126 &mut self,
127 _request: IoctlRequest,
128 _arg_ptr: ForeignPtr<()>,
129 _mem: &mut MemoryManager,
130 ) -> SyscallResult {
131 warn_once_then_trace!("Epoll does not support any ioctl requests.");
134 Err(Errno::ENOTTY.into())
137 }
138
139 pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
140 warn_once_then_debug!("We do not yet handle stat calls on epoll fds");
141 Err(Errno::EINVAL.into())
142 }
143
144 pub fn ctl(
149 &mut self,
150 op: EpollCtlOp,
151 target_fd: i32,
152 target_file: File,
153 events: EpollEvents,
154 data: u64,
155 weak_self: Weak<AtomicRefCell<Epoll>>,
156 cb_queue: &mut CallbackQueue,
157 ) -> Result<(), Errno> {
158 let state = target_file.borrow().state();
159 let key = Key::new(target_fd, target_file);
160
161 log::trace!("Epoll editing fd {target_fd} while in state {state:?}");
162
163 match op {
164 EpollCtlOp::EPOLL_CTL_ADD => {
165 if state.contains(FileState::CLOSED) {
172 log::warn!("Attempted to add a closed file {target_fd} to epoll");
173 return Err(Errno::EBADF);
174 }
175
176 let mut entry = Entry::new(events, data, state);
177
178 if matches!(
180 key.file(),
181 File::Socket(Socket::Inet(InetSocket::LegacyTcp(_)))
182 ) {
183 entry.set_legacy();
184 }
185
186 match self.monitoring.entry(key.clone()) {
189 HashMapEntry::Occupied(_) => return Err(Errno::EEXIST),
190 HashMapEntry::Vacant(x) => x.insert(entry),
191 };
192 }
193 EpollCtlOp::EPOLL_CTL_MOD => {
194 let entry = self.monitoring.get_mut(&key).ok_or(Errno::ENOENT)?;
195 entry.modify(events, data, state);
196 }
197 EpollCtlOp::EPOLL_CTL_DEL => {
198 let entry = self.monitoring.remove(&key).ok_or(Errno::ENOENT)?;
201
202 if let Some(pri) = entry.priority() {
204 self.ready.retain(|e| e.priority() != pri)
205 }
206 }
207 };
208
209 self.refresh_ready(key.clone());
210 self.refresh_listener(weak_self, key);
211 self.refresh_state(cb_queue);
212
213 Ok(())
214 }
215
216 pub fn add_listener(
217 &mut self,
218 monitoring_state: FileState,
219 monitoring_signals: FileSignals,
220 filter: StateListenerFilter,
221 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
222 + Send
223 + Sync
224 + 'static,
225 ) -> StateListenHandle {
226 self.event_source
227 .add_listener(monitoring_state, monitoring_signals, filter, notify_fn)
228 }
229
230 pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<crate::cshadow::StatusListener>) {
231 self.event_source.add_legacy_listener(ptr);
232 }
233
234 pub fn remove_legacy_listener(&mut self, ptr: *mut crate::cshadow::StatusListener) {
235 self.event_source.remove_legacy_listener(ptr);
236 }
237
238 pub fn state(&self) -> FileState {
239 self.state
240 }
241
242 fn refresh_state(&mut self, cb_queue: &mut CallbackQueue) {
243 let readable = if self.has_ready_events() {
244 FileState::READABLE
245 } else {
246 Default::default()
247 };
248 self.update_state(
249 FileState::READABLE,
250 readable,
251 FileSignals::empty(),
252 cb_queue,
253 );
254 }
255
256 fn update_state(
257 &mut self,
258 mask: FileState,
259 state: FileState,
260 signals: FileSignals,
261 cb_queue: &mut CallbackQueue,
262 ) {
263 let old_state = self.state;
264
265 self.state.remove(mask);
267 self.state.insert(state & mask);
268
269 self.handle_state_change(old_state, signals, cb_queue);
270 }
271
272 fn handle_state_change(
273 &mut self,
274 old_state: FileState,
275 signals: FileSignals,
276 cb_queue: &mut CallbackQueue,
277 ) {
278 let states_changed = self.state ^ old_state;
279
280 if !states_changed.is_empty() || !signals.is_empty() {
282 self.event_source
283 .notify_listeners(self.state, states_changed, signals, cb_queue);
284 }
285 }
286
287 fn refresh_listener(&mut self, weak_self: Weak<AtomicRefCell<Epoll>>, key: Key) {
288 let Some(entry) = self.monitoring.get_mut(&key) else {
289 return;
290 };
291
292 let listen_state = entry.get_listener_state().union(FileState::CLOSED);
295 let listen_signals = entry.get_listener_signals();
296 let filter = StateListenerFilter::Always;
297
298 let file = key.file().clone();
300 let handle = file.borrow_mut().add_listener(
301 listen_state,
302 listen_signals,
303 filter,
304 move |state, changed, signals, cb_queue| {
305 if let Some(epoll) = weak_self.upgrade() {
306 epoll
307 .borrow_mut()
308 .notify_entry(&key, state, changed, signals, cb_queue);
309 }
310 },
311 );
312 entry.set_listener_handle(Some(handle));
313 }
314
315 fn notify_entry(
317 &mut self,
318 key: &Key,
319 state: FileState,
320 changed: FileState,
321 signals: FileSignals,
322 cb_queue: &mut CallbackQueue,
323 ) {
324 match self.monitoring.get_mut(&key.clone()) {
326 Some(entry) => entry.notify(state, changed, signals),
327 None => return,
328 };
329
330 self.refresh_ready(key.clone());
332
333 if state.contains(FileState::CLOSED) {
335 self.monitoring.remove(key);
336 }
337
338 self.refresh_state(cb_queue);
340 }
341
342 fn refresh_ready(&mut self, key: Key) {
344 let Some(entry) = self.monitoring.get_mut(&key.clone()) else {
345 return;
346 };
347
348 if entry.has_ready_events() {
350 if entry.priority().is_none() {
351 let pri = self.pri_counter;
353 self.pri_counter -= 1;
354 self.ready.push(PriorityKey::new(pri, key));
355 entry.set_priority(Some(pri));
356 }
357 } else if let Some(pri) = entry.priority() {
358 self.ready.retain(|e| e.priority() != pri);
360 entry.set_priority(None);
361 }
362 }
363
364 pub fn has_ready_events(&self) -> bool {
365 !self.ready.is_empty()
366 }
367
368 pub fn collect_ready_events(
369 &mut self,
370 cb_queue: &mut CallbackQueue,
371 max_events: u32,
372 ) -> Vec<(EpollEvents, u64)> {
373 let mut events = vec![];
374 let mut keep = vec![];
375
376 while !self.ready.is_empty() && events.len() < max_events as usize {
377 let pri_key = self.ready.pop().unwrap();
379 let key = Key::from(pri_key);
380 let entry = self.monitoring.get_mut(&key).unwrap();
381
382 entry.set_priority(None);
384
385 debug_assert!(entry.has_ready_events());
387
388 events.push(entry.collect_ready_events().unwrap());
390
391 if entry.has_ready_events() {
393 let pri = self.pri_counter;
395 self.pri_counter -= 1;
396 let pri_key = PriorityKey::new(pri, key);
397
398 keep.push(pri_key);
400
401 entry.set_priority(Some(pri));
403 }
404 }
405
406 self.ready.extend(keep);
408
409 self.refresh_state(cb_queue);
411
412 events
414 }
415}