shadow_rs/core/manager.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Drives a simulation: builds the hosts, runs them on a scheduler round by round, and records
/// the resulting statistics.
///
/// Created with [`Manager::new`] and consumed by [`Manager::run`].
pub struct Manager<'a> {
    /// The manager's configuration (hosts, routing, IP assignment, ...). `Some` after
    /// construction; taken (leaving `None`) at the start of [`Manager::run`].
    manager_config: Option<ManagerConfig>,
    /// After each round it is told the earliest next event time and returns the next execution
    /// window (or `None` to end the simulation).
    controller: &'a Controller<'a>,
    /// The simulation's configuration options.
    config: &'a ConfigOptions,

    /// CPU frequency in Hz given to every host; falls back to 2.5 GHz if it can't be read.
    raw_frequency: u64,
    /// Native TSC frequency given to every host (used for emulated `rdtsc`); falls back to
    /// `raw_frequency` if it can't be determined.
    native_tsc_frequency: u64,
    /// Simulated time at which the simulation ends.
    end_time: EmulatedTime,

    /// Absolute path of the simulation's data directory.
    data_path: PathBuf,
    /// `<data_path>/hosts`, passed to every host.
    hosts_path: PathBuf,

    /// Libraries to preload into managed processes; the same list is shared with every host.
    preload_paths: Arc<Vec<PathBuf>>,

    /// Set to `true` in [`Manager::new`]. NOTE(review): presumably consulted by the periodic
    /// resource-usage check (not visible here) — confirm.
    check_fd_usage: bool,
    /// Set to `true` in [`Manager::new`]. NOTE(review): presumably consulted by the periodic
    /// resource-usage check (not visible here) — confirm.
    check_mem_usage: bool,

    /// Open handle to `/proc/meminfo`, read when logging heartbeat messages.
    meminfo_file: std::fs::File,
    /// Shared-memory block holding the [`ManagerShmem`]; hosts are given access to it through
    /// `self.shmem()`.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
