shadow_rs/core/
manager.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Owns the per-simulation state needed to build the hosts and drive the scheduling loop.
///
/// Created with [`Manager::new`] and consumed by [`Manager::run`].
pub struct Manager<'a> {
    /// Simulation inputs (hosts, routing info, RNG, ...). Stored as an `Option` so that `run()`
    /// can take ownership of it.
    manager_config: Option<ManagerConfig>,
    /// Notified at the end of every scheduling round; it returns the next execution window.
    controller: &'a Controller<'a>,
    /// The simulation's configuration options.
    config: &'a ConfigOptions,

    /// Raw CPU frequency in Hz as detected in `new()` (2.5 GHz if detection failed). Given to
    /// each host as its CPU frequency.
    raw_frequency: u64,
    /// Native TSC frequency as detected in `new()`; falls back to `raw_frequency`.
    native_tsc_frequency: u64,
    /// Emulated time at which the simulation ends.
    end_time: EmulatedTime,

    /// Absolute path of the simulation's data directory.
    data_path: PathBuf,
    /// The `hosts` directory inside `data_path`.
    hosts_path: PathBuf,

    /// Libraries that are preloaded into managed processes; shared with every host.
    preload_paths: Arc<Vec<PathBuf>>,

    /// Initialized to `true` in `new()`.
    /// NOTE(review): presumably toggled by the resource usage checks — confirm where it is read.
    check_fd_usage: bool,
    /// Initialized to `true` in `new()`.
    /// NOTE(review): presumably toggled by the resource usage checks — confirm where it is read.
    check_mem_usage: bool,

    /// Open handle to `/proc/meminfo`, read when logging heartbeat messages.
    meminfo_file: std::fs::File,
    /// Shared memory block holding the manager-wide `ManagerShmem` state, allocated in `new()`.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
