shadow_rs/core/
worker.rs

1use std::cell::{Cell, RefCell};
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicBool, AtomicU32};
4use std::sync::{Arc, Mutex};
5
6use atomic_refcell::{AtomicRef, AtomicRefCell};
7use linux_api::posix_types::Pid;
8use once_cell::sync::Lazy;
9use rand::Rng;
10use shadow_shim_helper_rs::HostId;
11use shadow_shim_helper_rs::emulated_time::EmulatedTime;
12use shadow_shim_helper_rs::rootedcell::rc::RootedRc;
13use shadow_shim_helper_rs::rootedcell::refcell::RootedRefCell;
14use shadow_shim_helper_rs::simulation_time::SimulationTime;
15
16use super::work::event_queue::EventQueue;
17use crate::core::controller::ShadowStatusBarState;
18use crate::core::runahead::Runahead;
19use crate::core::sim_config::Bandwidth;
20use crate::core::sim_stats::{LocalSimStats, SharedSimStats};
21use crate::core::work::event::Event;
22use crate::host::host::Host;
23use crate::host::process::{Process, ProcessId};
24use crate::host::thread::{Thread, ThreadId};
25use crate::network::dns::Dns;
26use crate::network::graph::{IpAssignment, RoutingInfo};
27use crate::network::packet::{PacketRc, PacketStatus};
28use crate::utility::childpid_watcher::ChildPidWatcher;
29use crate::utility::counter::Counter;
30use crate::utility::status_bar;
31
// Whether object allocation/deallocation counting is enabled. Starts disabled; switched on by
// `enable_object_counters()` and checked by the `increment_object_*_counter` functions.
static USE_OBJECT_COUNTERS: AtomicBool = AtomicBool::new(false);

// global counters to be used when there is no worker active
// (each worker's local stats are also merged in here by `Worker::add_to_global_sim_stats()`)
static SIM_STATS: Lazy<SharedSimStats> = Lazy::new(SharedSimStats::new);

// thread-local global state
std::thread_local! {
    // Initialized when the worker thread starts running. No shared ownership
    // or access from outside of the current thread.
    static WORKER: once_cell::unsync::OnceCell<RefCell<Worker>> = const { once_cell::unsync::OnceCell::new() };
}

// shared global state
// Must not mutably borrow when the simulation is running.
// Every `Worker` keeps a shared borrow of this cell (taken in `Worker::new_for_this_thread()`).
pub static WORKER_SHARED: AtomicRefCell<Option<WorkerShared>> = AtomicRefCell::new(None);
47
/// Identifier of a worker thread, as passed to `Worker::new_for_this_thread()` and reported by
/// `Worker::worker_id()`.
#[derive(Copy, Clone, Debug)]
pub struct WorkerThreadID(pub u32);
50
/// Per-worker time bookkeeping.
struct Clock {
    // The current emulated time; set by `Worker::set_current_time()` and cleared by
    // `Worker::clear_current_time()`.
    now: Option<EmulatedTime>,
    // The end time of the current round; set by `Worker::set_round_end_time()`.
    barrier: Option<EmulatedTime>,
}
55
/// Worker context, containing 'global' information for the current thread.
///
/// One instance is stored per thread in the thread-local `WORKER`; the associated functions on
/// this type reach it through `Worker::with()`.
pub struct Worker {
    // The ID given to `new_for_this_thread()`.
    worker_id: WorkerThreadID,

    // A shared reference to the state in `WORKER_SHARED`.
    shared: AtomicRef<'static, WorkerShared>,

    // These store some information about the current Host, Process, and Thread,
    // when applicable. These are used to make this information available to
    // code that might not have access to the objects themselves, such as the
    // ShadowLogger.
    active_host: RefCell<Option<Box<Host>>>,
    active_process: RefCell<Option<RootedRc<RootedRefCell<Process>>>>,
    active_thread: RefCell<Option<RootedRc<RootedRefCell<Thread>>>>,

    // The current time and the end time of the current round.
    clock: RefCell<Clock>,

    // This value is not the minimum latency of the simulation, but just a saved copy of this
    // worker's minimum latency.
    min_latency_cache: Cell<Option<SimulationTime>>,

    // Statistics about the simulation, such as syscall counts.
    sim_stats: LocalSimStats,

