1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Owns the state needed to build the simulated hosts and to drive the scheduling rounds.
pub struct Manager<'a> {
    /// Inputs for the run. Wrapped in an `Option` so that `run()` can `take()` ownership of them.
    manager_config: Option<ManagerConfig>,
    /// Controller that is told when a round has finished and that returns the next window.
    controller: &'a Controller<'a>,
    /// The simulation's configuration options.
    config: &'a ConfigOptions,

    /// CPU frequency in Hz handed to every host; `new()` falls back to 2.5 GHz if it can't be read.
    raw_frequency: u64,
    /// Native TSC rate in cycles per second; `new()` falls back to `raw_frequency` if unknown.
    native_tsc_frequency: u64,
    /// Emulated time at which the simulation ends.
    end_time: EmulatedTime,

    /// The configured data directory, resolved against the working directory in `new()`.
    data_path: PathBuf,
    /// The "hosts" subdirectory of `data_path`.
    hosts_path: PathBuf,

    /// Paths of the libraries to preload; shared with every host that is built.
    preload_paths: Arc<Vec<PathBuf>>,

    /// Whether `check_resource_usage()` still checks fd usage; cleared after a warning or error.
    check_fd_usage: bool,
    /// Whether `check_resource_usage()` still checks free memory; cleared after a warning or error.
    check_mem_usage: bool,

    /// Open handle to "/proc/meminfo", re-read by `log_heartbeat()`.
    meminfo_file: std::fs::File,
    /// Shared-memory block holding the manager-wide `ManagerShmem` state.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