64    pub fn new(
65        manager_config: ManagerConfig,
66        controller: &'a Controller<'a>,
67        config: &'a ConfigOptions,
68        end_time: EmulatedTime,
69    ) -> anyhow::Result<Self> {
70        // get the system's CPU frequency
71        let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
72            let default_freq = 2_500_000_000; // 2.5 GHz
73            log::debug!("Failed to get raw CPU frequency, using {default_freq} Hz instead: {e}");
74            default_freq
75        });
76
77        let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
78            f
79        } else {
80            warn!(
81                "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
82            );
83            raw_frequency
84        };
85
86        let mut preload_paths = Vec::new();
87
88        // we always preload the injector lib to ensure that the shim is loaded into the managed
89        // processes
90        const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
91        preload_paths.push(
92            get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
93                format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
94            })?,
95        );
96
97        // preload libc lib if option is enabled
98        const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
99        if config.experimental.use_preload_libc.unwrap() {
100            let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
101                format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
102            })?;
103            preload_paths.push(path);
104        } else {
105            log::info!("Preloading the libc library is disabled");
106        };
107
108        // preload openssl rng lib if option is enabled
109        const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
110        if config.experimental.use_preload_openssl_rng.unwrap() {
111            let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
112                format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
113            })?;
114            preload_paths.push(path);
115        } else {
116            log::info!("Preloading the openssl rng library is disabled");
117        };
118
119        // preload openssl crypto lib if option is enabled
120        const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
121        if config.experimental.use_preload_openssl_crypto.unwrap() {
122            let path =
123                get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
124                    format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
125                })?;
126            preload_paths.push(path);
127        } else {
128            log::info!("Preloading the openssl crypto library is disabled");
129        };
130
131        // use the working dir to generate absolute paths
132        let cwd = std::env::current_dir()?;
133        let template_path = config
134            .general
135            .template_directory
136            .flatten_ref()
137            .map(|x| cwd.clone().join(x));
138        let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
139        let hosts_path = data_path.join("hosts");
140
141        if let Some(template_path) = template_path {
142            log::debug!(
143                "Copying template directory '{}' to '{}'",
144                template_path.display(),
145                data_path.display()
146            );
147
148            // copy the template directory to the data directory path
149            utility::copy_dir_all(&template_path, &data_path).with_context(|| {
150                format!(
151                    "Failed to copy template directory '{}' to '{}'",
152                    template_path.display(),
153                    data_path.display()
154                )
155            })?;
156
157            // create the hosts directory if it doesn't exist
158            let result = std::fs::create_dir(&hosts_path);
159            if let Err(e) = result {
160                if e.kind() != std::io::ErrorKind::AlreadyExists {
161                    return Err(e).context(format!(
162                        "Failed to create hosts directory '{}'",
163                        hosts_path.display()
164                    ));
165                }
166            }
167        } else {
168            // create the data and hosts directories
169            std::fs::create_dir(&data_path).with_context(|| {
170                format!("Failed to create data directory '{}'", data_path.display())
171            })?;
172            std::fs::create_dir(&hosts_path).with_context(|| {
173                format!(
174                    "Failed to create hosts directory '{}'",
175                    hosts_path.display(),
176                )
177            })?;
178        }
179
180        // save the processed config as yaml
181        let config_out_filename = data_path.join("processed-config.yaml");
182        let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
183            format!("Failed to create file '{}'", config_out_filename.display())
184        })?;
185
186        serde_yaml::to_writer(config_out_file, &config).with_context(|| {
187            format!(
188                "Failed to write processed config yaml to file '{}'",
189                config_out_filename.display()
190            )
191        })?;
192
193        let meminfo_file =
194            std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;
195
196        // Determind whether we can and should emulate cpuid in the shim.
197        let emulate_cpuid = {
198            // SAFETY: we don't support running in esoteric environments where cpuid isn't available.
199            let supports_rdrand = unsafe { asm_util::cpuid::supports_rdrand() };
200            let supports_rdseed = unsafe { asm_util::cpuid::supports_rdseed() };
201            if !(supports_rdrand || supports_rdseed) {
202                // No need to emulate cpuid.
203                debug!(
204                    "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
205                );
206                false
207            } else {
208                // CPU has `rdrand` and/or `rdseed`, which produce
209                // non-deterministic results by design.  We want to trap and
210                // emulate `cpuid` in the shim to mask this support so that
211                // managed programs (hopefully) don't use it.
212
213                // Test whether the current platform actually supports intercepting cpuid.
214                // This is dependent on the CPU model and kernel version.
215                let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
216                match res {
217                    Ok(_) => {
218                        // Re-enable cpuid for ourselves.
219                        unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
220                            .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
221                        debug!(
222                            "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
223                        );
224                        true
225                    }
226                    Err(e) => {
227                        warn!(
228                            "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
229                        );
230                        false
231                    }
232                }
233            }
234        };
235
236        let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
237            log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
238            native_preemption_config: if config.native_preemption_enabled() {
239                FfiOption::Some(NativePreemptionConfig {
240                    native_duration: config.native_preemption_native_interval()?,
241                    sim_duration: config.native_preemption_sim_interval(),
242                })
243            } else {
244                FfiOption::None
245            },
246            emulate_cpuid,
247        });
248
249        Ok(Self {
250            manager_config: Some(manager_config),
251            controller,
252            config,
253            raw_frequency,
254            native_tsc_frequency,
255            end_time,
256            data_path,
257            hosts_path,
258            preload_paths: Arc::new(preload_paths),
259            check_fd_usage: true,
260            check_mem_usage: true,
261            meminfo_file,
262            shmem,
263        })
264    }
265
266    pub fn run(
267        mut self,
268        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
269    ) -> anyhow::Result<u32> {
270        let mut manager_config = self.manager_config.take().unwrap();
271
272        let min_runahead_config: Option<Duration> = self
273            .config
274            .experimental
275            .runahead
276            .flatten()
277            .map(|x| x.into());
278        let min_runahead_config: Option<SimulationTime> =
279            min_runahead_config.map(|x| x.try_into().unwrap());
280
281        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
282        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
283        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;
284
285        let smallest_latency = SimulationTime::from_nanos(
286            manager_config
287                .routing_info
288                .get_smallest_latency_ns()
289                .unwrap(),
290        );
291
292        let parallelism: usize = match self.config.general.parallelism.unwrap() {
293            0 => {
294                let cores = cpu::count_physical_cores().try_into().unwrap();
295                log::info!("The parallelism option was 0, so using parallelism={cores}");
296                cores
297            }
298            x => x.try_into().unwrap(),
299        };
300
301        // Set up the global DNS before building the hosts
302        let mut dns_builder = DnsBuilder::new();
303
304        // Assign the host id only once to guarantee it stays associated with its host.
305        let host_init: Vec<(&HostInfo, HostId)> = manager_config
306            .hosts
307            .iter()
308            .enumerate()
309            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
310            .collect();
311
312        for (info, id) in &host_init {
313            // Extract the host address.
314            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
315                unreachable!("IPv6 not supported");
316            };
317
318            // Register in the global DNS.
319            dns_builder
320                .register(*id, addr, info.name.clone())
321                .with_context(|| {
322                    format!(
323                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
324                        *id, addr, info.name
325                    )
326                })?;
327        }
328
329        // Convert to a global read-only DNS struct.
330        let dns = dns_builder.into_dns()?;
331
332        // Now build the hosts using the assigned host ids.
333        // note: there are several return points before we add these hosts to the scheduler and we
334        // would leak memory if we return before then, but not worrying about that since the issues
335        // will go away when we move the hosts to rust, and if we don't add them to the scheduler
336        // then it means there was an error and we're going to exit anyways
337        let mut hosts: Vec<_> = host_init
338            .iter()
339            .map(|(info, id)| {
340                self.build_host(*id, info)
341                    .with_context(|| format!("Failed to build host '{}'", info.name))
342            })
343            .collect::<anyhow::Result<_>>()?;
344
345        // shuffle the list of hosts to make sure that they are randomly assigned by the scheduler
346        hosts.shuffle(&mut manager_config.random);
347
348        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();
349
350        // an infinite iterator that always returns `<Option<Option<u32>>>::Some`
351        let cpu_iter =
352            std::iter::from_fn(|| {
353                // if cpu pinning is enabled, return Some(Some(cpu_id)), otherwise return Some(None)
354                Some(use_cpu_pinning.then(|| {
355                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
356                }))
357            });
358
359        // shadow is parallelized at the host level, so we don't need more parallelism than the
360        // number of hosts
361        let parallelism = std::cmp::min(parallelism, hosts.len());
362
363        // should have either all `Some` values, or all `None` values
364        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
365        if cpus[0].is_some() {
366            log::debug!("Pinning to cpus: {cpus:?}");
367            assert!(cpus.iter().all(|x| x.is_some()));
368        } else {
369            log::debug!("Not pinning to CPUs");
370            assert!(cpus.iter().all(|x| x.is_none()));
371        }
372        assert_eq!(cpus.len(), parallelism);
373
374        // set the simulation's global state
375        worker::WORKER_SHARED
376            .borrow_mut()
377            .replace(worker::WorkerShared {
378                ip_assignment: manager_config.ip_assignment,
379                routing_info: manager_config.routing_info,
380                host_bandwidths: manager_config.host_bandwidths,
381                // safe since the DNS type has an internal mutex
382                dns,
383                num_plugin_errors: AtomicU32::new(0),
384                // allow the status logger's state to be updated from anywhere
385                status_logger_state: status_logger_state.map(Arc::clone),
386                runahead: Runahead::new(
387                    self.config.experimental.use_dynamic_runahead.unwrap(),
388                    smallest_latency,
389                    min_runahead_config,
390                ),
391                child_pid_watcher: ChildPidWatcher::new(),
392                event_queues: hosts
393                    .iter()
394                    .map(|x| (x.id(), x.event_queue().clone()))
395                    .collect(),
396                bootstrap_end_time,
397                sim_end_time: self.end_time,
398            });
399
400        // scope used so that the scheduler is dropped before we log the global counters below
401        {
402            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
403                configuration::Scheduler::ThreadPerHost => {
404                    std::thread_local! {
405                        /// A thread-local required by the thread-per-host scheduler.
406                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
407                    }
408                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
409                        &cpus,
410                        &SCHED_HOST_STORAGE,
411                        hosts,
412                    ))
413                }
414                configuration::Scheduler::ThreadPerCore => {
415                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
416                        &cpus,
417                        hosts,
418                        self.config.experimental.use_worker_spinning.unwrap(),
419                    ))
420                }
421            };
422
423            // initialize the thread-local Worker
424            scheduler.scope(|s| {
425                s.run(|thread_id| {
426                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
427                });
428            });
429
430            // the current simulation interval
431            let mut window = Some((
432                EmulatedTime::SIMULATION_START,
433                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
434            ));
435
436            // the next event times for each thread; allocated here to avoid re-allocating each
437            // scheduling loop
438            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
439                vec![AtomicRefCell::new(None); scheduler.parallelism()];
440
441            // how often to log heartbeat messages
442            let heartbeat_interval = self
443                .config
444                .general
445                .heartbeat_interval
446                .flatten()
447                .map(|x| Duration::from(x).try_into().unwrap());
448
449            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
450            let mut time_of_last_usage_check = std::time::Instant::now();
451
452            // the scheduling loop
453            while let Some((window_start, window_end)) = window {
454                // update the status logger
455                let display_time = std::cmp::min(window_start, window_end);
456                worker::WORKER_SHARED
457                    .borrow()
458                    .as_ref()
459                    .unwrap()
460                    .update_status_logger(|state| {
461                        state.current = display_time;
462                    });
463
464                // run the events
465                scheduler.scope(|s| {
466                    // run the closure on each of the scheduler's threads
467                    s.run_with_data(
468                        &thread_next_event_times,
469                        // each call of the closure is given an abstract thread-specific host
470                        // iterator, and an element of 'thread_next_event_times'
471                        move |_, hosts, next_event_time| {
472                            let mut next_event_time = next_event_time.borrow_mut();
473
474                            worker::Worker::reset_next_event_time();
475                            worker::Worker::set_round_end_time(window_end);
476
477                            for_each_host(hosts, |host| {
478                                let host_next_event_time = {
479                                    host.lock_shmem();
480                                    host.execute(window_end);
481                                    let host_next_event_time = host.next_event_time();
482                                    host.unlock_shmem();
483                                    host_next_event_time
484                                };
485                                *next_event_time = [*next_event_time, host_next_event_time]
486                                    .into_iter()
487                                    .flatten() // filter out None
488                                    .reduce(std::cmp::min);
489                            });
490
491                            let packet_next_event_time = worker::Worker::get_next_event_time();
492
493                            *next_event_time = [*next_event_time, packet_next_event_time]
494                                .into_iter()
495                                .flatten() // filter out None
496                                .reduce(std::cmp::min);
497                        },
498                    );
499
500                    // log a heartbeat message every 'heartbeat_interval' amount of simulated time
501                    if let Some(heartbeat_interval) = heartbeat_interval {
502                        if window_start > last_heartbeat + heartbeat_interval {
503                            last_heartbeat = window_start;
504                            self.log_heartbeat(window_start);
505                        }
506                    }
507
508                    // check resource usage every 30 real seconds
509                    let current_time = std::time::Instant::now();
510                    if current_time.duration_since(time_of_last_usage_check)
511                        > Duration::from_secs(30)
512                    {
513                        time_of_last_usage_check = current_time;
514                        self.check_resource_usage();
515                    }
516                });
517
518                // get the minimum next event time for all threads (also resets the next event times
519                // to None while we have them borrowed)
520                let min_next_event_time = thread_next_event_times
521                    .iter()
522                    // the take() resets it to None for the next scheduling loop
523                    .filter_map(|x| x.borrow_mut().take())
524                    .reduce(std::cmp::min)
525                    .unwrap_or(EmulatedTime::MAX);
526
527                log::debug!(
528                    "Finished execution window [{}--{}], next event at {}",
529                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
530                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
531                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
532                );
533
534                // notify controller that we finished this round, and the time of our next event in
535                // order to fast-forward our execute window if possible
536                window = self
537                    .controller
538                    .manager_finished_current_round(min_next_event_time);
539            }
540
541            scheduler.scope(|s| {
542                s.run_with_hosts(move |_, hosts| {
543                    for_each_host(hosts, |host| {
544                        worker::Worker::set_current_time(self.end_time);
545                        host.free_all_applications();
546                        host.shutdown();
547                        worker::Worker::clear_current_time();
548                    });
549                });
550            });
551
552            // add each thread's local sim statistics to the global sim statistics.
553            scheduler.scope(|s| {
554                s.run(|_| {
555                    worker::Worker::add_to_global_sim_stats();
556                });
557            });
558
559            scheduler.join();
560        }
561
562        // simulation is finished, so update the status logger
563        worker::WORKER_SHARED
564            .borrow()
565            .as_ref()
566            .unwrap()
567            .update_status_logger(|state| {
568                state.current = self.end_time;
569            });
570
571        let num_plugin_errors = worker::WORKER_SHARED
572            .borrow()
573            .as_ref()
574            .unwrap()
575            .plugin_error_count();
576
577        // drop the simulation's global state
578        // must drop before the allocation counters have been checked
579        worker::WORKER_SHARED.borrow_mut().take();
580
581        // since the scheduler was dropped, all workers should have completed and the global object
582        // and syscall counters should have been updated
583
584        worker::with_global_sim_stats(|stats| {
585            if self.config.experimental.use_syscall_counters.unwrap() {
586                log::info!(
587                    "Global syscall counts: {}",
588                    stats.syscall_counts.lock().unwrap()
589                );
590            }
591            if self.config.experimental.use_object_counters.unwrap() {
592                let alloc_counts = stats.alloc_counts.lock().unwrap();
593                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
594                log::info!("Global allocated object counts: {alloc_counts}");
595                log::info!("Global deallocated object counts: {dealloc_counts}");
596
597                if *alloc_counts == *dealloc_counts {
598                    log::info!("We allocated and deallocated the same number of objects :)");
599                } else {
600                    // don't change the formatting of this line as we search for it in test cases
601                    log::warn!("Memory leak detected");
602                }
603            }
604
605            let stats_filename = self.data_path.clone().join("sim-stats.json");
606            sim_stats::write_stats_to_file(&stats_filename, stats)
607        })?;
608
609        Ok(num_plugin_errors)
610    }
611
612    fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
613        let hostname = CString::new(&*host_info.name).unwrap();
614
615        // scope used to enforce drop order for pointers
616        let host = {
617            let params = HostParameters {
618                // the manager sets this ID
619                id: host_id,
620                // the manager sets this CPU frequency
621                cpu_frequency: self.raw_frequency,
622                node_seed: host_info.seed,
623                hostname,
624                node_id: host_info.network_node_id,
625                ip_addr: match host_info.ip_addr.unwrap() {
626                    std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
627                    // the config only allows ipv4 addresses, so this shouldn't happen
628                    std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
629                },
630                sim_end_time: self.end_time,
631                requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
632                requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
633                cpu_threshold: host_info.cpu_threshold,
634                cpu_precision: host_info.cpu_precision,
635                log_level: host_info
636                    .log_level
637                    .map(|x| x.to_c_loglevel())
638                    .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
639                pcap_config: host_info.pcap_config,
640                qdisc: host_info.qdisc,
641                init_sock_recv_buf_size: host_info.recv_buf_size,
642                autotune_recv_buf: host_info.autotune_recv_buf,
643                init_sock_send_buf_size: host_info.send_buf_size,
644                autotune_send_buf: host_info.autotune_send_buf,
645                native_tsc_frequency: self.native_tsc_frequency,
646                model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
647                max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
648                unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
649                unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
650                strace_logging_options: self.config.strace_logging_mode(),
651                shim_log_level: host_info
652                    .log_level
653                    .unwrap_or_else(|| self.config.general.log_level.unwrap())
654                    .to_c_loglevel(),
655                use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
656                use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
657                use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
658            };
659
660            Box::new(Host::new(
661                params,
662                &self.hosts_path,
663                self.raw_frequency,
664                self.shmem(),
665                self.preload_paths.clone(),
666            ))
667        };
668
669        host.lock_shmem();
670
671        for proc in &host_info.processes {
672            let plugin_path =
673                CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
674            let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
675            let pause_for_debugging = host_info.pause_for_debugging;
676
677            let argv: Vec<CString> = proc
678                .args
679                .iter()
680                .map(|x| CString::new(x.as_bytes()).unwrap())
681                .collect();
682
683            let envv: Vec<CString> = proc
684                .env
685                .clone()
686                .into_iter()
687                .map(|(x, y)| {
688                    let mut x: OsString = String::from(x).into();
689                    x.push("=");
690                    x.push(y);
691                    CString::new(x.as_bytes()).unwrap()
692                })
693                .collect();
694
695            host.continue_execution_timer();
696
697            host.add_application(
698                proc.start_time,
699                proc.shutdown_time,
700                proc.shutdown_signal,
701                plugin_name,
702                plugin_path,
703                argv,
704                envv,
705                pause_for_debugging,
706                proc.expected_final_state,
707            );
708
709            host.stop_execution_timer();
710        }
711
712        host.unlock_shmem();
713
714        Ok(host)
715    }
716
717    fn log_heartbeat(&mut self, now: EmulatedTime) {
718        let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
719        if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
720            let err = nix::errno::Errno::last();
721            log::warn!("Unable to get shadow's resource usage: {err}");
722            return;
723        }
724
725        // the sysinfo syscall also would give memory usage info, but it's less detailed
726        let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
727
728        // the linux man page says this is in kilobytes, but it seems to be in kibibytes
729        let max_memory = (resources.ru_maxrss as f64) / 1048576.0; // KiB->GiB
730        let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
731        let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
732
733        // tornettools assumes a specific log format for this message, so don't change it without
734        // testing that tornettools can parse resource usage information from the shadow log
735        // https://github.com/shadow/tornettools/blob/6c00856c3f08899da30bfc452b6a055572cc4536/tornettools/parse_rusage.py#L58-L86
736        log::info!(
737            "Process resource usage at simtime {} reported by getrusage(): \
738            ru_maxrss={:.03} GiB, \
739            ru_utime={:.03} minutes, \
740            ru_stime={:.03} minutes, \
741            ru_nvcsw={}, \
742            ru_nivcsw={}",
743            (now - EmulatedTime::SIMULATION_START).as_nanos(),
744            max_memory,
745            user_time_minutes,
746            system_time_minutes,
747            resources.ru_nvcsw,
748            resources.ru_nivcsw,
749        );
750
751        // there are different ways of calculating system memory usage (for example 'free' will
752        // calculate used memory differently than 'htop'), so we'll log the values we think are
753        // useful, and something parsing the log can calculate whatever it wants
754        log::info!(
755            "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
756            (now - EmulatedTime::SIMULATION_START).as_nanos(),
757            serde_json::to_string(&mem_info).unwrap(),
758        );
759    }
760
761    fn check_resource_usage(&mut self) {
762        if self.check_fd_usage {
763            match self.fd_usage() {
764                // if more than 90% in use
765                Ok((usage, limit)) if usage > limit * 90 / 100 => {
766                    log::warn!(
767                        "Using more than 90% ({usage}/{limit}) of available file descriptors"
768                    );
769                    self.check_fd_usage = false;
770                }
771                Err(e) => {
772                    log::warn!("Unable to check fd usage: {e}");
773                    self.check_fd_usage = false;
774                }
775                Ok(_) => {}
776            }
777        }
778
779        if self.check_mem_usage {
780            match self.memory_remaining() {
781                // if less than 500 MiB available
782                Ok(remaining) if remaining < 500 * 1024 * 1024 => {
783                    log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
784                    self.check_mem_usage = false;
785                }
786                Err(e) => {
787                    log::warn!("Unable to check memory usage: {e}");
788                    self.check_mem_usage = false;
789                }
790                Ok(_) => {}
791            }
792        }
793    }
794
795    /// Returns a tuple of (usage, limit).
796    fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
797        let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
798
799        let mut fd_count: u64 = 0;
800        for entry in dir {
801            // short-circuit and return on error
802            entry.context("Failed to read entry in '/proc/self/fd'")?;
803            fd_count += 1;
804        }
805
806        let (soft_limit, _) =
807            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
808                .context("Failed to get the fd limit")?;
809
810        Ok((fd_count, soft_limit))
811    }
812
813    /// Returns the number of bytes remaining.
814    fn memory_remaining(&mut self) -> anyhow::Result<u64> {
815        let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
816            .context("Failed to get the page size")?
817            .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
818
819        let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
820            .context("Failed to get the number of available pages of physical memory")?
821            .ok_or_else(|| {
822                anyhow::anyhow!(
823                    "Failed to get the number of available pages of physical memory (no errno)"
824                )
825            })?;
826
827        let page_size: u64 = page_size.try_into().unwrap();
828        let avl_pages: u64 = avl_pages.try_into().unwrap();
829
830        Ok(page_size * avl_pages)
831    }
832
    /// Returns a reference to the manager's `shmem` field, a [`ShMemBlock`] holding a
    /// [`ManagerShmem`].
    pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
        &self.shmem
    }
