shadow_rs/utility/
legacy_callback_queue.rs

1use std::cell::RefCell;
2use std::ops::DerefMut;
3
4use shadow_shim_helper_rs::rootedcell::refcell::RootedRefCell;
5
6use crate::host::descriptor::FileState;
7use crate::host::descriptor::listener::StateEventSource;
8use crate::utility::callback_queue::CallbackQueue;
9
10/// An event source stored by a `LegacyFile`.
11#[allow(non_camel_case_types)]
12pub type RootedRefCell_StateEventSource = RootedRefCell<StateEventSource>;
13
14thread_local! {
15    static C_CALLBACK_QUEUE: RefCell<Option<CallbackQueue>> = const { RefCell::new(None) };
16}
17
18/// Helper function to initialize and run a global thread-local callback queue.
19///
20/// This is a hack so that C [`LegacyFile`](crate::cshadow::LegacyFile)s can queue listener
21/// callbacks using `notify_listeners_with_global_cb_queue`. This is primarily for
22/// [`TCP`](crate::cshadow::TCP) objects, and should not be used with Rust file objects.
23///
24/// The closure should make any borrows of the file object, rather than making any borrows outside
25/// of the closure.
26pub fn with_global_cb_queue<T>(f: impl FnOnce() -> T) -> T {
27    C_CALLBACK_QUEUE.with(|cb_queue| {
28        if cb_queue.borrow().is_some() {
29            // we seem to be in a nested `with_global_cb_queue()` call, so just run the closure with
30            // the existing queue
31            return f();
32        }
33
34        // set the global queue
35        assert!(
36            cb_queue
37                .borrow_mut()
38                .replace(CallbackQueue::new())
39                .is_none()
40        );
41
42        let rv = f();
43
44        // run the queued callbacks
45        loop {
46            // take and replace the global queue since callbacks may try to add new callbacks to the
47            // global queue as we're running old callbacks
48            let mut queue_to_run = cb_queue.borrow_mut().replace(CallbackQueue::new()).unwrap();
49            if queue_to_run.is_empty() {
50                // no new callbacks were added, so we're done
51                break;
52            }
53            queue_to_run.run();
54        }
55
56        assert!(cb_queue.borrow_mut().take().is_some());
57
58        rv
59    })
60}
61mod export {
62    use super::*;
63
64    use std::net::Ipv4Addr;
65
66    use crate::core::worker;
67    use crate::host::descriptor::FileSignals;
68    use crate::host::descriptor::socket::inet::InetSocket;
69    use crate::host::host::Host;
70
71    /// Notify listeners using the global callback queue. If the queue hasn't been set using
72    /// [`with_global_cb_queue`], the listeners will be notified here before returning.
73    #[unsafe(no_mangle)]
74    pub unsafe extern "C-unwind" fn notify_listeners_with_global_cb_queue(
75        event_source: *const RootedRefCell_StateEventSource,
76        state: FileState,
77        changed: FileState,
78        signals: FileSignals,
79    ) {
80        let event_source = unsafe { event_source.as_ref() }.unwrap();
81
82        with_global_cb_queue(|| {
83            C_CALLBACK_QUEUE.with(|cb_queue| {
84                let mut cb_queue = cb_queue.borrow_mut();
85                // must not be `None` since it will be set to `Some` by `with_global_cb_queue`
86                let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
87
88                worker::Worker::with_active_host(|host| {
89                    let mut event_source = event_source.borrow_mut(host.root());
90                    event_source.notify_listeners(state, changed, signals, cb_queue)
91                })
92                .unwrap();
93            });
94        });
95    }
96
97    /// Tell the host that the socket wants to send packets using the global callback queue. If the
98    /// queue hasn't been set using [`with_global_cb_queue`], the host will be notified here before
99    /// returning. Takes ownership of `inetSocket` (will free/drop).
100    #[unsafe(no_mangle)]
101    pub unsafe extern "C-unwind" fn socket_wants_to_send_with_global_cb_queue(
102        host: *const Host,
103        socket: *mut InetSocket,
104        ip: libc::in_addr_t,
105    ) {
106        let host = unsafe { host.as_ref() }.unwrap();
107        let ip = Ipv4Addr::from(u32::from_be(ip));
108
109        let host_id = host.id();
110
111        with_global_cb_queue(|| {
112            C_CALLBACK_QUEUE.with(|cb_queue| {
113                let mut cb_queue = cb_queue.borrow_mut();
114                // must not be `None` since it will be set to `Some` by `with_global_cb_queue`
115                let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
116
117                cb_queue.add(move |_cb_queue| {
118                    worker::Worker::with_active_host(|host| {
119                        assert_eq!(host.id(), host_id);
120                        let socket = unsafe { Box::from_raw(socket) };
121                        host.notify_socket_has_packets(ip, &socket);
122                    })
123                    .unwrap();
124                });
125            });
126        });
127    }
128}