    // The smallest time passed to `update_next_event_time()` since the last
    // `reset_next_event_time()`; `send_packet()` feeds packet delivery times into it.
    next_event_time: Cell<Option<EmulatedTime>>,
}
82
83impl Worker {
84    // Create worker for this thread.
85    pub fn new_for_this_thread(worker_id: WorkerThreadID) {
86        WORKER.with(|worker| {
87            let res = worker.set(RefCell::new(Self {
88                worker_id,
89                shared: AtomicRef::map(WORKER_SHARED.borrow(), |x| x.as_ref().unwrap()),
90                active_host: RefCell::new(None),
91                active_process: RefCell::new(None),
92                active_thread: RefCell::new(None),
93                clock: RefCell::new(Clock {
94                    now: None,
95                    barrier: None,
96                }),
97                min_latency_cache: Cell::new(None),
98                sim_stats: LocalSimStats::new(),
99                next_event_time: Cell::new(None),
100            }));
101            assert!(res.is_ok(), "Worker already initialized");
102        });
103    }
104
105    /// Run `f` with a reference to the current Host, or return None if there is no current Host.
106    #[must_use]
107    pub fn with_active_host<F, R>(f: F) -> Option<R>
108    where
109        F: FnOnce(&Host) -> R,
110    {
111        Worker::with(|w| {
112            let h = &*w.active_host.borrow();
113            h.as_ref().map(|h| f(h))
114        })
115        .flatten()
116    }
117
118    /// Run `f` with a reference to the current
119    /// `RootedRc<RootedRefCell<Process>>`, or return `None` if there isn't one.
120    ///
121    /// Prefer to pass Process explicitly where feasible. e.g. see `ProcessContext`.
122    #[must_use]
123    pub fn with_active_process_rc<F, R>(f: F) -> Option<R>
124    where
125        F: FnOnce(&RootedRc<RootedRefCell<Process>>) -> R,
126    {
127        Worker::with(|w| w.active_process.borrow().as_ref().map(f)).flatten()
128    }
129
130    /// Run `f` with a reference to the current `Process`, or return `None` if there isn't one.
131    ///
132    /// Prefer to pass Process explicitly where feasible. e.g. see `ProcessContext`.
133    #[must_use]
134    pub fn with_active_process<F, R>(f: F) -> Option<R>
135    where
136        F: FnOnce(&Process) -> R,
137    {
138        Worker::with(|w| {
139            let host = w.active_host.borrow();
140            let process = w.active_process.borrow();
141            match (host.as_ref(), process.as_ref()) {
142                (Some(host), Some(process)) => {
143                    let process = process.borrow(host.root());
144                    Some(f(&process))
145                }
146                _ => None,
147            }
148        })
149        .flatten()
150    }
151
152    /// Run `f` with a reference to the current `Thread`, or return `None` if there isn't one.
153    ///
154    /// Prefer to pass Thread explicitly where feasible. e.g. see `ThreadContext`.
155    #[must_use]
156    pub fn with_active_thread<F, R>(f: F) -> Option<R>
157    where
158        F: FnOnce(&Thread) -> R,
159    {
160        Worker::with(|w| {
161            let host = w.active_host.borrow();
162            let host = host.as_ref()?;
163            let thread = w.active_thread.borrow();
164            let thread = thread.as_ref()?;
165            let thread = thread.borrow(host.root());
166            Some(f(&thread))
167        })
168        .flatten()
169    }
170
171    /// Run `f` with a reference to the global DNS.
172    ///
173    /// Panics if the Worker or its DNS hasn't yet been initialized.
174    fn with_dns<F, R>(f: F) -> R
175    where
176        F: FnOnce(&Dns) -> R,
177    {
178        Worker::with(|w| f(w.shared.dns())).unwrap()
179    }
180
181    /// Set the currently-active Host.
182    pub fn set_active_host(host: Box<Host>) {
183        let old = Worker::with(|w| w.active_host.borrow_mut().replace(host)).unwrap();
184        debug_assert!(old.is_none());
185    }
186
187    /// Clear the currently-active Host.
188    pub fn take_active_host() -> Box<Host> {
189        Worker::with(|w| w.active_host.borrow_mut().take())
190            .unwrap()
191            .unwrap()
192    }
193
194    /// Set the currently-active Process.
195    pub fn set_active_process(process: &RootedRc<RootedRefCell<Process>>) {
196        Worker::with(|w| {
197            let process = process.clone(w.active_host.borrow().as_ref().unwrap().root());
198            let old = w.active_process.borrow_mut().replace(process);
199            debug_assert!(old.is_none());
200        })
201        .unwrap();
202    }
203
204    /// Clear the currently-active Process.
205    pub fn clear_active_process() {
206        Worker::with(|w| {
207            let old = w.active_process.borrow_mut().take().unwrap();
208            let host = w.active_host.borrow();
209            let host = host.as_ref().unwrap();
210            old.explicit_drop_recursive(host.root(), host);
211        })
212        .unwrap();
213    }
214
215    /// Set the currently-active Thread.
216    pub fn set_active_thread(thread: &RootedRc<RootedRefCell<Thread>>) {
217        Worker::with(|w| {
218            let thread = thread.clone(w.active_host.borrow().as_ref().unwrap().root());
219            let old = w.active_thread.borrow_mut().replace(thread);
220            debug_assert!(old.is_none());
221        })
222        .unwrap();
223    }
224
225    /// Clear the currently-active Thread.
226    pub fn clear_active_thread() {
227        Worker::with(|w| {
228            let host = w.active_host.borrow();
229            let host = host.as_ref().unwrap();
230            let old = w.active_thread.borrow_mut().take().unwrap();
231            old.explicit_drop_recursive(host.root(), host);
232        })
233        .unwrap()
234    }
235
    /// Whether currently running on a live Worker.
    ///
    /// This is `true` exactly when `Worker::with()` is able to reach a `Worker` for the calling
    /// thread.
    pub fn is_alive() -> bool {
        // The closure does nothing; only whether it could run matters.
        Worker::with(|_| ()).is_some()
    }
240
    /// ID of this thread's Worker, if any.
    ///
    /// Returns `None` when the calling thread has no `Worker`.
    pub fn worker_id() -> Option<WorkerThreadID> {
        Worker::with(|w| w.worker_id)
    }
245
    /// Native pid of the currently-active Process, or `None` if `with_active_process()` has no
    /// Process to offer.
    pub fn active_process_native_pid() -> Option<Pid> {
        Worker::with_active_process(|p| p.native_pid())
    }
249
    /// `ProcessId` of the currently-active Process, or `None` if `with_active_process()` has no
    /// Process to offer.
    pub fn active_process_id() -> Option<ProcessId> {
        Worker::with_active_process(|p| p.id())
    }
253
    /// `ThreadId` of the currently-active Thread, or `None` if `with_active_thread()` has no
    /// Thread to offer.
    pub fn active_thread_id() -> Option<ThreadId> {
        Worker::with_active_thread(|thread| thread.id())
    }
257
    /// Native tid of the currently-active Thread, or `None` if `with_active_thread()` has no
    /// Thread to offer.
    pub fn active_thread_native_tid() -> Option<Pid> {
        Worker::with_active_thread(|thread| thread.native_tid())
    }
261
    /// Records `t` as the end time of the current round (the clock's `barrier`), replacing any
    /// previous value.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn set_round_end_time(t: EmulatedTime) {
        Worker::with(|w| w.clock.borrow_mut().barrier.replace(t)).unwrap();
    }
