// shadow_rs/core/manager.rs
1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Drives a single simulation run for one manager process: builds the hosts,
/// runs the round-based scheduler, and reports progress to the [`Controller`].
pub struct Manager<'a> {
    // Wrapped in `Option` so `run()` can `take()` ownership of it; it is
    // `None` after `run()` begins.
    manager_config: Option<ManagerConfig>,
    controller: &'a Controller<'a>,
    config: &'a ConfigOptions,

    // Host CPU frequency in Hz (a 2.5 GHz default is used if detection fails).
    raw_frequency: u64,
    // Native TSC rate passed to hosts for rdtsc emulation.
    native_tsc_frequency: u64,
    // Absolute simulation end time.
    end_time: EmulatedTime,

    // Output directory for this simulation's data.
    data_path: PathBuf,
    // Per-host output directories live under this path (`<data_path>/hosts`).
    hosts_path: PathBuf,

    // LD_PRELOAD libraries (injector, and optionally libc/openssl shims)
    // applied to every managed process.
    preload_paths: Arc<Vec<PathBuf>>,

    // Flags for the periodic resource-usage check; both start `true`.
    // NOTE(review): presumably cleared after the first warning so each warning
    // is only logged once — confirm against `check_resource_usage()`.
    check_fd_usage: bool,
    check_mem_usage: bool,

    // Kept open so `/proc/meminfo` can be re-read cheaply for heartbeat logs.
    meminfo_file: std::fs::File,
    // Shared-memory block read by the shim inside managed processes.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
    /// Builds a new manager for a single simulation run.
    ///
    /// Detects the CPU and TSC frequencies, resolves the preload library
    /// paths, prepares the data/hosts output directories (optionally seeded
    /// from a template directory), writes the processed config to disk,
    /// decides whether `cpuid` must be emulated in the shim, and allocates the
    /// shared-memory block that managed processes will read.
    ///
    /// # Errors
    ///
    /// Returns an error if a required preload library can't be found, if the
    /// output directories or processed-config file can't be created, or if
    /// `/proc/meminfo` can't be opened.
    pub fn new(
        manager_config: ManagerConfig,
        controller: &'a Controller<'a>,
        config: &'a ConfigOptions,
        end_time: EmulatedTime,
    ) -> anyhow::Result<Self> {
        // get the system's CPU frequency; fall back to a fixed default since
        // the frequency is only used for modelling, not correctness
        let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
            let default_freq = 2_500_000_000; // 2.5 GHz
            log::debug!("Failed to get raw CPU frequency, using {default_freq} Hz instead: {e}");
            default_freq
        });

        // prefer the measured TSC rate; otherwise reuse the raw CPU frequency,
        // which may differ from what managed code observes natively
        let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
            f
        } else {
            warn!(
                "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
            );
            raw_frequency
        };

        let mut preload_paths = Vec::new();

        // we always preload the injector lib to ensure that the shim is loaded into the managed
        // processes
        const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
        preload_paths.push(
            get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
            })?,
        );

        // preload libc lib if option is enabled
        // NOTE(review): these `unwrap()`s assume the experimental options were
        // filled with defaults during config processing — confirm upstream
        const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
        if config.experimental.use_preload_libc.unwrap() {
            let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the libc library is disabled");
        };

        // preload openssl rng lib if option is enabled
        const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
        if config.experimental.use_preload_openssl_rng.unwrap() {
            let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl rng library is disabled");
        };

        // preload openssl crypto lib if option is enabled
        const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
        if config.experimental.use_preload_openssl_crypto.unwrap() {
            let path =
                get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
                    format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
                })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl crypto library is disabled");
        };

        // use the working dir to generate absolute paths
        let cwd = std::env::current_dir()?;
        let template_path = config
            .general
            .template_directory
            .flatten_ref()
            .map(|x| cwd.clone().join(x));
        let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
        let hosts_path = data_path.join("hosts");

        if let Some(template_path) = template_path {
            log::debug!(
                "Copying template directory '{}' to '{}'",
                template_path.display(),
                data_path.display()
            );

            // copy the template directory to the data directory path
            utility::copy_dir_all(&template_path, &data_path).with_context(|| {
                format!(
                    "Failed to copy template directory '{}' to '{}'",
                    template_path.display(),
                    data_path.display()
                )
            })?;

            // create the hosts directory if it doesn't exist (the template may
            // have already provided one, so AlreadyExists is not an error)
            let result = std::fs::create_dir(&hosts_path);
            if let Err(e) = result
                && e.kind() != std::io::ErrorKind::AlreadyExists
            {
                return Err(e).context(format!(
                    "Failed to create hosts directory '{}'",
                    hosts_path.display()
                ));
            }
        } else {
            // create the data and hosts directories; here a pre-existing
            // directory IS an error, to avoid clobbering a previous run
            std::fs::create_dir(&data_path).with_context(|| {
                format!("Failed to create data directory '{}'", data_path.display())
            })?;
            std::fs::create_dir(&hosts_path).with_context(|| {
                format!(
                    "Failed to create hosts directory '{}'",
                    hosts_path.display(),
                )
            })?;
        }

        // save the processed config as yaml
        let config_out_filename = data_path.join("processed-config.yaml");
        let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
            format!("Failed to create file '{}'", config_out_filename.display())
        })?;

        serde_yaml::to_writer(config_out_file, &config).with_context(|| {
            format!(
                "Failed to write processed config yaml to file '{}'",
                config_out_filename.display()
            )
        })?;

        // held open for the lifetime of the manager; re-read on each heartbeat
        let meminfo_file =
            std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;

        // Determine whether we can and should emulate cpuid in the shim.
        let emulate_cpuid = {
            let supports_rdrand = asm_util::cpuid::supports_rdrand();
            let supports_rdseed = asm_util::cpuid::supports_rdseed();
            if !(supports_rdrand || supports_rdseed) {
                // No need to emulate cpuid.
                debug!(
                    "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
                );
                false
            } else {
                // CPU has `rdrand` and/or `rdseed`, which produce
                // non-deterministic results by design.  We want to trap and
                // emulate `cpuid` in the shim to mask this support so that
                // managed programs (hopefully) don't use it.

                // Test whether the current platform actually supports intercepting cpuid.
                // This is dependent on the CPU model and kernel version.
                let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
                match res {
                    Ok(_) => {
                        // Re-enable cpuid for ourselves; the probe above
                        // disabled it for this (the manager's) thread.
                        unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
                            .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
                        debug!(
                            "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
                        );
                        true
                    }
                    Err(e) => {
                        warn!(
                            "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
                        );
                        false
                    }
                }
            }
        };

        // allocate the shared-memory block that the shim in every managed
        // process reads for logging and preemption configuration
        let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
            log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
            native_preemption_config: if config.native_preemption_enabled() {
                FfiOption::Some(NativePreemptionConfig {
                    native_duration: config.native_preemption_native_interval()?,
                    sim_duration: config.native_preemption_sim_interval(),
                })
            } else {
                FfiOption::None
            },
            emulate_cpuid,
        });

        Ok(Self {
            manager_config: Some(manager_config),
            controller,
            config,
            raw_frequency,
            native_tsc_frequency,
            end_time,
            data_path,
            hosts_path,
            preload_paths: Arc::new(preload_paths),
            check_fd_usage: true,
            check_mem_usage: true,
            meminfo_file,
            shmem,
        })
    }