64 pub fn new(
65 manager_config: ManagerConfig,
66 controller: &'a Controller<'a>,
67 config: &'a ConfigOptions,
68 end_time: EmulatedTime,
69 ) -> anyhow::Result<Self> {
70 let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
72 let default_freq = 2_500_000_000; log::debug!("Failed to get raw CPU frequency, using {default_freq} Hz instead: {e}");
74 default_freq
75 });
76
77 let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
78 f
79 } else {
80 warn!(
81 "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
82 );
83 raw_frequency
84 };
85
86 let mut preload_paths = Vec::new();
87
88 const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
91 preload_paths.push(
92 get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
93 format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
94 })?,
95 );
96
97 const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
99 if config.experimental.use_preload_libc.unwrap() {
100 let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
101 format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
102 })?;
103 preload_paths.push(path);
104 } else {
105 log::info!("Preloading the libc library is disabled");
106 };
107
108 const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
110 if config.experimental.use_preload_openssl_rng.unwrap() {
111 let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
112 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
113 })?;
114 preload_paths.push(path);
115 } else {
116 log::info!("Preloading the openssl rng library is disabled");
117 };
118
119 const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
121 if config.experimental.use_preload_openssl_crypto.unwrap() {
122 let path =
123 get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
124 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
125 })?;
126 preload_paths.push(path);
127 } else {
128 log::info!("Preloading the openssl crypto library is disabled");
129 };
130
131 let cwd = std::env::current_dir()?;
133 let template_path = config
134 .general
135 .template_directory
136 .flatten_ref()
137 .map(|x| cwd.clone().join(x));
138 let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
139 let hosts_path = data_path.join("hosts");
140
141 if let Some(template_path) = template_path {
142 log::debug!(
143 "Copying template directory '{}' to '{}'",
144 template_path.display(),
145 data_path.display()
146 );
147
148 utility::copy_dir_all(&template_path, &data_path).with_context(|| {
150 format!(
151 "Failed to copy template directory '{}' to '{}'",
152 template_path.display(),
153 data_path.display()
154 )
155 })?;
156
157 let result = std::fs::create_dir(&hosts_path);
159 if let Err(e) = result
160 && e.kind() != std::io::ErrorKind::AlreadyExists
161 {
162 return Err(e).context(format!(
163 "Failed to create hosts directory '{}'",
164 hosts_path.display()
165 ));
166 }
167 } else {
168 std::fs::create_dir(&data_path).with_context(|| {
170 format!("Failed to create data directory '{}'", data_path.display())
171 })?;
172 std::fs::create_dir(&hosts_path).with_context(|| {
173 format!(
174 "Failed to create hosts directory '{}'",
175 hosts_path.display(),
176 )
177 })?;
178 }
179
180 let config_out_filename = data_path.join("processed-config.yaml");
182 let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
183 format!("Failed to create file '{}'", config_out_filename.display())
184 })?;
185
186 serde_yaml::to_writer(config_out_file, &config).with_context(|| {
187 format!(
188 "Failed to write processed config yaml to file '{}'",
189 config_out_filename.display()
190 )
191 })?;
192
193 let meminfo_file =
194 std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;
195
196 let emulate_cpuid = {
198 let supports_rdrand = asm_util::cpuid::supports_rdrand();
199 let supports_rdseed = asm_util::cpuid::supports_rdseed();
200 if !(supports_rdrand || supports_rdseed) {
201 debug!(
203 "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
204 );
205 false
206 } else {
207 let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
215 match res {
216 Ok(_) => {
217 unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
219 .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
220 debug!(
221 "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
222 );
223 true
224 }
225 Err(e) => {
226 warn!(
227 "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
228 );
229 false
230 }
231 }
232 }
233 };
234
235 let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
236 log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
237 native_preemption_config: if config.native_preemption_enabled() {
238 FfiOption::Some(NativePreemptionConfig {
239 native_duration: config.native_preemption_native_interval()?,
240 sim_duration: config.native_preemption_sim_interval(),
241 })
242 } else {
243 FfiOption::None
244 },
245 emulate_cpuid,
246 });
247
248 Ok(Self {
249 manager_config: Some(manager_config),
250 controller,
251 config,
252 raw_frequency,
253 native_tsc_frequency,
254 end_time,
255 data_path,
256 hosts_path,
257 preload_paths: Arc::new(preload_paths),
258 check_fd_usage: true,
259 check_mem_usage: true,
260 meminfo_file,
261 shmem,
262 })
263 }
264
265 pub fn run(
266 mut self,
267 status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
268 ) -> anyhow::Result<u32> {
269 let mut manager_config = self.manager_config.take().unwrap();
270
271 let min_runahead_config: Option<Duration> = self
272 .config
273 .experimental
274 .runahead
275 .flatten()
276 .map(|x| x.into());
277 let min_runahead_config: Option<SimulationTime> =
278 min_runahead_config.map(|x| x.try_into().unwrap());
279
280 let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
281 let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
282 let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;
283
284 let smallest_latency = SimulationTime::from_nanos(
285 manager_config
286 .routing_info
287 .get_smallest_latency_ns()
288 .unwrap(),
289 );
290
291 let parallelism: usize = match self.config.general.parallelism.unwrap() {
292 0 => {
293 let cores = cpu::count_physical_cores().try_into().unwrap();
294 log::info!("The parallelism option was 0, so using parallelism={cores}");
295 cores
296 }
297 x => x.try_into().unwrap(),
298 };
299
300 let mut dns_builder = DnsBuilder::new();
302
303 let host_init: Vec<(&HostInfo, HostId)> = manager_config
305 .hosts
306 .iter()
307 .enumerate()
308 .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
309 .collect();
310
311 for (info, id) in &host_init {
312 let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
314 unreachable!("IPv6 not supported");
315 };
316
317 dns_builder
319 .register(*id, addr, info.name.clone())
320 .with_context(|| {
321 format!(
322 "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
323 *id, addr, info.name
324 )
325 })?;
326 }
327
328 let dns = dns_builder.into_dns()?;
330
331 let mut hosts: Vec<_> = host_init
337 .iter()
338 .map(|(info, id)| {
339 self.build_host(*id, info)
340 .with_context(|| format!("Failed to build host '{}'", info.name))
341 })
342 .collect::<anyhow::Result<_>>()?;
343
344 hosts.shuffle(&mut manager_config.random);
346
347 let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();
348
349 let cpu_iter =
351 std::iter::from_fn(|| {
352 Some(use_cpu_pinning.then(|| {
354 u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
355 }))
356 });
357
358 let parallelism = std::cmp::min(parallelism, hosts.len());
361
362 let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
364 if cpus[0].is_some() {
365 log::debug!("Pinning to cpus: {cpus:?}");
366 assert!(cpus.iter().all(|x| x.is_some()));
367 } else {
368 log::debug!("Not pinning to CPUs");
369 assert!(cpus.iter().all(|x| x.is_none()));
370 }
371 assert_eq!(cpus.len(), parallelism);
372
373 worker::WORKER_SHARED
375 .borrow_mut()
376 .replace(worker::WorkerShared {
377 ip_assignment: manager_config.ip_assignment,
378 routing_info: manager_config.routing_info,
379 host_bandwidths: manager_config.host_bandwidths,
380 dns,
382 num_plugin_errors: AtomicU32::new(0),
383 status_logger_state: status_logger_state.map(Arc::clone),
385 runahead: Runahead::new(
386 self.config.experimental.use_dynamic_runahead.unwrap(),
387 smallest_latency,
388 min_runahead_config,
389 ),
390 child_pid_watcher: ChildPidWatcher::new(),
391 event_queues: hosts
392 .iter()
393 .map(|x| (x.id(), x.event_queue().clone()))
394 .collect(),
395 bootstrap_end_time,
396 sim_end_time: self.end_time,
397 });
398
399 {
401 let mut scheduler = match self.config.experimental.scheduler.unwrap() {
402 configuration::Scheduler::ThreadPerHost => {
403 std::thread_local! {
404 static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
406 }
407 Scheduler::ThreadPerHost(ThreadPerHostSched::new(
408 &cpus,
409 &SCHED_HOST_STORAGE,
410 hosts,
411 ))
412 }
413 configuration::Scheduler::ThreadPerCore => {
414 Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
415 &cpus,
416 hosts,
417 self.config.experimental.use_worker_spinning.unwrap(),
418 ))
419 }
420 };
421
422 scheduler.scope(|s| {
424 s.run(|thread_id| {
425 worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
426 });
427 });
428
429 let mut window = Some((
431 EmulatedTime::SIMULATION_START,
432 EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
433 ));
434
435 let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
438 vec![AtomicRefCell::new(None); scheduler.parallelism()];
439
440 let heartbeat_interval = self
442 .config
443 .general
444 .heartbeat_interval
445 .flatten()
446 .map(|x| Duration::from(x).try_into().unwrap());
447
448 let mut last_heartbeat = EmulatedTime::SIMULATION_START;
449 let mut time_of_last_usage_check = std::time::Instant::now();
450
451 while let Some((window_start, window_end)) = window {
453 let display_time = std::cmp::min(window_start, window_end);
455 worker::WORKER_SHARED
456 .borrow()
457 .as_ref()
458 .unwrap()
459 .update_status_logger(|state| {
460 state.current = display_time;
461 });
462
463 scheduler.scope(|s| {
465 s.run_with_data(
467 &thread_next_event_times,
468 move |_, hosts, next_event_time| {
471 let mut next_event_time = next_event_time.borrow_mut();
472
473 worker::Worker::reset_next_event_time();
474 worker::Worker::set_round_end_time(window_end);
475
476 for_each_host(hosts, |host| {
477 let host_next_event_time = {
478 host.lock_shmem();
479 host.execute(window_end);
480 let host_next_event_time = host.next_event_time();
481 host.unlock_shmem();
482 host_next_event_time
483 };
484 *next_event_time = [*next_event_time, host_next_event_time]
485 .into_iter()
486 .flatten() .reduce(std::cmp::min);
488 });
489
490 let packet_next_event_time = worker::Worker::get_next_event_time();
491
492 *next_event_time = [*next_event_time, packet_next_event_time]
493 .into_iter()
494 .flatten() .reduce(std::cmp::min);
496 },
497 );
498
499 if let Some(heartbeat_interval) = heartbeat_interval
501 && window_start > last_heartbeat + heartbeat_interval
502 {
503 last_heartbeat = window_start;
504 self.log_heartbeat(window_start);
505 }
506
507 let current_time = std::time::Instant::now();
509 if current_time.duration_since(time_of_last_usage_check)
510 > Duration::from_secs(30)
511 {
512 time_of_last_usage_check = current_time;
513 self.check_resource_usage();
514 }
515 });
516
517 let min_next_event_time = thread_next_event_times
520 .iter()
521 .filter_map(|x| x.borrow_mut().take())
523 .reduce(std::cmp::min)
524 .unwrap_or(EmulatedTime::MAX);
525
526 log::debug!(
527 "Finished execution window [{}--{}], next event at {}",
528 (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
529 (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
530 (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
531 );
532
533 window = self
536 .controller
537 .manager_finished_current_round(min_next_event_time);
538 }
539
540 scheduler.scope(|s| {
541 s.run_with_hosts(move |_, hosts| {
542 for_each_host(hosts, |host| {
543 worker::Worker::set_current_time(self.end_time);
544 host.free_all_applications();
545 host.shutdown();
546 worker::Worker::clear_current_time();
547 });
548 });
549 });
550
551 scheduler.scope(|s| {
553 s.run(|_| {
554 worker::Worker::add_to_global_sim_stats();
555 });
556 });
557
558 scheduler.join();
559 }
560
561 worker::WORKER_SHARED
563 .borrow()
564 .as_ref()
565 .unwrap()
566 .update_status_logger(|state| {
567 state.current = self.end_time;
568 });
569
570 let num_plugin_errors = worker::WORKER_SHARED
571 .borrow()
572 .as_ref()
573 .unwrap()
574 .plugin_error_count();
575
576 worker::WORKER_SHARED.borrow_mut().take();
579
580 worker::with_global_sim_stats(|stats| {
584 if self.config.experimental.use_syscall_counters.unwrap() {
585 log::info!(
586 "Global syscall counts: {}",
587 stats.syscall_counts.lock().unwrap()
588 );
589 }
590 if self.config.experimental.use_object_counters.unwrap() {
591 let alloc_counts = stats.alloc_counts.lock().unwrap();
592 let dealloc_counts = stats.dealloc_counts.lock().unwrap();
593 log::info!("Global allocated object counts: {alloc_counts}");
594 log::info!("Global deallocated object counts: {dealloc_counts}");
595
596 if *alloc_counts == *dealloc_counts {
597 log::info!("We allocated and deallocated the same number of objects :)");
598 } else {
599 log::warn!("Memory leak detected");
601 }
602 }
603
604 let stats_filename = self.data_path.clone().join("sim-stats.json");
605 sim_stats::write_stats_to_file(&stats_filename, stats)
606 })?;
607
608 Ok(num_plugin_errors)
609 }
610
611 fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
612 let hostname = CString::new(&*host_info.name).unwrap();
613
614 let host = {
616 let params = HostParameters {
617 id: host_id,
619 cpu_frequency: self.raw_frequency,
621 node_seed: host_info.seed,
622 hostname,
623 node_id: host_info.network_node_id,
624 ip_addr: match host_info.ip_addr.unwrap() {
625 std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
626 std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
628 },
629 sim_end_time: self.end_time,
630 requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
631 requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
632 cpu_threshold: host_info.cpu_threshold,
633 cpu_precision: host_info.cpu_precision,
634 log_level: host_info
635 .log_level
636 .map(|x| x.to_c_loglevel())
637 .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
638 pcap_config: host_info.pcap_config,
639 qdisc: host_info.qdisc,
640 init_sock_recv_buf_size: host_info.recv_buf_size,
641 autotune_recv_buf: host_info.autotune_recv_buf,
642 init_sock_send_buf_size: host_info.send_buf_size,
643 autotune_send_buf: host_info.autotune_send_buf,
644 native_tsc_frequency: self.native_tsc_frequency,
645 model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
646 max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
647 unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
648 unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
649 strace_logging_options: self.config.strace_logging_mode(),
650 shim_log_level: host_info
651 .log_level
652 .unwrap_or_else(|| self.config.general.log_level.unwrap())
653 .to_c_loglevel(),
654 use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
655 use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
656 use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
657 };
658
659 Box::new(Host::new(
660 params,
661 &self.hosts_path,
662 self.raw_frequency,
663 self.shmem(),
664 self.preload_paths.clone(),
665 ))
666 };
667
668 host.lock_shmem();
669
670 for proc in &host_info.processes {
671 let plugin_path =
672 CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
673 let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
674 let pause_for_debugging = host_info.pause_for_debugging;
675
676 let argv: Vec<CString> = proc
677 .args
678 .iter()
679 .map(|x| CString::new(x.as_bytes()).unwrap())
680 .collect();
681
682 let envv: Vec<CString> = proc
683 .env
684 .clone()
685 .into_iter()
686 .map(|(x, y)| {
687 let mut x: OsString = String::from(x).into();
688 x.push("=");
689 x.push(y);
690 CString::new(x.as_bytes()).unwrap()
691 })
692 .collect();
693
694 host.continue_execution_timer();
695
696 host.add_application(
697 proc.start_time,
698 proc.shutdown_time,
699 proc.shutdown_signal,
700 plugin_name,
701 plugin_path,
702 argv,
703 envv,
704 pause_for_debugging,
705 proc.expected_final_state,
706 );
707
708 host.stop_execution_timer();
709 }
710
711 host.unlock_shmem();
712
713 Ok(host)
714 }
715
716 fn log_heartbeat(&mut self, now: EmulatedTime) {
717 let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
718 if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
719 let err = nix::errno::Errno::last();
720 log::warn!("Unable to get shadow's resource usage: {err}");
721 return;
722 }
723
724 let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
726
727 let max_memory = (resources.ru_maxrss as f64) / 1048576.0; let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
730 let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
731
732 log::info!(
736 "Process resource usage at simtime {} reported by getrusage(): \
737 ru_maxrss={:.03} GiB, \
738 ru_utime={:.03} minutes, \
739 ru_stime={:.03} minutes, \
740 ru_nvcsw={}, \
741 ru_nivcsw={}",
742 (now - EmulatedTime::SIMULATION_START).as_nanos(),
743 max_memory,
744 user_time_minutes,
745 system_time_minutes,
746 resources.ru_nvcsw,
747 resources.ru_nivcsw,
748 );
749
750 log::info!(
754 "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
755 (now - EmulatedTime::SIMULATION_START).as_nanos(),
756 serde_json::to_string(&mem_info).unwrap(),
757 );
758 }
759
760 fn check_resource_usage(&mut self) {
761 if self.check_fd_usage {
762 match self.fd_usage() {
763 Ok((usage, limit)) if usage > limit * 90 / 100 => {
765 log::warn!(
766 "Using more than 90% ({usage}/{limit}) of available file descriptors"
767 );
768 self.check_fd_usage = false;
769 }
770 Err(e) => {
771 log::warn!("Unable to check fd usage: {e}");
772 self.check_fd_usage = false;
773 }
774 Ok(_) => {}
775 }
776 }
777
778 if self.check_mem_usage {
779 match self.memory_remaining() {
780 Ok(remaining) if remaining < 500 * 1024 * 1024 => {
782 log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
783 self.check_mem_usage = false;
784 }
785 Err(e) => {
786 log::warn!("Unable to check memory usage: {e}");
787 self.check_mem_usage = false;
788 }
789 Ok(_) => {}
790 }
791 }
792 }
793
794 fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
796 let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
797
798 let mut fd_count: u64 = 0;
799 for entry in dir {
800 entry.context("Failed to read entry in '/proc/self/fd'")?;
802 fd_count += 1;
803 }
804
805 let (soft_limit, _) =
806 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
807 .context("Failed to get the fd limit")?;
808
809 Ok((fd_count, soft_limit))
810 }
811
812 fn memory_remaining(&mut self) -> anyhow::Result<u64> {
814 let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
815 .context("Failed to get the page size")?
816 .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
817
818 let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
819 .context("Failed to get the number of available pages of physical memory")?
820 .ok_or_else(|| {
821 anyhow::anyhow!(
822 "Failed to get the number of available pages of physical memory (no errno)"
823 )
824 })?;
825
826 let page_size: u64 = page_size.try_into().unwrap();
827 let avl_pages: u64 = avl_pages.try_into().unwrap();
828
829 Ok(page_size * avl_pages)
830 }
831
832 pub fn shmem(&self) -> &ShMemBlock<'_, ManagerShmem> {
833 &self.shmem
834 }
835}
836
/// The inputs that a `Manager` consumes when it runs the simulation.
pub struct ManagerConfig {
    /// Random number generator; `Manager::run` uses it to shuffle the order of the hosts.
    pub random: Xoshiro256PlusPlus,

