shadow_rs/utility/legacy_callback_queue.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
use std::cell::RefCell;
use std::ops::DerefMut;
use shadow_shim_helper_rs::rootedcell::refcell::RootedRefCell;
use crate::host::descriptor::listener::StateEventSource;
use crate::host::descriptor::FileState;
use crate::utility::callback_queue::CallbackQueue;
/// An event source stored by a `LegacyFile`.
#[allow(non_camel_case_types)]
pub type RootedRefCell_StateEventSource = RootedRefCell<StateEventSource>;
thread_local! {
static C_CALLBACK_QUEUE: RefCell<Option<CallbackQueue>> = const { RefCell::new(None) };
}
/// Helper function to initialize and run a global thread-local callback queue.
///
/// This is a hack so that C [`LegacyFile`](crate::cshadow::LegacyFile)s can queue listener
/// callbacks using `notify_listeners_with_global_cb_queue`. This is primarily for
/// [`TCP`](crate::cshadow::TCP) objects, and should not be used with Rust file objects.
///
/// The closure should make any borrows of the file object, rather than making any borrows outside
/// of the closure.
pub fn with_global_cb_queue<T>(f: impl FnOnce() -> T) -> T {
C_CALLBACK_QUEUE.with(|cb_queue| {
if cb_queue.borrow().is_some() {
// we seem to be in a nested `with_global_cb_queue()` call, so just run the closure with
// the existing queue
return f();
}
// set the global queue
assert!(cb_queue
.borrow_mut()
.replace(CallbackQueue::new())
.is_none());
let rv = f();
// run the queued callbacks
loop {
// take and replace the global queue since callbacks may try to add new callbacks to the
// global queue as we're running old callbacks
let mut queue_to_run = cb_queue.borrow_mut().replace(CallbackQueue::new()).unwrap();
if queue_to_run.is_empty() {
// no new callbacks were added, so we're done
break;
}
queue_to_run.run();
}
assert!(cb_queue.borrow_mut().take().is_some());
rv
})
}
mod export {
use super::*;
use std::net::Ipv4Addr;
use crate::core::worker;
use crate::host::descriptor::socket::inet::InetSocket;
use crate::host::descriptor::FileSignals;
use crate::host::host::Host;
/// Notify listeners using the global callback queue. If the queue hasn't been set using
/// [`with_global_cb_queue`], the listeners will be notified here before returning.
#[no_mangle]
pub unsafe extern "C-unwind" fn notify_listeners_with_global_cb_queue(
event_source: *const RootedRefCell_StateEventSource,
state: FileState,
changed: FileState,
signals: FileSignals,
) {
let event_source = unsafe { event_source.as_ref() }.unwrap();
with_global_cb_queue(|| {
C_CALLBACK_QUEUE.with(|cb_queue| {
let mut cb_queue = cb_queue.borrow_mut();
// must not be `None` since it will be set to `Some` by `with_global_cb_queue`
let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
worker::Worker::with_active_host(|host| {
let mut event_source = event_source.borrow_mut(host.root());
event_source.notify_listeners(state, changed, signals, cb_queue)
})
.unwrap();
});
});
}
/// Tell the host that the socket wants to send packets using the global callback queue. If the
/// queue hasn't been set using [`with_global_cb_queue`], the host will be notified here before
/// returning. Takes ownership of `inetSocket` (will free/drop).
#[no_mangle]
pub unsafe extern "C-unwind" fn socket_wants_to_send_with_global_cb_queue(
host: *const Host,
socket: *mut InetSocket,
ip: libc::in_addr_t,
) {
let host = unsafe { host.as_ref() }.unwrap();
let ip = Ipv4Addr::from(u32::from_be(ip));
let host_id = host.id();
with_global_cb_queue(|| {
C_CALLBACK_QUEUE.with(|cb_queue| {
let mut cb_queue = cb_queue.borrow_mut();
// must not be `None` since it will be set to `Some` by `with_global_cb_queue`
let cb_queue = cb_queue.deref_mut().as_mut().unwrap();
cb_queue.add(move |_cb_queue| {
worker::Worker::with_active_host(|host| {
assert_eq!(host.id(), host_id);
let socket = unsafe { Box::from_raw(socket) };
host.notify_socket_has_packets(ip, &socket);
})
.unwrap();
});
});
});
}
}