264
    /// Runs the simulation to completion, consuming the manager.
    ///
    /// Registers all hosts in the DNS, builds them, installs the global worker
    /// state, then drives the round-based scheduling loop until the controller
    /// reports that no further execution window exists. Finally shuts the
    /// hosts down, aggregates per-thread statistics, and writes
    /// `sim-stats.json` to the data directory.
    ///
    /// Returns the number of managed-plugin errors observed during the run.
    ///
    /// # Errors
    ///
    /// Returns an error if DNS registration, host construction, or writing the
    /// final statistics file fails.
    pub fn run(
        mut self,
        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
    ) -> anyhow::Result<u32> {
        // `new()` always stores `Some`, and `run()` consumes `self`, so this
        // take() cannot observe `None`
        let mut manager_config = self.manager_config.take().unwrap();

        // optional lower bound on the runahead, converted config -> Duration
        // -> SimulationTime
        let min_runahead_config: Option<Duration> = self
            .config
            .experimental
            .runahead
            .flatten()
            .map(|x| x.into());
        let min_runahead_config: Option<SimulationTime> =
            min_runahead_config.map(|x| x.try_into().unwrap());

        // end of the network-bootstrap phase, as an absolute emulated time
        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;

        // the smallest network latency bounds how far hosts may run ahead of
        // each other
        let smallest_latency = SimulationTime::from_nanos(
            manager_config
                .routing_info
                .get_smallest_latency_ns()
                .unwrap(),
        );

        // a configured parallelism of 0 means "use all physical cores"
        let parallelism: usize = match self.config.general.parallelism.unwrap() {
            0 => {
                let cores = cpu::count_physical_cores().try_into().unwrap();
                log::info!("The parallelism option was 0, so using parallelism={cores}");
                cores
            }
            x => x.try_into().unwrap(),
        };

        // Set up the global DNS before building the hosts
        let mut dns_builder = DnsBuilder::new();

        // Assign the host id only once to guarantee it stays associated with its host.
        let host_init: Vec<(&HostInfo, HostId)> = manager_config
            .hosts
            .iter()
            .enumerate()
            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
            .collect();

        for (info, id) in &host_init {
            // Extract the host address.
            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
                unreachable!("IPv6 not supported");
            };

            // Register in the global DNS.
            dns_builder
                .register(*id, addr, info.name.clone())
                .with_context(|| {
                    format!(
                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
                        *id, addr, info.name
                    )
                })?;
        }

        // Convert to a global read-only DNS struct.
        let dns = dns_builder.into_dns()?;

        // Now build the hosts using the assigned host ids.
        // note: there are several return points before we add these hosts to the scheduler and we
        // would leak memory if we return before then, but not worrying about that since the issues
        // will go away when we move the hosts to rust, and if we don't add them to the scheduler
        // then it means there was an error and we're going to exit anyways
        let mut hosts: Vec<_> = host_init
            .iter()
            .map(|(info, id)| {
                self.build_host(*id, info)
                    .with_context(|| format!("Failed to build host '{}'", info.name))
            })
            .collect::<anyhow::Result<_>>()?;

        // shuffle the list of hosts to make sure that they are randomly assigned by the scheduler
        hosts.shuffle(&mut manager_config.random);

        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();

        // an infinite iterator that always returns `<Option<Option<u32>>>::Some`
        let cpu_iter =
            std::iter::from_fn(|| {
                // if cpu pinning is enabled, return Some(Some(cpu_id)), otherwise return Some(None)
                Some(use_cpu_pinning.then(|| {
                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
                }))
            });

        // shadow is parallelized at the host level, so we don't need more parallelism than the
        // number of hosts
        let parallelism = std::cmp::min(parallelism, hosts.len());

        // should have either all `Some` values, or all `None` values
        // NOTE(review): `cpus[0]` assumes at least one host exists — confirm
        // an empty host list is rejected during config validation
        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
        if cpus[0].is_some() {
            log::debug!("Pinning to cpus: {cpus:?}");
            assert!(cpus.iter().all(|x| x.is_some()));
        } else {
            log::debug!("Not pinning to CPUs");
            assert!(cpus.iter().all(|x| x.is_none()));
        }
        assert_eq!(cpus.len(), parallelism);

        // set the simulation's global state
        worker::WORKER_SHARED
            .borrow_mut()
            .replace(worker::WorkerShared {
                ip_assignment: manager_config.ip_assignment,
                routing_info: manager_config.routing_info,
                host_bandwidths: manager_config.host_bandwidths,
                // safe since the DNS type has an internal mutex
                dns,
                num_plugin_errors: AtomicU32::new(0),
                // allow the status logger's state to be updated from anywhere
                status_logger_state: status_logger_state.map(Arc::clone),
                runahead: Runahead::new(
                    self.config.experimental.use_dynamic_runahead.unwrap(),
                    smallest_latency,
                    min_runahead_config,
                ),
                child_pid_watcher: ChildPidWatcher::new(),
                event_queues: hosts
                    .iter()
                    .map(|x| (x.id(), x.event_queue().clone()))
                    .collect(),
                bootstrap_end_time,
                sim_end_time: self.end_time,
            });

        // scope used so that the scheduler is dropped before we log the global counters below
        {
            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
                configuration::Scheduler::ThreadPerHost => {
                    std::thread_local! {
                        /// A thread-local required by the thread-per-host scheduler.
                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
                    }
                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
                        &cpus,
                        &SCHED_HOST_STORAGE,
                        hosts,
                    ))
                }
                configuration::Scheduler::ThreadPerCore => {
                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
                        &cpus,
                        hosts,
                        self.config.experimental.use_worker_spinning.unwrap(),
                    ))
                }
            };

            // initialize the thread-local Worker
            scheduler.scope(|s| {
                s.run(|thread_id| {
                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
                });
            });

            // the current simulation interval
            let mut window = Some((
                EmulatedTime::SIMULATION_START,
                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
            ));

            // the next event times for each thread; allocated here to avoid re-allocating each
            // scheduling loop
            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
                vec![AtomicRefCell::new(None); scheduler.parallelism()];

            // how often to log heartbeat messages
            let heartbeat_interval = self
                .config
                .general
                .heartbeat_interval
                .flatten()
                .map(|x| Duration::from(x).try_into().unwrap());

            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
            let mut time_of_last_usage_check = std::time::Instant::now();

            // the scheduling loop
            while let Some((window_start, window_end)) = window {
                // update the status logger
                let display_time = std::cmp::min(window_start, window_end);
                worker::WORKER_SHARED
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .update_status_logger(|state| {
                        state.current = display_time;
                    });

                // run the events
                scheduler.scope(|s| {
                    // run the closure on each of the scheduler's threads
                    s.run_with_data(
                        &thread_next_event_times,
                        // each call of the closure is given an abstract thread-specific host
                        // iterator, and an element of 'thread_next_event_times'
                        move |_, hosts, next_event_time| {
                            let mut next_event_time = next_event_time.borrow_mut();

                            worker::Worker::reset_next_event_time();
                            worker::Worker::set_round_end_time(window_end);

                            // execute each of this thread's hosts up to the
                            // end of the window, tracking the earliest future
                            // event across all of them
                            for_each_host(hosts, |host| {
                                let host_next_event_time = {
                                    host.lock_shmem();
                                    host.execute(window_end);
                                    let host_next_event_time = host.next_event_time();
                                    host.unlock_shmem();
                                    host_next_event_time
                                };
                                *next_event_time = [*next_event_time, host_next_event_time]
                                    .into_iter()
                                    .flatten() // filter out None
                                    .reduce(std::cmp::min);
                            });

                            // also consider packet events queued by the worker
                            let packet_next_event_time = worker::Worker::get_next_event_time();

                            *next_event_time = [*next_event_time, packet_next_event_time]
                                .into_iter()
                                .flatten() // filter out None
                                .reduce(std::cmp::min);
                        },
                    );

                    // log a heartbeat message every 'heartbeat_interval' amount of simulated time
                    if let Some(heartbeat_interval) = heartbeat_interval
                        && window_start > last_heartbeat + heartbeat_interval
                    {
                        last_heartbeat = window_start;
                        self.log_heartbeat(window_start);
                    }

                    // check resource usage every 30 real seconds
                    let current_time = std::time::Instant::now();
                    if current_time.duration_since(time_of_last_usage_check)
                        > Duration::from_secs(30)
                    {
                        time_of_last_usage_check = current_time;
                        self.check_resource_usage();
                    }
                });

                // get the minimum next event time for all threads (also resets the next event times
                // to None while we have them borrowed)
                let min_next_event_time = thread_next_event_times
                    .iter()
                    // the take() resets it to None for the next scheduling loop
                    .filter_map(|x| x.borrow_mut().take())
                    .reduce(std::cmp::min)
                    .unwrap_or(EmulatedTime::MAX);

                log::debug!(
                    "Finished execution window [{}--{}], next event at {}",
                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
                );

                // notify controller that we finished this round, and the time of our next event in
                // order to fast-forward our execute window if possible
                window = self
                    .controller
                    .manager_finished_current_round(min_next_event_time);
            }

            // shut down each host at the simulation end time
            scheduler.scope(|s| {
                s.run_with_hosts(move |_, hosts| {
                    for_each_host(hosts, |host| {
                        worker::Worker::set_current_time(self.end_time);
                        host.free_all_applications();
                        host.shutdown();
                        worker::Worker::clear_current_time();
                    });
                });
            });

            // add each thread's local sim statistics to the global sim statistics.
            scheduler.scope(|s| {
                s.run(|_| {
                    worker::Worker::add_to_global_sim_stats();
                });
            });

            scheduler.join();
        }

        // simulation is finished, so update the status logger
        worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .update_status_logger(|state| {
                state.current = self.end_time;
            });

        let num_plugin_errors = worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .plugin_error_count();

        // drop the simulation's global state
        // must drop before the allocation counters have been checked
        worker::WORKER_SHARED.borrow_mut().take();

        // since the scheduler was dropped, all workers should have completed and the global object
        // and syscall counters should have been updated

        worker::with_global_sim_stats(|stats| {
            if self.config.experimental.use_syscall_counters.unwrap() {
                log::info!(
                    "Global syscall counts: {}",
                    stats.syscall_counts.lock().unwrap()
                );
            }
            if self.config.experimental.use_object_counters.unwrap() {
                let alloc_counts = stats.alloc_counts.lock().unwrap();
                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
                log::info!("Global allocated object counts: {alloc_counts}");
                log::info!("Global deallocated object counts: {dealloc_counts}");

                if *alloc_counts == *dealloc_counts {
                    log::info!("We allocated and deallocated the same number of objects :)");
                } else {
                    // don't change the formatting of this line as we search for it in test cases
                    log::warn!("Memory leak detected");
                }
            }

            let stats_filename = self.data_path.clone().join("sim-stats.json");
            sim_stats::write_stats_to_file(&stats_filename, stats)
        })?;

        Ok(num_plugin_errors)
    }