265
    /// The round end time last stored by `set_round_end_time()`, or `None` if it has not been
    /// set or this thread has no `Worker`.
    fn round_end_time() -> Option<EmulatedTime> {
        Worker::with(|w| w.clock.borrow().barrier).flatten()
    }
269
270    /// Maximum time that the current event may run ahead to.
271    pub fn max_event_runahead_time(host: &Host) -> EmulatedTime {
272        let mut max = Worker::round_end_time().unwrap();
273        if let Some(next_event_time) = host.next_event_time() {
274            max = std::cmp::min(max, next_event_time);
275        }
276        max
277    }
278
    /// Sets the worker's current time (the clock's `now`) to `t`, replacing any previous value.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn set_current_time(t: EmulatedTime) {
        Worker::with(|w| w.clock.borrow_mut().now.replace(t)).unwrap();
    }
282
    /// Unsets the worker's current time, so that `current_time()` returns `None` until it is
    /// set again.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn clear_current_time() {
        Worker::with(|w| w.clock.borrow_mut().now.take()).unwrap();
    }
286
    /// The worker's current time, or `None` if it is unset or this thread has no `Worker`.
    pub fn current_time() -> Option<EmulatedTime> {
        Worker::with(|w| w.clock.borrow().now).flatten()
    }
290
291    pub fn update_lowest_used_latency(t: SimulationTime) {
292        assert!(t != SimulationTime::ZERO);
293
294        Worker::with(|w| {
295            let min_latency_cache = w.min_latency_cache.get();
296            if min_latency_cache.is_none() || t < min_latency_cache.unwrap() {
297                w.min_latency_cache.set(Some(t));
298                w.shared.update_lowest_used_latency(t);
299            }
300        })
301        .unwrap();
302    }
303
    /// Clears the worker's tracked next event time.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn reset_next_event_time() {
        Worker::with(|w| w.next_event_time.set(None)).unwrap();
    }