836}
837
/// The inputs a [`Manager`] is configured with; the manager keeps one in its `manager_config`
/// field.
pub struct ManagerConfig {
    /// Deterministic source of randomness for this manager.
    pub random: Xoshiro256PlusPlus,

    /// Map of ip addresses to graph nodes.
    pub ip_assignment: IpAssignment<u32>,

    /// Routing information for paths between graph nodes.
    pub routing_info: RoutingInfo<u32>,

    /// Bandwidths of hosts at ip addresses.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// A list of hosts and their processes.
    pub hosts: Vec<HostInfo>,
}
854
855/// Helper function to initialize the global [`Host`] before running the closure.
856fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
857    host_iter.for_each(|host| {
858        worker::Worker::set_active_host(host);
859        worker::Worker::with_active_host(|host| {
860            f(host);
861        })
862        .unwrap();
863        worker::Worker::take_active_host()
864    });
865}
866
867/// Get the raw speed of the experiment machine.
868fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
869    const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
870    let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
871    Ok(khz * 1000)
872}
873
874fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
875    let libname_c = CString::new(libname).unwrap();
876    let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
877
878    // scope needed to make sure the CStr is dropped before we free libpath_c
879    let libpath = if !libpath_c.is_null() {
880        let libpath = unsafe { CStr::from_ptr(libpath_c) };
881        let libpath = OsStr::from_bytes(libpath.to_bytes());
882        Some(PathBuf::from(libpath.to_os_string()))
883    } else {
884        None
885    };
886
887    unsafe { libc::free(libpath_c as *mut libc::c_void) };
888
889    let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
890
891    let bytes = libpath.as_os_str().as_bytes();
892    if bytes.iter().any(|c| *c == b' ' || *c == b':') {
893        // These are unescapable separators in LD_PRELOAD.
894        anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
895    }
896
897    log::debug!(
898        "Found required preload library {} at path {}",
899        libname,
900        libpath.display(),
901    );
902
903    Ok(libpath)
904}