610
    /// Builds a single simulated [`Host`] from `host_info`, assigning it the
    /// pre-allocated `host_id`, and registers each of the host's configured
    /// processes (applications) with it.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; the `Result` return type is kept for
    /// interface stability with the caller's error context.
    fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
        // host names come from config and are assumed to contain no NUL bytes
        let hostname = CString::new(&*host_info.name).unwrap();

        // scope used to enforce drop order for pointers
        let host = {
            let params = HostParameters {
                // the manager sets this ID
                id: host_id,
                // the manager sets this CPU frequency
                cpu_frequency: self.raw_frequency,
                node_seed: host_info.seed,
                hostname,
                node_id: host_info.network_node_id,
                ip_addr: match host_info.ip_addr.unwrap() {
                    // convert to network byte order for the C side
                    std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
                    // the config only allows ipv4 addresses, so this shouldn't happen
                    std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
                },
                sim_end_time: self.end_time,
                requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
                requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
                cpu_threshold: host_info.cpu_threshold,
                cpu_precision: host_info.cpu_precision,
                log_level: host_info
                    .log_level
                    .map(|x| x.to_c_loglevel())
                    .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
                pcap_config: host_info.pcap_config,
                qdisc: host_info.qdisc,
                init_sock_recv_buf_size: host_info.recv_buf_size,
                autotune_recv_buf: host_info.autotune_recv_buf,
                init_sock_send_buf_size: host_info.send_buf_size,
                autotune_send_buf: host_info.autotune_send_buf,
                native_tsc_frequency: self.native_tsc_frequency,
                model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
                max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
                unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
                unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
                strace_logging_options: self.config.strace_logging_mode(),
                // hosts may override the global log level for their shim
                shim_log_level: host_info
                    .log_level
                    .unwrap_or_else(|| self.config.general.log_level.unwrap())
                    .to_c_loglevel(),
                use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
                use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
                use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
            };

            Box::new(Host::new(
                params,
                &self.hosts_path,
                self.raw_frequency,
                self.shmem(),
                self.preload_paths.clone(),
            ))
        };

        // hold the host's shared-memory lock while registering its processes
        host.lock_shmem();

        for proc in &host_info.processes {
            // C-side API takes NUL-terminated strings for plugin path/name
            let plugin_path =
                CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
            let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
            let pause_for_debugging = host_info.pause_for_debugging;

            let argv: Vec<CString> = proc
                .args
                .iter()
                .map(|x| CString::new(x.as_bytes()).unwrap())
                .collect();

            // build "KEY=VALUE" environment entries
            let envv: Vec<CString> = proc
                .env
                .clone()
                .into_iter()
                .map(|(x, y)| {
                    let mut x: OsString = String::from(x).into();
                    x.push("=");
                    x.push(y);
                    CString::new(x.as_bytes()).unwrap()
                })
                .collect();

            // attribute the registration work to the host's execution timer
            host.continue_execution_timer();

            host.add_application(
                proc.start_time,
                proc.shutdown_time,
                proc.shutdown_signal,
                plugin_name,
                plugin_path,
                argv,
                envv,
                pause_for_debugging,
                proc.expected_final_state,
            );

            host.stop_execution_timer();
        }

        host.unlock_shmem();

        Ok(host)
    }