307
    /// The smallest time passed to `update_next_event_time()` since the last reset, or `None`
    /// if there has been none.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn get_next_event_time() -> Option<EmulatedTime> {
        Worker::with(|w| w.next_event_time.get()).unwrap()
    }
311
312    pub fn update_next_event_time(t: EmulatedTime) {
313        Worker::with(|w| {
314            let next_event_time = w.next_event_time.get();
315            if next_event_time.is_none() || t < next_event_time.unwrap() {
316                w.next_event_time.set(Some(t));
317            }
318        })
319        .unwrap();
320    }
321
    /// Sends a packet from `src_host` towards the host that owns the packet's destination IP, by
    /// pushing a copy of it onto that host's event queue.
    ///
    /// The packet will be dropped if the packet's destination IP is not part of the simulation (no
    /// host has been configured for the IP).
    ///
    /// Nothing is sent if the current time is at or past the simulation end time. Outside of the
    /// bootstrapping interval, a packet with a non-empty payload may also be dropped at random
    /// according to the path's reliability.
    ///
    /// Panics if the worker's current time or round end time is unset, if this thread has no
    /// `Worker`, or if no reliability/latency is available for the source/destination pair.
    pub fn send_packet(src_host: &Host, packetrc: PacketRc) {
        let current_time = Worker::current_time().unwrap();
        let round_end_time = Worker::round_end_time().unwrap();

        let is_completed = current_time >= Worker::with(|w| w.shared.sim_end_time).unwrap();
        let is_bootstrapping =
            current_time < Worker::with(|w| w.shared.bootstrap_end_time).unwrap();

        if is_completed {
            // the simulation is over, don't bother
            return;
        }

        let src_ip = *packetrc.src_ipv4_address().ip();
        let dst_ip = *packetrc.dst_ipv4_address().ip();
        let payload_size = packetrc.payload_len();

        let Some(dst_host_id) = Worker::resolve_ip_to_host_id(dst_ip) else {
            log_once_per_value_at_level!(
                dst_ip,
                std::net::Ipv4Addr,
                log::Level::Warn,
                log::Level::Debug,
                "Packet has destination {dst_ip} which doesn't exist in the simulation. Dropping the packet.",
            );
            packetrc.add_status(PacketStatus::InetDropped);
            return;
        };

        // the shared routing lookups below take `IpAddr`, not `Ipv4Addr`
        let src_ip = std::net::IpAddr::V4(src_ip);
        let dst_ip = std::net::IpAddr::V4(dst_ip);

        // check if network reliability forces us to 'drop' the packet
        let reliability: f64 = Worker::with(|w| w.shared.reliability(src_ip, dst_ip).unwrap())
            .unwrap()
            .into();
        // the random value is drawn from the source host's random source
        let chance: f64 = src_host.random_mut().random();

        // don't drop control packets with length 0, otherwise congestion control has problems
        // responding to packet loss
        // https://github.com/shadow/shadow/issues/2517
        if !is_bootstrapping && chance >= reliability && payload_size > 0 {
            packetrc.add_status(PacketStatus::InetDropped);
            return;
        }

        let delay = Worker::with(|w| w.shared.latency(src_ip, dst_ip).unwrap()).unwrap();

        Worker::update_lowest_used_latency(delay);
        Worker::with(|w| w.shared.increment_packet_count(src_ip, dst_ip)).unwrap();

        // TODO: this should change for sending to remote manager (on a different machine); this is
        // the only place where tasks are sent between separate hosts

        packetrc.add_status(PacketStatus::InetSent);

        // delay the packet until the next round
        let mut deliver_time = current_time + delay;
        if deliver_time < round_end_time {
            deliver_time = round_end_time;
        }

        // we may have sent this packet after the destination host finished running the current
        // round and calculated its min event time, so we put this in our min event time instead
        Worker::update_next_event_time(deliver_time);

        // copy the packet (except the payload) so the dst gets its own header info
        let dst_packet = packetrc.new_copy_inner();
        Worker::with(|w| {
            w.shared
                .push_packet_to_host(dst_packet, dst_host_id, deliver_time, src_host)
        })
        .unwrap();
    }