64    pub fn new(
65        manager_config: ManagerConfig,
66        controller: &'a Controller<'a>,
67        config: &'a ConfigOptions,
68        end_time: EmulatedTime,
69    ) -> anyhow::Result<Self> {
70        // get the system's CPU frequency
71        let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
72            let default_freq = 2_500_000_000; // 2.5 GHz
73            log::debug!(
74                "Failed to get raw CPU frequency, using {} Hz instead: {}",
75                default_freq,
76                e
77            );
78            default_freq
79        });
80
81        let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
82            f
83        } else {
84            warn!(
85                "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
86            );
87            raw_frequency
88        };
89
90        let mut preload_paths = Vec::new();
91
92        // we always preload the injector lib to ensure that the shim is loaded into the managed
93        // processes
94        const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
95        preload_paths.push(
96            get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
97                format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
98            })?,
99        );
100
101        // preload libc lib if option is enabled
102        const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
103        if config.experimental.use_preload_libc.unwrap() {
104            let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
105                format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
106            })?;
107            preload_paths.push(path);
108        } else {
109            log::info!("Preloading the libc library is disabled");
110        };
111
112        // preload openssl rng lib if option is enabled
113        const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
114        if config.experimental.use_preload_openssl_rng.unwrap() {
115            let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
116                format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
117            })?;
118            preload_paths.push(path);
119        } else {
120            log::info!("Preloading the openssl rng library is disabled");
121        };
122
123        // preload openssl crypto lib if option is enabled
124        const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
125        if config.experimental.use_preload_openssl_crypto.unwrap() {
126            let path =
127                get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
128                    format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
129                })?;
130            preload_paths.push(path);
131        } else {
132            log::info!("Preloading the openssl crypto library is disabled");
133        };
134
135        // use the working dir to generate absolute paths
136        let cwd = std::env::current_dir()?;
137        let template_path = config
138            .general
139            .template_directory
140            .flatten_ref()
141            .map(|x| cwd.clone().join(x));
142        let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
143        let hosts_path = data_path.join("hosts");
144
145        if let Some(template_path) = template_path {
146            log::debug!(
147                "Copying template directory '{}' to '{}'",
148                template_path.display(),
149                data_path.display()
150            );
151
152            // copy the template directory to the data directory path
153            utility::copy_dir_all(&template_path, &data_path).with_context(|| {
154                format!(
155                    "Failed to copy template directory '{}' to '{}'",
156                    template_path.display(),
157                    data_path.display()
158                )
159            })?;
160
161            // create the hosts directory if it doesn't exist
162            let result = std::fs::create_dir(&hosts_path);
163            if let Err(e) = result {
164                if e.kind() != std::io::ErrorKind::AlreadyExists {
165                    return Err(e).context(format!(
166                        "Failed to create hosts directory '{}'",
167                        hosts_path.display()
168                    ));
169                }
170            }
171        } else {
172            // create the data and hosts directories
173            std::fs::create_dir(&data_path).with_context(|| {
174                format!("Failed to create data directory '{}'", data_path.display())
175            })?;
176            std::fs::create_dir(&hosts_path).with_context(|| {
177                format!(
178                    "Failed to create hosts directory '{}'",
179                    hosts_path.display(),
180                )
181            })?;
182        }
183
184        // save the processed config as yaml
185        let config_out_filename = data_path.join("processed-config.yaml");
186        let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
187            format!("Failed to create file '{}'", config_out_filename.display())
188        })?;
189
190        serde_yaml::to_writer(config_out_file, &config).with_context(|| {
191            format!(
192                "Failed to write processed config yaml to file '{}'",
193                config_out_filename.display()
194            )
195        })?;
196
197        let meminfo_file =
198            std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;
199
200        // Determind whether we can and should emulate cpuid in the shim.
201        let emulate_cpuid = {
202            // SAFETY: we don't support running in esoteric environments where cpuid isn't available.
203            let supports_rdrand = unsafe { asm_util::cpuid::supports_rdrand() };
204            let supports_rdseed = unsafe { asm_util::cpuid::supports_rdseed() };
205            if !(supports_rdrand || supports_rdseed) {
206                // No need to emulate cpuid.
207                debug!(
208                    "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
209                );
210                false
211            } else {
212                // CPU has `rdrand` and/or `rdseed`, which produce
213                // non-deterministic results by design.  We want to trap and
214                // emulate `cpuid` in the shim to mask this support so that
215                // managed programs (hopefully) don't use it.
216
217                // Test whether the current platform actually supports intercepting cpuid.
218                // This is dependent on the CPU model and kernel version.
219                let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
220                match res {
221                    Ok(_) => {
222                        // Re-enable cpuid for ourselves.
223                        unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
224                            .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
225                        debug!(
226                            "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
227                        );
228                        true
229                    }
230                    Err(e) => {
231                        warn!(
232                            "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
233                        );
234                        false
235                    }
236                }
237            }
238        };
239
240        let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
241            log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
242            native_preemption_config: if config.native_preemption_enabled() {
243                FfiOption::Some(NativePreemptionConfig {
244                    native_duration: config.native_preemption_native_interval()?,
245                    sim_duration: config.native_preemption_sim_interval(),
246                })
247            } else {
248                FfiOption::None
249            },
250            emulate_cpuid,
251        });
252
253        Ok(Self {
254            manager_config: Some(manager_config),
255            controller,
256            config,
257            raw_frequency,
258            native_tsc_frequency,
259            end_time,
260            data_path,
261            hosts_path,
262            preload_paths: Arc::new(preload_paths),
263            check_fd_usage: true,
264            check_mem_usage: true,
265            meminfo_file,
266            shmem,
267        })
268    }
269
270    pub fn run(
271        mut self,
272        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
273    ) -> anyhow::Result<u32> {
274        let mut manager_config = self.manager_config.take().unwrap();
275
276        let min_runahead_config: Option<Duration> = self
277            .config
278            .experimental
279            .runahead
280            .flatten()
281            .map(|x| x.into());
282        let min_runahead_config: Option<SimulationTime> =
283            min_runahead_config.map(|x| x.try_into().unwrap());
284
285        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
286        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
287        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;
288
289        let smallest_latency = SimulationTime::from_nanos(
290            manager_config
291                .routing_info
292                .get_smallest_latency_ns()
293                .unwrap(),
294        );
295
296        let parallelism: usize = match self.config.general.parallelism.unwrap() {
297            0 => {
298                let cores = cpu::count_physical_cores().try_into().unwrap();
299                log::info!("The parallelism option was 0, so using parallelism={cores}");
300                cores
301            }
302            x => x.try_into().unwrap(),
303        };
304
305        // Set up the global DNS before building the hosts
306        let mut dns_builder = DnsBuilder::new();
307
308        // Assign the host id only once to guarantee it stays associated with its host.
309        let host_init: Vec<(&HostInfo, HostId)> = manager_config
310            .hosts
311            .iter()
312            .enumerate()
313            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
314            .collect();
315
316        for (info, id) in &host_init {
317            // Extract the host address.
318            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
319                unreachable!("IPv6 not supported");
320            };
321
322            // Register in the global DNS.
323            dns_builder
324                .register(*id, addr, info.name.clone())
325                .with_context(|| {
326                    format!(
327                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
328                        *id, addr, info.name
329                    )
330                })?;
331        }
332
333        // Convert to a global read-only DNS struct.
334        let dns = dns_builder.into_dns()?;
335
336        // Now build the hosts using the assigned host ids.
337        // note: there are several return points before we add these hosts to the scheduler and we
338        // would leak memory if we return before then, but not worrying about that since the issues
339        // will go away when we move the hosts to rust, and if we don't add them to the scheduler
340        // then it means there was an error and we're going to exit anyways
341        let mut hosts: Vec<_> = host_init
342            .iter()
343            .map(|(info, id)| {
344                self.build_host(*id, info)
345                    .with_context(|| format!("Failed to build host '{}'", info.name))
346            })
347            .collect::<anyhow::Result<_>>()?;
348
349        // shuffle the list of hosts to make sure that they are randomly assigned by the scheduler
350        hosts.shuffle(&mut manager_config.random);
351
352        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();
353
354        // an infinite iterator that always returns `<Option<Option<u32>>>::Some`
355        let cpu_iter =
356            std::iter::from_fn(|| {
357                // if cpu pinning is enabled, return Some(Some(cpu_id)), otherwise return Some(None)
358                Some(use_cpu_pinning.then(|| {
359                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
360                }))
361            });
362
363        // shadow is parallelized at the host level, so we don't need more parallelism than the
364        // number of hosts
365        let parallelism = std::cmp::min(parallelism, hosts.len());
366
367        // should have either all `Some` values, or all `None` values
368        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
369        if cpus[0].is_some() {
370            log::debug!("Pinning to cpus: {:?}", cpus);
371            assert!(cpus.iter().all(|x| x.is_some()));
372        } else {
373            log::debug!("Not pinning to CPUs");
374            assert!(cpus.iter().all(|x| x.is_none()));
375        }
376        assert_eq!(cpus.len(), parallelism);
377
378        // set the simulation's global state
379        worker::WORKER_SHARED
380            .borrow_mut()
381            .replace(worker::WorkerShared {
382                ip_assignment: manager_config.ip_assignment,
383                routing_info: manager_config.routing_info,
384                host_bandwidths: manager_config.host_bandwidths,
385                // safe since the DNS type has an internal mutex
386                dns,
387                num_plugin_errors: AtomicU32::new(0),
388                // allow the status logger's state to be updated from anywhere
389                status_logger_state: status_logger_state.map(Arc::clone),
390                runahead: Runahead::new(
391                    self.config.experimental.use_dynamic_runahead.unwrap(),
392                    smallest_latency,
393                    min_runahead_config,
394                ),
395                child_pid_watcher: ChildPidWatcher::new(),
396                event_queues: hosts
397                    .iter()
398                    .map(|x| (x.id(), x.event_queue().clone()))
399                    .collect(),
400                bootstrap_end_time,
401                sim_end_time: self.end_time,
402            });
403
404        // scope used so that the scheduler is dropped before we log the global counters below
405        {
406            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
407                configuration::Scheduler::ThreadPerHost => {
408                    std::thread_local! {
409                        /// A thread-local required by the thread-per-host scheduler.
410                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
411                    }
412                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
413                        &cpus,
414                        &SCHED_HOST_STORAGE,
415                        hosts,
416                    ))
417                }
418                configuration::Scheduler::ThreadPerCore => {
419                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
420                        &cpus,
421                        hosts,
422                        self.config.experimental.use_worker_spinning.unwrap(),
423                    ))
424                }
425            };
426
427            // initialize the thread-local Worker
428            scheduler.scope(|s| {
429                s.run(|thread_id| {
430                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
431                });
432            });
433
434            // the current simulation interval
435            let mut window = Some((
436                EmulatedTime::SIMULATION_START,
437                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
438            ));
439
440            // the next event times for each thread; allocated here to avoid re-allocating each
441            // scheduling loop
442            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
443                vec![AtomicRefCell::new(None); scheduler.parallelism()];
444
445            // how often to log heartbeat messages
446            let heartbeat_interval = self
447                .config
448                .general
449                .heartbeat_interval
450                .flatten()
451                .map(|x| Duration::from(x).try_into().unwrap());
452
453            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
454            let mut time_of_last_usage_check = std::time::Instant::now();
455
456            // the scheduling loop
457            while let Some((window_start, window_end)) = window {
458                // update the status logger
459                let display_time = std::cmp::min(window_start, window_end);
460                worker::WORKER_SHARED
461                    .borrow()
462                    .as_ref()
463                    .unwrap()
464                    .update_status_logger(|state| {
465                        state.current = display_time;
466                    });
467
468                // run the events
469                scheduler.scope(|s| {
470                    // run the closure on each of the scheduler's threads
471                    s.run_with_data(
472                        &thread_next_event_times,
473                        // each call of the closure is given an abstract thread-specific host
474                        // iterator, and an element of 'thread_next_event_times'
475                        move |_, hosts, next_event_time| {
476                            let mut next_event_time = next_event_time.borrow_mut();
477
478                            worker::Worker::reset_next_event_time();
479                            worker::Worker::set_round_end_time(window_end);
480
481                            for_each_host(hosts, |host| {
482                                let host_next_event_time = {
483                                    host.lock_shmem();
484                                    host.execute(window_end);
485                                    let host_next_event_time = host.next_event_time();
486                                    host.unlock_shmem();
487                                    host_next_event_time
488                                };
489                                *next_event_time = [*next_event_time, host_next_event_time]
490                                    .into_iter()
491                                    .flatten() // filter out None
492                                    .reduce(std::cmp::min);
493                            });
494
495                            let packet_next_event_time = worker::Worker::get_next_event_time();
496
497                            *next_event_time = [*next_event_time, packet_next_event_time]
498                                .into_iter()
499                                .flatten() // filter out None
500                                .reduce(std::cmp::min);
501                        },
502                    );
503
504                    // log a heartbeat message every 'heartbeat_interval' amount of simulated time
505                    if let Some(heartbeat_interval) = heartbeat_interval {
506                        if window_start > last_heartbeat + heartbeat_interval {
507                            last_heartbeat = window_start;
508                            self.log_heartbeat(window_start);
509                        }
510                    }
511
512                    // check resource usage every 30 real seconds
513                    let current_time = std::time::Instant::now();
514                    if current_time.duration_since(time_of_last_usage_check)
515                        > Duration::from_secs(30)
516                    {
517                        time_of_last_usage_check = current_time;
518                        self.check_resource_usage();
519                    }
520                });
521
522                // get the minimum next event time for all threads (also resets the next event times
523                // to None while we have them borrowed)
524                let min_next_event_time = thread_next_event_times
525                    .iter()
526                    // the take() resets it to None for the next scheduling loop
527                    .filter_map(|x| x.borrow_mut().take())
528                    .reduce(std::cmp::min)
529                    .unwrap_or(EmulatedTime::MAX);
530
531                log::debug!(
532                    "Finished execution window [{}--{}], next event at {}",
533                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
534                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
535                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
536                );
537
538                // notify controller that we finished this round, and the time of our next event in
539                // order to fast-forward our execute window if possible
540                window = self
541                    .controller
542                    .manager_finished_current_round(min_next_event_time);
543            }
544
545            scheduler.scope(|s| {
546                s.run_with_hosts(move |_, hosts| {
547                    for_each_host(hosts, |host| {
548                        worker::Worker::set_current_time(self.end_time);
549                        host.free_all_applications();
550                        host.shutdown();
551                        worker::Worker::clear_current_time();
552                    });
553                });
554            });
555
556            // add each thread's local sim statistics to the global sim statistics.
557            scheduler.scope(|s| {
558                s.run(|_| {
559                    worker::Worker::add_to_global_sim_stats();
560                });
561            });
562
563            scheduler.join();
564        }
565
566        // simulation is finished, so update the status logger
567        worker::WORKER_SHARED
568            .borrow()
569            .as_ref()
570            .unwrap()
571            .update_status_logger(|state| {
572                state.current = self.end_time;
573            });
574
575        let num_plugin_errors = worker::WORKER_SHARED
576            .borrow()
577            .as_ref()
578            .unwrap()
579            .plugin_error_count();
580
581        // drop the simulation's global state
582        // must drop before the allocation counters have been checked
583        worker::WORKER_SHARED.borrow_mut().take();
584
585        // since the scheduler was dropped, all workers should have completed and the global object
586        // and syscall counters should have been updated
587
588        worker::with_global_sim_stats(|stats| {
589            if self.config.experimental.use_syscall_counters.unwrap() {
590                log::info!(
591                    "Global syscall counts: {}",
592                    stats.syscall_counts.lock().unwrap()
593                );
594            }
595            if self.config.experimental.use_object_counters.unwrap() {
596                let alloc_counts = stats.alloc_counts.lock().unwrap();
597                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
598                log::info!("Global allocated object counts: {}", alloc_counts);
599                log::info!("Global deallocated object counts: {}", dealloc_counts);
600
601                if *alloc_counts == *dealloc_counts {
602                    log::info!("We allocated and deallocated the same number of objects :)");
603                } else {
604                    // don't change the formatting of this line as we search for it in test cases
605                    log::warn!("Memory leak detected");
606                }
607            }
608
609            let stats_filename = self.data_path.clone().join("sim-stats.json");
610            sim_stats::write_stats_to_file(&stats_filename, stats)
611        })?;
612
613        Ok(num_plugin_errors)
614    }
615
616    fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
617        let hostname = CString::new(&*host_info.name).unwrap();
618
619        // scope used to enforce drop order for pointers
620        let host = {
621            let params = HostParameters {
622                // the manager sets this ID
623                id: host_id,
624                // the manager sets this CPU frequency
625                cpu_frequency: self.raw_frequency,
626                node_seed: host_info.seed,
627                hostname,
628                node_id: host_info.network_node_id,
629                ip_addr: match host_info.ip_addr.unwrap() {
630                    std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
631                    // the config only allows ipv4 addresses, so this shouldn't happen
632                    std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
633                },
634                sim_end_time: self.end_time,
635                requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
636                requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
637                cpu_threshold: host_info.cpu_threshold,
638                cpu_precision: host_info.cpu_precision,
639                log_level: host_info
640                    .log_level
641                    .map(|x| x.to_c_loglevel())
642                    .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
643                pcap_config: host_info.pcap_config,
644                qdisc: host_info.qdisc,
645                init_sock_recv_buf_size: host_info.recv_buf_size,
646                autotune_recv_buf: host_info.autotune_recv_buf,
647                init_sock_send_buf_size: host_info.send_buf_size,
648                autotune_send_buf: host_info.autotune_send_buf,
649                native_tsc_frequency: self.native_tsc_frequency,
650                model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
651                max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
652                unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
653                unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
654                strace_logging_options: self.config.strace_logging_mode(),
655                shim_log_level: host_info
656                    .log_level
657                    .unwrap_or_else(|| self.config.general.log_level.unwrap())
658                    .to_c_loglevel(),
659                use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
660                use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
661                use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
662            };
663
664            Box::new(Host::new(
665                params,
666                &self.hosts_path,
667                self.raw_frequency,
668                self.shmem(),
669                self.preload_paths.clone(),
670            ))
671        };
672
673        host.lock_shmem();
674
675        for proc in &host_info.processes {
676            let plugin_path =
677                CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
678            let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
679            let pause_for_debugging = host_info.pause_for_debugging;
680
681            let argv: Vec<CString> = proc
682                .args
683                .iter()
684                .map(|x| CString::new(x.as_bytes()).unwrap())
685                .collect();
686
687            let envv: Vec<CString> = proc
688                .env
689                .clone()
690                .into_iter()
691                .map(|(x, y)| {
692                    let mut x: OsString = String::from(x).into();
693                    x.push("=");
694                    x.push(y);
695                    CString::new(x.as_bytes()).unwrap()
696                })
697                .collect();
698
699            host.continue_execution_timer();
700
701            host.add_application(
702                proc.start_time,
703                proc.shutdown_time,
704                proc.shutdown_signal,
705                plugin_name,
706                plugin_path,
707                argv,
708                envv,
709                pause_for_debugging,
710                proc.expected_final_state,
711            );
712
713            host.stop_execution_timer();
714        }
715
716        host.unlock_shmem();
717
718        Ok(host)
719    }
720
721    fn log_heartbeat(&mut self, now: EmulatedTime) {
722        let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
723        if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
724            let err = nix::errno::Errno::last();
725            log::warn!("Unable to get shadow's resource usage: {}", err);
726            return;
727        }
728
729        // the sysinfo syscall also would give memory usage info, but it's less detailed
730        let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
731
732        // the linux man page says this is in kilobytes, but it seems to be in kibibytes
733        let max_memory = (resources.ru_maxrss as f64) / 1048576.0; // KiB->GiB
734        let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
735        let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
736
737        // tornettools assumes a specific log format for this message, so don't change it without
738        // testing that tornettools can parse resource usage information from the shadow log
739        // https://github.com/shadow/tornettools/blob/6c00856c3f08899da30bfc452b6a055572cc4536/tornettools/parse_rusage.py#L58-L86
740        log::info!(
741            "Process resource usage at simtime {} reported by getrusage(): \
742            ru_maxrss={:.03} GiB, \
743            ru_utime={:.03} minutes, \
744            ru_stime={:.03} minutes, \
745            ru_nvcsw={}, \
746            ru_nivcsw={}",
747            (now - EmulatedTime::SIMULATION_START).as_nanos(),
748            max_memory,
749            user_time_minutes,
750            system_time_minutes,
751            resources.ru_nvcsw,
752            resources.ru_nivcsw,
753        );
754
755        // there are different ways of calculating system memory usage (for example 'free' will
756        // calculate used memory differently than 'htop'), so we'll log the values we think are
757        // useful, and something parsing the log can calculate whatever it wants
758        log::info!(
759            "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
760            (now - EmulatedTime::SIMULATION_START).as_nanos(),
761            serde_json::to_string(&mem_info).unwrap(),
762        );
763    }
764
765    fn check_resource_usage(&mut self) {
766        if self.check_fd_usage {
767            match self.fd_usage() {
768                // if more than 90% in use
769                Ok((usage, limit)) if usage > limit * 90 / 100 => {
770                    log::warn!(
771                        "Using more than 90% ({usage}/{limit}) of available file descriptors"
772                    );
773                    self.check_fd_usage = false;
774                }
775                Err(e) => {
776                    log::warn!("Unable to check fd usage: {e}");
777                    self.check_fd_usage = false;
778                }
779                Ok(_) => {}
780            }
781        }
782
783        if self.check_mem_usage {
784            match self.memory_remaining() {
785                // if less than 500 MiB available
786                Ok(remaining) if remaining < 500 * 1024 * 1024 => {
787                    log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
788                    self.check_mem_usage = false;
789                }
790                Err(e) => {
791                    log::warn!("Unable to check memory usage: {e}");
792                    self.check_mem_usage = false;
793                }
794                Ok(_) => {}
795            }
796        }
797    }
798
799    /// Returns a tuple of (usage, limit).
800    fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
801        let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
802
803        let mut fd_count: u64 = 0;
804        for entry in dir {
805            // short-circuit and return on error
806            entry.context("Failed to read entry in '/proc/self/fd'")?;
807            fd_count += 1;
808        }
809
810        let (soft_limit, _) =
811            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
812                .context("Failed to get the fd limit")?;
813
814        Ok((fd_count, soft_limit))
815    }
816
817    /// Returns the number of bytes remaining.
818    fn memory_remaining(&mut self) -> anyhow::Result<u64> {
819        let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
820            .context("Failed to get the page size")?
821            .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
822
823        let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
824            .context("Failed to get the number of available pages of physical memory")?
825            .ok_or_else(|| {
826                anyhow::anyhow!(
827                    "Failed to get the number of available pages of physical memory (no errno)"
828                )
829            })?;
830
831        let page_size: u64 = page_size.try_into().unwrap();
832        let avl_pages: u64 = avl_pages.try_into().unwrap();
833
834        Ok(page_size * avl_pages)
835    }
836
837    pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
838        &self.shmem
839    }
840}
841
842pub struct ManagerConfig {
843    // deterministic source of randomness for this manager
844    pub random: Xoshiro256PlusPlus,
845
846    // map of ip addresses to graph nodes
847    pub ip_assignment: IpAssignment<u32>,
848
849    // routing information for paths between graph nodes
850    pub routing_info: RoutingInfo<u32>,
851
852    // bandwidths of hosts at ip addresses
853    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,
854
855    // a list of hosts and their processes
856    pub hosts: Vec<HostInfo>,
857}
858
859/// Helper function to initialize the global [`Host`] before running the closure.
860fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
861    host_iter.for_each(|host| {
862        worker::Worker::set_active_host(host);
863        worker::Worker::with_active_host(|host| {
864            f(host);
865        })
866        .unwrap();
867        worker::Worker::take_active_host()
868    });
869}
870
871/// Get the raw speed of the experiment machine.
872fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
873    const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
874    let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
875    Ok(khz * 1000)
876}
877
878fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
879    let libname_c = CString::new(libname).unwrap();
880    let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
881
882    // scope needed to make sure the CStr is dropped before we free libpath_c
883    let libpath = if !libpath_c.is_null() {
884        let libpath = unsafe { CStr::from_ptr(libpath_c) };
885        let libpath = OsStr::from_bytes(libpath.to_bytes());
886        Some(PathBuf::from(libpath.to_os_string()))
887    } else {
888        None
889    };
890
891    unsafe { libc::free(libpath_c as *mut libc::c_void) };
892
893    let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
894
895    let bytes = libpath.as_os_str().as_bytes();
896    if bytes.iter().any(|c| *c == b' ' || *c == b':') {
897        // These are unescapable separators in LD_PRELOAD.
898        anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
899    }
900
901    log::debug!(
902        "Found required preload library {} at path {}",
903        libname,
904        libpath.display(),
905    );
906
907    Ok(libpath)
908}