715
716    fn log_heartbeat(&mut self, now: EmulatedTime) {
717        let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
718        if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
719            let err = nix::errno::Errno::last();
720            log::warn!("Unable to get shadow's resource usage: {err}");
721            return;
722        }
723
724        // the sysinfo syscall also would give memory usage info, but it's less detailed
725        let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
726
727        // the linux man page says this is in kilobytes, but it seems to be in kibibytes
728        let max_memory = (resources.ru_maxrss as f64) / 1048576.0; // KiB->GiB
729        let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
730        let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
731
732        // tornettools assumes a specific log format for this message, so don't change it without
733        // testing that tornettools can parse resource usage information from the shadow log
734        // https://github.com/shadow/tornettools/blob/6c00856c3f08899da30bfc452b6a055572cc4536/tornettools/parse_rusage.py#L58-L86
735        log::info!(
736            "Process resource usage at simtime {} reported by getrusage(): \
737            ru_maxrss={:.03} GiB, \
738            ru_utime={:.03} minutes, \
739            ru_stime={:.03} minutes, \
740            ru_nvcsw={}, \
741            ru_nivcsw={}",
742            (now - EmulatedTime::SIMULATION_START).as_nanos(),
743            max_memory,
744            user_time_minutes,
745            system_time_minutes,
746            resources.ru_nvcsw,
747            resources.ru_nivcsw,
748        );
749
750        // there are different ways of calculating system memory usage (for example 'free' will
751        // calculate used memory differently than 'htop'), so we'll log the values we think are
752        // useful, and something parsing the log can calculate whatever it wants
753        log::info!(
754            "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
755            (now - EmulatedTime::SIMULATION_START).as_nanos(),
756            serde_json::to_string(&mem_info).unwrap(),
757        );
758    }
759
760    fn check_resource_usage(&mut self) {
761        if self.check_fd_usage {
762            match self.fd_usage() {
763                // if more than 90% in use
764                Ok((usage, limit)) if usage > limit * 90 / 100 => {
765                    log::warn!(
766                        "Using more than 90% ({usage}/{limit}) of available file descriptors"
767                    );
768                    self.check_fd_usage = false;
769                }
770                Err(e) => {
771                    log::warn!("Unable to check fd usage: {e}");
772                    self.check_fd_usage = false;
773                }
774                Ok(_) => {}
775            }
776        }
777
778        if self.check_mem_usage {
779            match self.memory_remaining() {
780                // if less than 500 MiB available
781                Ok(remaining) if remaining < 500 * 1024 * 1024 => {
782                    log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
783                    self.check_mem_usage = false;
784                }
785                Err(e) => {
786                    log::warn!("Unable to check memory usage: {e}");
787                    self.check_mem_usage = false;
788                }
789                Ok(_) => {}
790            }
791        }
792    }
793
794    /// Returns a tuple of (usage, limit).
795    fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
796        let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
797
798        let mut fd_count: u64 = 0;
799        for entry in dir {
800            // short-circuit and return on error
801            entry.context("Failed to read entry in '/proc/self/fd'")?;
802            fd_count += 1;
803        }
804
805        let (soft_limit, _) =
806            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
807                .context("Failed to get the fd limit")?;
808
809        Ok((fd_count, soft_limit))
810    }
811
812    /// Returns the number of bytes remaining.
813    fn memory_remaining(&mut self) -> anyhow::Result<u64> {
814        let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
815            .context("Failed to get the page size")?
816            .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
817
818        let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
819            .context("Failed to get the number of available pages of physical memory")?
820            .ok_or_else(|| {
821                anyhow::anyhow!(
822                    "Failed to get the number of available pages of physical memory (no errno)"
823                )
824            })?;
825
826        let page_size: u64 = page_size.try_into().unwrap();
827        let avl_pages: u64 = avl_pages.try_into().unwrap();
828
829        Ok(page_size * avl_pages)
830    }
831
    /// Returns this manager's `ManagerShmem` shared-memory block.
    pub fn shmem(&self) -> &ShMemBlock<'_, ManagerShmem> {
        &self.shmem
    }