398
399    // Runs `f` with a shared reference to the current thread's Worker. Returns
400    // None if this thread has no Worker object.
401    #[must_use]
402    fn with<F, O>(f: F) -> Option<O>
403    where
404        F: FnOnce(&Worker) -> O,
405    {
406        WORKER
407            .try_with(|w| w.get().map(|w| f(&w.borrow())))
408            .ok()
409            .flatten()
410    }
411
412    pub fn increment_object_alloc_counter(s: &str) {
413        if !USE_OBJECT_COUNTERS.load(std::sync::atomic::Ordering::Relaxed) {
414            return;
415        }
416
417        Worker::with(|w| {
418            w.sim_stats.alloc_counts.borrow_mut().add_one(s);
419        })
420        .unwrap_or_else(|| {
421            // no live worker; fall back to the shared counter
422            SIM_STATS.alloc_counts.lock().unwrap().add_one(s);
423        });
424    }
425
426    pub fn increment_object_dealloc_counter(s: &str) {
427        if !USE_OBJECT_COUNTERS.load(std::sync::atomic::Ordering::Relaxed) {
428            return;
429        }
430
431        Worker::with(|w| {
432            w.sim_stats.dealloc_counts.borrow_mut().add_one(s);
433        })
434        .unwrap_or_else(|| {
435            // no live worker; fall back to the shared counter
436            SIM_STATS.dealloc_counts.lock().unwrap().add_one(s);
437        });
438    }
439
440    pub fn add_syscall_counts(syscall_counts: &Counter) {
441        Worker::with(|w| {
442            w.sim_stats
443                .syscall_counts
444                .borrow_mut()
445                .add_counter(syscall_counts);
446        })
447        .unwrap_or_else(|| {
448            // no live worker; fall back to the shared counter
449            SIM_STATS
450                .syscall_counts
451                .lock()
452                .unwrap()
453                .add_counter(syscall_counts);
454
455            // while we handle this okay, this probably indicates an issue somewhere else in the
456            // code so panic only in debug builds
457            debug_panic!("Trying to add syscall counts when there is no worker");
458        });
459    }
460
    /// Adds this worker's local statistics to the global `SIM_STATS`.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn add_to_global_sim_stats() {
        Worker::with(|w| SIM_STATS.add_from_local_stats(&w.sim_stats)).unwrap()
    }
464
    /// Whether packets can be routed from `src` to `dst`, as decided by
    /// `WorkerShared::is_routable()`.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn is_routable(src: std::net::IpAddr, dst: std::net::IpAddr) -> bool {
        Worker::with(|w| w.shared.is_routable(src, dst)).unwrap()
    }
468
    /// Increments the shared count of failed plugins; see
    /// `WorkerShared::increment_plugin_error_count()`.
    ///
    /// Panics if this thread has no `Worker`.
    pub fn increment_plugin_error_count() {
        Worker::with(|w| w.shared.increment_plugin_error_count()).unwrap()
    }
472
473    /// Shadow allows configuration of a "bootstrapping" interval, during which
474    /// hosts' network activity does not consume bandwidth. Returns `true` if we
475    /// are still within this preliminary interval, or `false` otherwise.
476    pub fn is_bootstrapping() -> bool {
477        Worker::with(|w| w.clock.borrow().now.unwrap() < w.shared.bootstrap_end_time).unwrap()
478    }
479
480    pub fn resolve_name_to_ip(name: &std::ffi::CStr) -> Option<std::net::Ipv4Addr> {
481        if let Ok(name) = name.to_str() {
482            Worker::with_dns(|dns| dns.name_to_addr(name))
483        } else {
484            None
485        }
486    }
487
    /// Looks up the `HostId` that the simulation's DNS associates with `ip`, if any.
    ///
    /// Panics if the Worker or its DNS hasn't yet been initialized.
    fn resolve_ip_to_host_id(ip: std::net::Ipv4Addr) -> Option<HostId> {
        Worker::with_dns(|dns| dns.addr_to_host_id(ip))
    }
