1use std::cell::RefCell;
2use std::ops::DerefMut;
34use shadow_shim_helper_rs::rootedcell::refcell::RootedRefCell;
56use crate::host::descriptor::FileState;
7use crate::host::descriptor::listener::StateEventSource;
8use crate::utility::callback_queue::CallbackQueue;
910/// An event source stored by a `LegacyFile`.
11#[allow(non_camel_case_types)]
12pub type RootedRefCell_StateEventSource = RootedRefCell<StateEventSource>;
1314thread_local! {
15static C_CALLBACK_QUEUE: RefCell<Option<CallbackQueue>> = const { RefCell::new(None) };
16}
1718/// Helper function to initialize and run a global thread-local callback queue.
19///
20/// This is a hack so that C [`LegacyFile`](crate::cshadow::LegacyFile)s can queue listener
21/// callbacks using `notify_listeners_with_global_cb_queue`. This is primarily for
22/// [`TCP`](crate::cshadow::TCP) objects, and should not be used with Rust file objects.
23///
24/// The closure should make any borrows of the file object, rather than making any borrows outside
25/// of the closure.
26pub fn with_global_cb_queue<T>(f: impl FnOnce() -> T) -> T {
27 C_CALLBACK_QUEUE.with(|cb_queue| {
28if cb_queue.borrow().is_some() {
29// we seem to be in a nested `with_global_cb_queue()` call, so just run the closure with
30 // the existing queue
31return f();
32 }
3334// set the global queue
35assert!(
36 cb_queue
37 .borrow_mut()
38 .replace(CallbackQueue::new())
39 .is_none()
40 );
4142let rv = f();
4344// run the queued callbacks
45loop {
46// take and replace the global queue since callbacks may try to add new callbacks to the
47 // global queue as we're running old callbacks
48let mut queue_to_run = cb_queue.borrow_mut().replace(CallbackQueue::new()).unwrap();
49if queue_to_run.is_empty() {
50// no new callbacks were added, so we're done
51break;
52 }
53 queue_to_run.run();
54 }
5556assert!(cb_queue.borrow_mut().take().is_some());
5758 rv
59 })
60}
61mod export {
62use super::*;
6364use std::net::Ipv4Addr;
6566use crate::core::worker;
67use crate::host::descriptor::FileSignals;
68use crate::host::descriptor::socket::inet::InetSocket;
69use crate::host::host::Host;
7071/// Notify listeners using the global callback queue. If the queue hasn't been set using
72 /// [`with_global_cb_queue`], the listeners will be notified here before returning.
73#[unsafe(no_mangle)]
74pub unsafe extern "C-unwind" fn notify_listeners_with_global_cb_queue(
75 event_source: *const RootedRefCell_StateEventSource,
76 state: FileState,
77 changed: FileState,
78 signals: FileSignals,
79 ) {
80let event_source = unsafe { event_source.as_ref() }.unwrap();
8182 with_global_cb_queue(|| {
83 C_CALLBACK_QUEUE.with(|cb_queue| {
84let mut cb_queue = cb_queue.borrow_mut();
85// must not be `None` since it will be set to `Some` by `with_global_cb_queue`
86let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
8788 worker::Worker::with_active_host(|host| {
89let mut event_source = event_source.borrow_mut(host.root());
90 event_source.notify_listeners(state, changed, signals, cb_queue)
91 })
92 .unwrap();
93 });
94 });
95 }
9697/// Tell the host that the socket wants to send packets using the global callback queue. If the
98 /// queue hasn't been set using [`with_global_cb_queue`], the host will be notified here before
99 /// returning. Takes ownership of `inetSocket` (will free/drop).
100#[unsafe(no_mangle)]
101pub unsafe extern "C-unwind" fn socket_wants_to_send_with_global_cb_queue(
102 host: *const Host,
103 socket: *mut InetSocket,
104 ip: libc::in_addr_t,
105 ) {
106let host = unsafe { host.as_ref() }.unwrap();
107let ip = Ipv4Addr::from(u32::from_be(ip));
108109let host_id = host.id();
110111 with_global_cb_queue(|| {
112 C_CALLBACK_QUEUE.with(|cb_queue| {
113let mut cb_queue = cb_queue.borrow_mut();
114// must not be `None` since it will be set to `Some` by `with_global_cb_queue`
115let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
116117 cb_queue.add(move |_cb_queue| {
118 worker::Worker::with_active_host(|host| {
119assert_eq!(host.id(), host_id);
120let socket = unsafe { Box::from_raw(socket) };
121 host.notify_socket_has_packets(ip, &socket);
122 })
123 .unwrap();
124 });
125 });
126 });
127 }
128}