835}
836
/// Configuration state handed to a [`Manager`] by the controller.
pub struct ManagerConfig {
    /// Deterministic source of randomness for this manager.
    pub random: Xoshiro256PlusPlus,

    /// Map of ip addresses to graph nodes.
    pub ip_assignment: IpAssignment<u32>,

    /// Routing information for paths between graph nodes.
    pub routing_info: RoutingInfo<u32>,

    /// Bandwidths of hosts at ip addresses.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// A list of hosts and their processes.
    pub hosts: Vec<HostInfo>,
}
853
854/// Helper function to initialize the global [`Host`] before running the closure.
855fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
856    host_iter.for_each(|host| {
857        worker::Worker::set_active_host(host);
858        worker::Worker::with_active_host(|host| {
859            f(host);
860        })
861        .unwrap();
862        worker::Worker::take_active_host()
863    });
864}
865
866/// Get the raw speed of the experiment machine.
867fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
868    const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
869    let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
870    Ok(khz * 1000)
871}
872
873fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
874    let libname_c = CString::new(libname).unwrap();
875    let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
876
877    // scope needed to make sure the CStr is dropped before we free libpath_c
878    let libpath = if !libpath_c.is_null() {
879        let libpath = unsafe { CStr::from_ptr(libpath_c) };
880        let libpath = OsStr::from_bytes(libpath.to_bytes());
881        Some(PathBuf::from(libpath.to_os_string()))
882    } else {
883        None
884    };
885
886    unsafe { libc::free(libpath_c as *mut libc::c_void) };
887
888    let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
889
890    let bytes = libpath.as_os_str().as_bytes();
891    if bytes.iter().any(|c| *c == b' ' || *c == b':') {
892        // These are unescapable separators in LD_PRELOAD.
893        anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
894    }
895
896    log::debug!(
897        "Found required preload library {} at path {}",
898        libname,
899        libpath.display(),
900    );
901
902    Ok(libpath)
903}