491}
492
/// State shared by all workers, stored in `WORKER_SHARED` and borrowed by each `Worker`.
#[derive(Debug)]
pub struct WorkerShared {
    // maps IP addresses to nodes of the network graph
    pub ip_assignment: IpAssignment<u32>,
    // per-path properties (latency, packet loss) and packet counters between graph nodes
    pub routing_info: RoutingInfo<u32>,
    // configured bandwidth for each host IP
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,
    pub dns: Dns,
    // allows for easy updating of the status bar's state
    pub status_logger_state: Option<Arc<status_bar::Status<ShadowStatusBarState>>>,
    // number of plugins that failed with a non-zero exit code
    pub num_plugin_errors: AtomicU32,
    // calculates the runahead for the next simulation round
    pub runahead: Runahead,
    pub child_pid_watcher: ChildPidWatcher,
    /// Event queues for each host. This should only be used to push packet events.
    pub event_queues: HashMap<HostId, Arc<Mutex<EventQueue>>>,
    // before this time, `Worker::send_packet()` never drops packets because of path reliability
    pub bootstrap_end_time: EmulatedTime,
    // at or after this time, `Worker::send_packet()` no longer sends packets
    pub sim_end_time: EmulatedTime,
}
511
512impl WorkerShared {
    /// The simulation's DNS.
    pub fn dns(&self) -> &Dns {
        &self.dns
    }
516
517    pub fn latency(&self, src: std::net::IpAddr, dst: std::net::IpAddr) -> Option<SimulationTime> {
518        let src = self.ip_assignment.get_node(src)?;
519        let dst = self.ip_assignment.get_node(dst)?;
520
521        Some(SimulationTime::from_nanos(
522            self.routing_info.path(src, dst)?.latency_ns,
523        ))
524    }
525
526    pub fn reliability(&self, src: std::net::IpAddr, dst: std::net::IpAddr) -> Option<f32> {
527        let src = self.ip_assignment.get_node(src)?;
528        let dst = self.ip_assignment.get_node(dst)?;
529
530        Some(1.0 - self.routing_info.path(src, dst)?.packet_loss)
531    }
532
    /// The bandwidth configured for `ip`, or `None` if `host_bandwidths` has no entry for it.
    pub fn bandwidth(&self, ip: std::net::IpAddr) -> Option<&Bandwidth> {
        self.host_bandwidths.get(&ip)
    }
536
537    pub fn increment_packet_count(&self, src: std::net::IpAddr, dst: std::net::IpAddr) {
538        let src = self.ip_assignment.get_node(src).unwrap();
539        let dst = self.ip_assignment.get_node(dst).unwrap();
540
541        self.routing_info.increment_packet_count(src, dst)
542    }
543
544    pub fn is_routable(&self, src: std::net::IpAddr, dst: std::net::IpAddr) -> bool {
545        if self.ip_assignment.get_node(src).is_none() {
546            return false;
547        }
548
549        if self.ip_assignment.get_node(dst).is_none() {
550            return false;
551        }
552
553        // the network graph is required to be a connected graph, so they must be routable
554        true
555    }
556
557    pub fn increment_plugin_error_count(&self) {
558        let old_count = self
559            .num_plugin_errors
560            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
561
562        self.update_status_logger(|state| {
563            // there is a race condition here, so use the max
564            let new_value = old_count + 1;
565            state.num_failed_processes = std::cmp::max(state.num_failed_processes, new_value);
566        });
567    }
568
    /// The current number of plugin errors, as counted by `increment_plugin_error_count()`.
    pub fn plugin_error_count(&self) -> u32 {
        self.num_plugin_errors
            .load(std::sync::atomic::Ordering::SeqCst)
    }
573
574    /// Update the status logger. If the status logger is disabled, this will be a no-op.
575    pub fn update_status_logger(&self, f: impl FnOnce(&mut ShadowStatusBarState)) {
576        if let Some(ref logger_state) = self.status_logger_state {
577            logger_state.update(f);
578        }
579    }
580
    /// The current runahead, as reported by the `Runahead` calculator.
    pub fn get_runahead(&self) -> SimulationTime {
        self.runahead.get()
    }
584
    /// Forwards a newly observed path latency to the `Runahead` calculator.
    ///
    /// Should only be called from the thread-local worker.
    fn update_lowest_used_latency(&self, min_path_latency: SimulationTime) {
        self.runahead.update_lowest_used_latency(min_path_latency);
    }
589
    /// Get the pid watcher.
    ///
    /// The returned reference borrows from this `WorkerShared`.
    pub fn child_pid_watcher(&self) -> &ChildPidWatcher {
        &self.child_pid_watcher
    }
594
595    /// Push a packet to the destination host's event queue. Does not check that the time is valid
596    /// (is outside of the current scheduling round, etc).
597    pub fn push_packet_to_host(
598        &self,
599        packet: PacketRc,
600        dst_host_id: HostId,
601        time: EmulatedTime,
602        src_host: &Host,
603    ) {
604        let event = Event::new_packet(packet, time, src_host);
605        let event_queue = self.event_queues.get(&dst_host_id).unwrap();
606        event_queue.lock().unwrap().push(event);
607    }
608}
609
/// Enable object counters. Should be called near the beginning of the program.
///
/// Until this is called, `Worker::increment_object_alloc_counter()` and
/// `Worker::increment_object_dealloc_counter()` return without counting anything.
pub fn enable_object_counters() {
    USE_OBJECT_COUNTERS.store(true, std::sync::atomic::Ordering::Relaxed);
}
614
/// Calls `f` with a reference to the global simulation statistics and returns its result.
pub fn with_global_sim_stats<T>(f: impl FnOnce(&SharedSimStats) -> T) -> T {
    f(&SIM_STATS)
}
618
619mod export {
620    use std::ffi::CString;
621    use std::os::unix::ffi::OsStrExt;
622
623    use shadow_shim_helper_rs::emulated_time::CEmulatedTime;
624    use shadow_shim_helper_rs::simulation_time::CSimulationTime;
625
626    use super::*;
627
628    /// # Safety
629    /// The returned string should be returned to rust to be deallocated by calling
630    /// `worker_freeHostsFilePath()`.
631    #[unsafe(no_mangle)]
632    pub extern "C-unwind" fn worker_getHostsFilePath() -> *const std::ffi::c_char {
633        let pathbuf = Worker::with_dns(|dns| dns.hosts_path());
634        let pathstr = CString::new(pathbuf.as_os_str().as_bytes()).unwrap();
635        // Move ownership to C.
636        pathstr.into_raw()
637    }
638
639    /// # Safety
640    /// The path should be a valid pointer to the string allocated by rust, such as
641    /// the string returned in `worker_getHostsFilePath()`.
642    #[unsafe(no_mangle)]
643    pub extern "C-unwind" fn worker_freeHostsFilePath(path: *const std::ffi::c_char) {
644        // Take the ownership back to rust and drop the owner
645        unsafe {
646            let _ = CString::from_raw(path as *mut _);
647        }
648    }
649
650    /// Addresses must be provided in network byte order.
651    #[unsafe(no_mangle)]
652    pub extern "C-unwind" fn worker_getLatency(
653        src: libc::in_addr_t,
654        dst: libc::in_addr_t,
655    ) -> CSimulationTime {
656        let src = std::net::IpAddr::V4(u32::from_be(src).into());
657        let dst = std::net::IpAddr::V4(u32::from_be(dst).into());
658
659        let latency = Worker::with(|w| w.shared.latency(src, dst)).unwrap();
660        SimulationTime::to_c_simtime(latency)
661    }
662
663    /// Addresses must be provided in network byte order.
664    #[unsafe(no_mangle)]
665    pub extern "C-unwind" fn worker_getBandwidthDownBytes(ip: libc::in_addr_t) -> u64 {
666        let ip = std::net::IpAddr::V4(u32::from_be(ip).into());
667        Worker::with(|w| w.shared.bandwidth(ip).unwrap().down_bytes).unwrap()
668    }
669
670    /// Addresses must be provided in network byte order.
671    #[unsafe(no_mangle)]
672    pub extern "C-unwind" fn worker_getBandwidthUpBytes(ip: libc::in_addr_t) -> u64 {
673        let ip = std::net::IpAddr::V4(u32::from_be(ip).into());
674        Worker::with(|w| w.shared.bandwidth(ip).unwrap().up_bytes).unwrap()
675    }
676
677    /// Addresses must be provided in network byte order.
678    #[unsafe(no_mangle)]
679    pub extern "C-unwind" fn worker_isRoutable(src: libc::in_addr_t, dst: libc::in_addr_t) -> bool {
680        let src = std::net::IpAddr::V4(u32::from_be(src).into());
681        let dst = std::net::IpAddr::V4(u32::from_be(dst).into());
682
683        Worker::is_routable(src, dst)
684    }
685
    /// Returns a pointer to the `ChildPidWatcher` held in the shared worker state.
    ///
    /// Panics if the calling thread has no `Worker`.
    ///
    /// # Safety
    /// The returned pointer must not be accessed after this worker thread has exited.
    #[unsafe(no_mangle)]
    pub unsafe extern "C-unwind" fn worker_getChildPidWatcher() -> *const ChildPidWatcher {
        Worker::with(|w| std::ptr::from_ref(w.shared.child_pid_watcher())).unwrap()
    }
691
692    /// Implementation for counting allocated objects. Do not use this function directly.
693    /// Use worker_count_allocation instead from the call site.
694    #[unsafe(no_mangle)]
695    pub extern "C-unwind" fn worker_increment_object_alloc_counter(
696        object_name: *const libc::c_char,
697    ) {
698        assert!(!object_name.is_null());
699
700        let s = unsafe { std::ffi::CStr::from_ptr(object_name) };
701        let s = s.to_str().unwrap();
702        Worker::increment_object_alloc_counter(s);
703    }
704
705    /// Implementation for counting deallocated objects. Do not use this function directly.
706    /// Use worker_count_deallocation instead from the call site.
707    #[unsafe(no_mangle)]
708    pub extern "C-unwind" fn worker_increment_object_dealloc_counter(
709        object_name: *const libc::c_char,
710    ) {
711        assert!(!object_name.is_null());
712
713        let s = unsafe { std::ffi::CStr::from_ptr(object_name) };
714        let s = s.to_str().unwrap();
715        Worker::increment_object_dealloc_counter(s);
716    }
717
718    /// Aggregate the given syscall counts in a worker syscall counter.
719    #[unsafe(no_mangle)]
720    pub extern "C-unwind" fn worker_add_syscall_counts(syscall_counts: *const Counter) {
721        assert!(!syscall_counts.is_null());
722        let syscall_counts = unsafe { syscall_counts.as_ref() }.unwrap();
723
724        Worker::add_syscall_counts(syscall_counts);
725    }
726
    /// Sets the worker's current time from a C emulated time value.
    ///
    /// Panics if `EmulatedTime::from_c_emutime()` yields no time for `t`, or if the calling
    /// thread has no `Worker`.
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_setCurrentEmulatedTime(t: CEmulatedTime) {
        Worker::set_current_time(EmulatedTime::from_c_emutime(t).unwrap());
    }
731
    /// Returns the worker's current time as a C simulation time value (the current emulated
    /// time converted with `to_abs_simtime()`). An unset current time is passed to
    /// `SimulationTime::to_c_simtime()` as `None`.
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_getCurrentSimulationTime() -> CSimulationTime {
        SimulationTime::to_c_simtime(Worker::current_time().map(|t| t.to_abs_simtime()))
    }
736
    /// Returns the worker's current time as a C emulated time value. An unset current time is
    /// passed to `EmulatedTime::to_c_emutime()` as `None`.
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_getCurrentEmulatedTime() -> CEmulatedTime {
        EmulatedTime::to_c_emutime(Worker::current_time())
    }
741
    /// Returns a pointer to the current running host. The returned pointer is
    /// invalidated the next time the worker switches hosts.
    ///
    /// Panics if there is no active host (or no `Worker` on the calling thread).
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_getCurrentHost() -> *const Host {
        Worker::with_active_host(std::ptr::from_ref).unwrap()
    }
748
    /// Returns a pointer to the current running process. The returned pointer is
    /// invalidated the next time the worker switches processes.
    ///
    /// Panics if there is no active process (or no active host, or no `Worker` on the calling
    /// thread).
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_getCurrentProcess() -> *const Process {
        // `with_active_process` hands us the `&Process` itself (rather than the enclosing
        // `RootedRc<RootedRefCell<Process>>`), which is what we turn into the raw pointer.
        Worker::with_active_process(std::ptr::from_ref).unwrap()
    }
757
    /// Returns a pointer to the current running thread. The returned pointer is
    /// invalidated the next time the worker switches threads.
    ///
    /// Panics if there is no active thread (or no active host, or no `Worker` on the calling
    /// thread).
    #[unsafe(no_mangle)]
    pub extern "C-unwind" fn worker_getCurrentThread() -> *const Thread {
        Worker::with_active_thread(std::ptr::from_ref).unwrap()
    }
764
765    /// Maximum time that the current event may run ahead to. Must only be called if we hold the
766    /// host lock.
767    #[unsafe(no_mangle)]
768    pub extern "C-unwind" fn worker_maxEventRunaheadTime(host: *const Host) -> CEmulatedTime {
769        let host = unsafe { host.as_ref() }.unwrap();
770        EmulatedTime::to_c_emutime(Some(Worker::max_event_runahead_time(host)))
771    }
772}