    /// IP address assignment; handed to the state shared by all workers.
    pub ip_assignment: IpAssignment<u32>,

    /// Routing information; also the source of the smallest latency given to the runahead.
    pub routing_info: RoutingInfo<u32>,

    /// Bandwidths keyed by IP address; handed to the state shared by all workers.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// The hosts to simulate; `Manager::run` builds one `Host` per entry.
    pub hosts: Vec<HostInfo>,
}
853
854fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
856 host_iter.for_each(|host| {
857 worker::Worker::set_active_host(host);
858 worker::Worker::with_active_host(|host| {
859 f(host);
860 })
861 .unwrap();
862 worker::Worker::take_active_host()
863 });
864}
865
866fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
868 const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
869 let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
870 Ok(khz * 1000)
871}
872
873fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
874 let libname_c = CString::new(libname).unwrap();
875 let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
876
877 let libpath = if !libpath_c.is_null() {
879 let libpath = unsafe { CStr::from_ptr(libpath_c) };
880 let libpath = OsStr::from_bytes(libpath.to_bytes());
881 Some(PathBuf::from(libpath.to_os_string()))
882 } else {
883 None
884 };
885
886 unsafe { libc::free(libpath_c as *mut libc::c_void) };
887
888 let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
889
890 let bytes = libpath.as_os_str().as_bytes();
891 if bytes.iter().any(|c| *c == b' ' || *c == b':') {
892 anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
894 }
895
896 log::debug!(
897 "Found required preload library {} at path {}",
898 libname,
899 libpath.display(),
900 );
901
902 Ok(libpath)
903}