shadow_rs/core/manager.rs

use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::{CStr, CString, OsStr, OsString};
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;

use anyhow::Context;
use atomic_refcell::AtomicRefCell;
use log::warn;
use rand::seq::SliceRandom;
use rand_xoshiro::Xoshiro256PlusPlus;
use scheduler::thread_per_core::ThreadPerCoreSched;
use scheduler::thread_per_host::ThreadPerHostSched;
use scheduler::{HostIter, Scheduler};
use shadow_shim_helper_rs::HostId;
use shadow_shim_helper_rs::emulated_time::EmulatedTime;
use shadow_shim_helper_rs::option::FfiOption;
use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
use shadow_shim_helper_rs::simulation_time::SimulationTime;
use shadow_shmem::allocator::ShMemBlock;

use crate::core::configuration::{self, ConfigOptions, Flatten};
use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
use crate::core::cpu;
use crate::core::resource_usage;
use crate::core::runahead::Runahead;
use crate::core::sim_config::{Bandwidth, HostInfo};
use crate::core::sim_stats;
use crate::core::worker;
use crate::cshadow as c;
use crate::host::host::{Host, HostParameters};
use crate::network::dns::DnsBuilder;
use crate::network::graph::{IpAssignment, RoutingInfo};
use crate::utility;
use crate::utility::childpid_watcher::ChildPidWatcher;
use crate::utility::status_bar::Status;

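/// Manages the simulation for this Shadow process: builds the hosts, drives the per-round
/// scheduling loop, and reports resource usage and simulation statistics.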
pub struct Manager<'a> {
    manager_config: Option<ManagerConfig>,
    controller: &'a Controller<'a>,
    config: &'a ConfigOptions,

    raw_frequency: u64,
    native_tsc_frequency: u64,
    end_time: EmulatedTime,

    data_path: PathBuf,
    hosts_path: PathBuf,

    preload_paths: Arc<Vec<PathBuf>>,

    check_fd_usage: bool,
    check_mem_usage: bool,

    meminfo_file: std::fs::File,
    shmem: ShMemBlock<'static, ManagerShmem>,
}

impl<'a> Manager<'a> {
    pub fn new(
        manager_config: ManagerConfig,
        controller: &'a Controller<'a>,
        config: &'a ConfigOptions,
        end_time: EmulatedTime,
    ) -> anyhow::Result<Self> {
        // get the system's CPU frequency
        let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
            let default_freq = 2_500_000_000; // 2.5 GHz
            log::debug!(
                "Failed to get raw CPU frequency, using {} Hz instead: {}",
                default_freq,
                e
            );
            default_freq
        });

        let native_tsc_frequency = if let Some(f) = shadow_tsc::Tsc::native_cycles_per_second() {
            f
        } else {
            warn!(
                "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
            );
            raw_frequency
        };

        let mut preload_paths = Vec::new();

        // we always preload the injector lib to ensure that the shim is loaded into the managed
        // processes
        const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
        preload_paths.push(
            get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
            })?,
        );

        // preload libc lib if option is enabled
        const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
        if config.experimental.use_preload_libc.unwrap() {
            let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the libc library is disabled");
        };

        // preload openssl rng lib if option is enabled
        const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
        if config.experimental.use_preload_openssl_rng.unwrap() {
            let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl rng library is disabled");
        };

        // preload openssl crypto lib if option is enabled
        const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
        if config.experimental.use_preload_openssl_crypto.unwrap() {
            let path =
                get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
                    format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
                })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl crypto library is disabled");
        };

        // use the working dir to generate absolute paths
        let cwd = std::env::current_dir()?;
        let template_path = config
            .general
            .template_directory
            .flatten_ref()
            .map(|x| cwd.clone().join(x));
        let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
        let hosts_path = data_path.join("hosts");

        if let Some(template_path) = template_path {
            log::debug!(
                "Copying template directory '{}' to '{}'",
                template_path.display(),
                data_path.display()
            );

            // copy the template directory to the data directory path
            utility::copy_dir_all(&template_path, &data_path).with_context(|| {
                format!(
                    "Failed to copy template directory '{}' to '{}'",
                    template_path.display(),
                    data_path.display()
                )
            })?;

            // create the hosts directory if it doesn't exist
            let result = std::fs::create_dir(&hosts_path);
            if let Err(e) = result {
                if e.kind() != std::io::ErrorKind::AlreadyExists {
                    return Err(e).context(format!(
                        "Failed to create hosts directory '{}'",
                        hosts_path.display()
                    ));
                }
            }
        } else {
            // create the data and hosts directories
            std::fs::create_dir(&data_path).with_context(|| {
                format!("Failed to create data directory '{}'", data_path.display())
            })?;
            std::fs::create_dir(&hosts_path).with_context(|| {
                format!(
                    "Failed to create hosts directory '{}'",
                    hosts_path.display(),
                )
            })?;
        }

        // save the processed config as yaml
        let config_out_filename = data_path.join("processed-config.yaml");
        let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
            format!("Failed to create file '{}'", config_out_filename.display())
        })?;

        serde_yaml::to_writer(config_out_file, &config).with_context(|| {
            format!(
                "Failed to write processed config yaml to file '{}'",
                config_out_filename.display()
            )
        })?;

        let meminfo_file =
            std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;

        let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
            log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
            native_preemption_config: if config.native_preemption_enabled() {
                FfiOption::Some(NativePreemptionConfig {
                    native_duration: config.native_preemption_native_interval()?,
                    sim_duration: config.native_preemption_sim_interval(),
                })
            } else {
                FfiOption::None
            },
        });

        Ok(Self {
            manager_config: Some(manager_config),
            controller,
            config,
            raw_frequency,
            native_tsc_frequency,
            end_time,
            data_path,
            hosts_path,
            preload_paths: Arc::new(preload_paths),
            check_fd_usage: true,
            check_mem_usage: true,
            meminfo_file,
            shmem,
        })
    }

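    /// Runs the simulation to completion, returning the number of plugin errors that occurred.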
    pub fn run(
        mut self,
        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
    ) -> anyhow::Result<u32> {
        let mut manager_config = self.manager_config.take().unwrap();

        let min_runahead_config: Option<Duration> = self
            .config
            .experimental
            .runahead
            .flatten()
            .map(|x| x.into());
        let min_runahead_config: Option<SimulationTime> =
            min_runahead_config.map(|x| x.try_into().unwrap());

        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;

        let smallest_latency = SimulationTime::from_nanos(
            manager_config
                .routing_info
                .get_smallest_latency_ns()
                .unwrap(),
        );

        let parallelism: usize = match self.config.general.parallelism.unwrap() {
            0 => {
                let cores = cpu::count_physical_cores().try_into().unwrap();
                log::info!("The parallelism option was 0, so using parallelism={cores}");
                cores
            }
            x => x.try_into().unwrap(),
        };

        // Set up the global DNS before building the hosts
        let mut dns_builder = DnsBuilder::new();

        // Assign the host id only once to guarantee it stays associated with its host.
        let host_init: Vec<(&HostInfo, HostId)> = manager_config
            .hosts
            .iter()
            .enumerate()
            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
            .collect();

        for (info, id) in &host_init {
            // Extract the host address.
            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
                unreachable!("IPv6 not supported");
            };

            // Register in the global DNS.
            dns_builder
                .register(*id, addr, info.name.clone())
                .with_context(|| {
                    format!(
                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
                        *id, addr, info.name
                    )
                })?;
        }

        // Convert to a global read-only DNS struct.
        let dns = dns_builder.into_dns()?;

        // Now build the hosts using the assigned host ids.
        // Note: there are several return points before these hosts are added to the scheduler, and
        // returning early would leak their memory. We don't worry about that here, since the issue
        // will go away once the hosts are fully moved to Rust, and an early return only happens on
        // error, in which case we're exiting anyway.
        let mut hosts: Vec<_> = host_init
            .iter()
            .map(|(info, id)| {
                self.build_host(*id, info)
                    .with_context(|| format!("Failed to build host '{}'", info.name))
            })
            .collect::<anyhow::Result<_>>()?;

        // shuffle the list of hosts to make sure that they are randomly assigned by the scheduler
        hosts.shuffle(&mut manager_config.random);

        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();

        // an infinite iterator that always returns `<Option<Option<u32>>>::Some`
        let cpu_iter =
            std::iter::from_fn(|| {
                // if cpu pinning is enabled, return Some(Some(cpu_id)), otherwise return Some(None)
                Some(use_cpu_pinning.then(|| {
                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
                }))
            });

        // shadow is parallelized at the host level, so we don't need more parallelism than the
        // number of hosts
        let parallelism = std::cmp::min(parallelism, hosts.len());

        // should have either all `Some` values, or all `None` values
        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
        if cpus[0].is_some() {
            log::debug!("Pinning to cpus: {:?}", cpus);
            assert!(cpus.iter().all(|x| x.is_some()));
        } else {
            log::debug!("Not pinning to CPUs");
            assert!(cpus.iter().all(|x| x.is_none()));
        }
        assert_eq!(cpus.len(), parallelism);

        // set the simulation's global state
        worker::WORKER_SHARED
            .borrow_mut()
            .replace(worker::WorkerShared {
                ip_assignment: manager_config.ip_assignment,
                routing_info: manager_config.routing_info,
                host_bandwidths: manager_config.host_bandwidths,
                // safe since the DNS type has an internal mutex
                dns,
                num_plugin_errors: AtomicU32::new(0),
                // allow the status logger's state to be updated from anywhere
                status_logger_state: status_logger_state.map(Arc::clone),
                runahead: Runahead::new(
                    self.config.experimental.use_dynamic_runahead.unwrap(),
                    smallest_latency,
                    min_runahead_config,
                ),
                child_pid_watcher: ChildPidWatcher::new(),
                event_queues: hosts
                    .iter()
                    .map(|x| (x.id(), x.event_queue().clone()))
                    .collect(),
                bootstrap_end_time,
                sim_end_time: self.end_time,
            });

        // scope used so that the scheduler is dropped before we log the global counters below
        {
            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
                configuration::Scheduler::ThreadPerHost => {
                    std::thread_local! {
                        /// A thread-local required by the thread-per-host scheduler.
                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
                    }
                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
                        &cpus,
                        &SCHED_HOST_STORAGE,
                        hosts,
                    ))
                }
                configuration::Scheduler::ThreadPerCore => {
                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
                        &cpus,
                        hosts,
                        self.config.experimental.use_worker_spinning.unwrap(),
                    ))
                }
            };

            // initialize the thread-local Worker
            scheduler.scope(|s| {
                s.run(|thread_id| {
                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
                });
            });

            // the current simulation interval
            let mut window = Some((
                EmulatedTime::SIMULATION_START,
                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
            ));

            // the next event times for each thread; allocated here to avoid re-allocating each
            // scheduling loop
            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
                vec![AtomicRefCell::new(None); scheduler.parallelism()];

            // how often to log heartbeat messages
            let heartbeat_interval = self
                .config
                .general
                .heartbeat_interval
                .flatten()
                .map(|x| Duration::from(x).try_into().unwrap());

            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
            let mut time_of_last_usage_check = std::time::Instant::now();

            // the scheduling loop
            while let Some((window_start, window_end)) = window {
                // update the status logger
                let display_time = std::cmp::min(window_start, window_end);
                worker::WORKER_SHARED
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .update_status_logger(|state| {
                        state.current = display_time;
                    });

                // run the events
                scheduler.scope(|s| {
                    // run the closure on each of the scheduler's threads
                    s.run_with_data(
                        &thread_next_event_times,
                        // each call of the closure is given an abstract thread-specific host
                        // iterator, and an element of 'thread_next_event_times'
                        move |_, hosts, next_event_time| {
                            let mut next_event_time = next_event_time.borrow_mut();

                            worker::Worker::reset_next_event_time();
                            worker::Worker::set_round_end_time(window_end);

                            for_each_host(hosts, |host| {
                                let host_next_event_time = {
                                    host.lock_shmem();
                                    host.execute(window_end);
                                    let host_next_event_time = host.next_event_time();
                                    host.unlock_shmem();
                                    host_next_event_time
                                };
                                *next_event_time = [*next_event_time, host_next_event_time]
                                    .into_iter()
                                    .flatten() // filter out None
                                    .reduce(std::cmp::min);
                            });

                            let packet_next_event_time = worker::Worker::get_next_event_time();

                            *next_event_time = [*next_event_time, packet_next_event_time]
                                .into_iter()
                                .flatten() // filter out None
                                .reduce(std::cmp::min);
                        },
                    );

                    // log a heartbeat message every 'heartbeat_interval' amount of simulated time
                    if let Some(heartbeat_interval) = heartbeat_interval {
                        if window_start > last_heartbeat + heartbeat_interval {
                            last_heartbeat = window_start;
                            self.log_heartbeat(window_start);
                        }
                    }

                    // check resource usage every 30 real seconds
                    let current_time = std::time::Instant::now();
                    if current_time.duration_since(time_of_last_usage_check)
                        > Duration::from_secs(30)
                    {
                        time_of_last_usage_check = current_time;
                        self.check_resource_usage();
                    }
                });

                // get the minimum next event time for all threads (also resets the next event times
                // to None while we have them borrowed)
                let min_next_event_time = thread_next_event_times
                    .iter()
                    // the take() resets it to None for the next scheduling loop
                    .filter_map(|x| x.borrow_mut().take())
                    .reduce(std::cmp::min)
                    .unwrap_or(EmulatedTime::MAX);

                log::debug!(
                    "Finished execution window [{}--{}], next event at {}",
                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
                );

                // notify the controller that we finished this round, and report our next event
                // time so that it can fast-forward our execution window if possible
                window = self
                    .controller
                    .manager_finished_current_round(min_next_event_time);
            }

            scheduler.scope(|s| {
                s.run_with_hosts(move |_, hosts| {
                    for_each_host(hosts, |host| {
                        worker::Worker::set_current_time(self.end_time);
                        host.free_all_applications();
                        host.shutdown();
                        worker::Worker::clear_current_time();
                    });
                });
            });

            // add each thread's local sim statistics to the global sim statistics.
            scheduler.scope(|s| {
                s.run(|_| {
                    worker::Worker::add_to_global_sim_stats();
                });
            });

            scheduler.join();
        }

        // simulation is finished, so update the status logger
        worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .update_status_logger(|state| {
                state.current = self.end_time;
            });

        let num_plugin_errors = worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .plugin_error_count();

        // drop the simulation's global state; this must happen before the allocation counters are
        // checked below
        worker::WORKER_SHARED.borrow_mut().take();

        // since the scheduler was dropped, all workers should have completed and the global object
        // and syscall counters should have been updated

        worker::with_global_sim_stats(|stats| {
            if self.config.experimental.use_syscall_counters.unwrap() {
                log::info!(
                    "Global syscall counts: {}",
                    stats.syscall_counts.lock().unwrap()
                );
            }
            if self.config.experimental.use_object_counters.unwrap() {
                let alloc_counts = stats.alloc_counts.lock().unwrap();
                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
                log::info!("Global allocated object counts: {}", alloc_counts);
                log::info!("Global deallocated object counts: {}", dealloc_counts);

                if *alloc_counts == *dealloc_counts {
                    log::info!("We allocated and deallocated the same number of objects :)");
                } else {
                    // don't change the formatting of this line as we search for it in test cases
                    log::warn!("Memory leak detected");
                }
            }

            let stats_filename = self.data_path.clone().join("sim-stats.json");
            sim_stats::write_stats_to_file(&stats_filename, stats)
        })?;

        Ok(num_plugin_errors)
    }

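    /// Builds a single host from the given [`HostInfo`] and adds its configured processes to it.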
    fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
        let hostname = CString::new(&*host_info.name).unwrap();

        // scope used to enforce drop order for pointers
        let host = {
            let params = HostParameters {
                // the manager sets this ID
                id: host_id,
                // the manager sets this CPU frequency
                cpu_frequency: self.raw_frequency,
                node_seed: host_info.seed,
                hostname,
                node_id: host_info.network_node_id,
                ip_addr: match host_info.ip_addr.unwrap() {
                    std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
                    // the config only allows ipv4 addresses, so this shouldn't happen
                    std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
                },
                sim_end_time: self.end_time,
                requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
                requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
                cpu_threshold: host_info.cpu_threshold,
                cpu_precision: host_info.cpu_precision,
                log_level: host_info
                    .log_level
                    .map(|x| x.to_c_loglevel())
                    .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
                pcap_config: host_info.pcap_config,
                qdisc: host_info.qdisc,
                init_sock_recv_buf_size: host_info.recv_buf_size,
                autotune_recv_buf: host_info.autotune_recv_buf,
                init_sock_send_buf_size: host_info.send_buf_size,
                autotune_send_buf: host_info.autotune_send_buf,
                native_tsc_frequency: self.native_tsc_frequency,
                model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
                max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
                unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
                unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
                strace_logging_options: self.config.strace_logging_mode(),
                shim_log_level: host_info
                    .log_level
                    .unwrap_or_else(|| self.config.general.log_level.unwrap())
                    .to_c_loglevel(),
                use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
                use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
                use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
            };

            Box::new(Host::new(
                params,
                &self.hosts_path,
                self.raw_frequency,
                self.shmem(),
                self.preload_paths.clone(),
            ))
        };

        host.lock_shmem();

        for proc in &host_info.processes {
            let plugin_path =
                CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
            let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
            let pause_for_debugging = host_info.pause_for_debugging;

            let argv: Vec<CString> = proc
                .args
                .iter()
                .map(|x| CString::new(x.as_bytes()).unwrap())
                .collect();

            let envv: Vec<CString> = proc
                .env
                .clone()
                .into_iter()
                .map(|(x, y)| {
                    let mut x: OsString = String::from(x).into();
                    x.push("=");
                    x.push(y);
                    CString::new(x.as_bytes()).unwrap()
                })
                .collect();

            host.continue_execution_timer();

            host.add_application(
                proc.start_time,
                proc.shutdown_time,
                proc.shutdown_signal,
                plugin_name,
                plugin_path,
                argv,
                envv,
                pause_for_debugging,
                proc.expected_final_state,
            );

            host.stop_execution_timer();
        }

        host.unlock_shmem();

        Ok(host)
    }

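    /// Logs Shadow's resource usage at simulation time `now`, as reported by `getrusage()` and
    /// `/proc/meminfo`.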
    fn log_heartbeat(&mut self, now: EmulatedTime) {
        let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
        if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
            let err = nix::errno::Errno::last();
            log::warn!("Unable to get shadow's resource usage: {}", err);
            return;
        }

        // the sysinfo syscall also would give memory usage info, but it's less detailed
        let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();

        // the linux man page says this is in kilobytes, but it seems to be in kibibytes
        let max_memory = (resources.ru_maxrss as f64) / 1048576.0; // KiB->GiB
        let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
        let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;

        // tornettools assumes a specific log format for this message, so don't change it without
        // testing that tornettools can parse resource usage information from the shadow log
        // https://github.com/shadow/tornettools/blob/6c00856c3f08899da30bfc452b6a055572cc4536/tornettools/parse_rusage.py#L58-L86
        log::info!(
            "Process resource usage at simtime {} reported by getrusage(): \
            ru_maxrss={:.03} GiB, \
            ru_utime={:.03} minutes, \
            ru_stime={:.03} minutes, \
            ru_nvcsw={}, \
            ru_nivcsw={}",
            (now - EmulatedTime::SIMULATION_START).as_nanos(),
            max_memory,
            user_time_minutes,
            system_time_minutes,
            resources.ru_nvcsw,
            resources.ru_nivcsw,
        );

        // there are different ways of calculating system memory usage (for example 'free' will
        // calculate used memory differently than 'htop'), so we'll log the values we think are
        // useful, and something parsing the log can calculate whatever it wants
        log::info!(
            "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
            (now - EmulatedTime::SIMULATION_START).as_nanos(),
            serde_json::to_string(&mem_info).unwrap(),
        );
    }

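    /// Warns (at most once each) when more than 90% of available file descriptors are in use or
    /// when less than 500 MiB of system memory remains available.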
    fn check_resource_usage(&mut self) {
        if self.check_fd_usage {
            match self.fd_usage() {
                // if more than 90% in use
                Ok((usage, limit)) if usage > limit * 90 / 100 => {
                    log::warn!(
                        "Using more than 90% ({usage}/{limit}) of available file descriptors"
                    );
                    self.check_fd_usage = false;
                }
                Err(e) => {
                    log::warn!("Unable to check fd usage: {e}");
                    self.check_fd_usage = false;
                }
                Ok(_) => {}
            }
        }

        if self.check_mem_usage {
            match self.memory_remaining() {
                // if less than 500 MiB available
                Ok(remaining) if remaining < 500 * 1024 * 1024 => {
                    log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
                    self.check_mem_usage = false;
                }
                Err(e) => {
                    log::warn!("Unable to check memory usage: {e}");
                    self.check_mem_usage = false;
                }
                Ok(_) => {}
            }
        }
    }

    /// Returns a tuple of (usage, limit).
    fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
        let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;

        let mut fd_count: u64 = 0;
        for entry in dir {
            // short-circuit and return on error
            entry.context("Failed to read entry in '/proc/self/fd'")?;
            fd_count += 1;
        }

        let (soft_limit, _) =
            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
                .context("Failed to get the fd limit")?;

        Ok((fd_count, soft_limit))
    }

    /// Returns the number of bytes remaining.
    fn memory_remaining(&mut self) -> anyhow::Result<u64> {
        let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
            .context("Failed to get the page size")?
            .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;

        let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
            .context("Failed to get the number of available pages of physical memory")?
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Failed to get the number of available pages of physical memory (no errno)"
                )
            })?;

        let page_size: u64 = page_size.try_into().unwrap();
        let avl_pages: u64 = avl_pages.try_into().unwrap();

        Ok(page_size * avl_pages)
    }

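    /// This manager's shared memory block, holding state such as the log start time and the
    /// native preemption configuration.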
    pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
        &self.shmem
    }
}

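/// Simulation state and configuration that is specific to a single [`Manager`].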
pub struct ManagerConfig {
    // deterministic source of randomness for this manager
    pub random: Xoshiro256PlusPlus,

    // map of ip addresses to graph nodes
    pub ip_assignment: IpAssignment<u32>,

    // routing information for paths between graph nodes
    pub routing_info: RoutingInfo<u32>,

    // bandwidths of hosts at ip addresses
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    // a list of hosts and their processes
    pub hosts: Vec<HostInfo>,
}

/// Helper to run a closure on each host in the iterator, setting the worker's active host for the
/// duration of each call.
fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
    host_iter.for_each(|host| {
        worker::Worker::set_active_host(host);
        worker::Worker::with_active_host(|host| {
            f(host);
        })
        .unwrap();
        worker::Worker::take_active_host()
    });
}

/// Get the raw speed of the experiment machine.
fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
    const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
    let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.trim().parse()?;
    Ok(khz * 1000)
}

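/// Finds the preload library `libname` in Shadow's rpath and returns its path, after verifying
/// that the path can be used in `LD_PRELOAD`.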
fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
    let libname_c = CString::new(libname).unwrap();
    let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };

    // scope needed to make sure the CStr is dropped before we free libpath_c
    let libpath = if !libpath_c.is_null() {
        let libpath = unsafe { CStr::from_ptr(libpath_c) };
        let libpath = OsStr::from_bytes(libpath.to_bytes());
        Some(PathBuf::from(libpath.to_os_string()))
    } else {
        None
    };

    unsafe { libc::free(libpath_c as *mut libc::c_void) };

    let libpath = libpath.ok_or_else(|| anyhow::anyhow!("Could not find library '{libname}' in rpath"))?;

    let bytes = libpath.as_os_str().as_bytes();
    if bytes.iter().any(|c| *c == b' ' || *c == b':') {
        // These are unescapable separators in LD_PRELOAD.
        anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
    }

    log::debug!(
        "Found required preload library {} at path {}",
        libname,
        libpath.display(),
    );

    Ok(libpath)
}