1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Sets up and drives a simulation: it builds the hosts, hands them to a scheduler, and advances
/// the simulation one execution window at a time until the controller reports no further window.
pub struct Manager<'a> {
    /// Configuration consumed by `run()`. Stored as an `Option` so that `run()` can `take()` it.
    manager_config: Option<ManagerConfig>,
    /// Asked for the next execution window after each round finishes.
    controller: &'a Controller<'a>,
    /// The simulation configuration options.
    config: &'a ConfigOptions,

    /// CPU frequency in Hz as returned by `get_raw_cpu_frequency_hz()`, or the hard-coded fallback
    /// chosen in `new()` when that lookup fails.
    raw_frequency: u64,
    /// Native TSC frequency in cycles per second; falls back to `raw_frequency` when unknown.
    native_tsc_frequency: u64,
    /// Passed to hosts and workers as the simulation end time.
    end_time: EmulatedTime,

    /// The data directory (resolved against the current working directory in `new()`).
    data_path: PathBuf,
    /// The "hosts" subdirectory of `data_path`.
    hosts_path: PathBuf,

    /// Paths of the preload libraries located in `new()`; a clone of this `Arc` is given to
    /// every host.
    preload_paths: Arc<Vec<PathBuf>>,

    /// Whether `check_resource_usage()` should still check file descriptor usage. Cleared after
    /// the first warning or failed check.
    check_fd_usage: bool,
    /// Whether `check_resource_usage()` should still check available memory. Cleared after the
    /// first warning or failed check.
    check_mem_usage: bool,

    /// Open handle to '/proc/meminfo', read by `log_heartbeat()`.
    meminfo_file: std::fs::File,
    /// Shared memory block holding the manager-wide `ManagerShmem` data.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
64 pub fn new(
65 manager_config: ManagerConfig,
66 controller: &'a Controller<'a>,
67 config: &'a ConfigOptions,
68 end_time: EmulatedTime,
69 ) -> anyhow::Result<Self> {
70 let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
72 let default_freq = 2_500_000_000; log::debug!("Failed to get raw CPU frequency, using {default_freq} Hz instead: {e}");
74 default_freq
75 });
76
77 let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
78 f
79 } else {
80 warn!(
81 "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
82 );
83 raw_frequency
84 };
85
86 let mut preload_paths = Vec::new();
87
88 const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
91 preload_paths.push(
92 get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
93 format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
94 })?,
95 );
96
97 const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
99 if config.experimental.use_preload_libc.unwrap() {
100 let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
101 format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
102 })?;
103 preload_paths.push(path);
104 } else {
105 log::info!("Preloading the libc library is disabled");
106 };
107
108 const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
110 if config.experimental.use_preload_openssl_rng.unwrap() {
111 let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
112 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
113 })?;
114 preload_paths.push(path);
115 } else {
116 log::info!("Preloading the openssl rng library is disabled");
117 };
118
119 const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
121 if config.experimental.use_preload_openssl_crypto.unwrap() {
122 let path =
123 get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
124 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
125 })?;
126 preload_paths.push(path);
127 } else {
128 log::info!("Preloading the openssl crypto library is disabled");
129 };
130
131 let cwd = std::env::current_dir()?;
133 let template_path = config
134 .general
135 .template_directory
136 .flatten_ref()
137 .map(|x| cwd.clone().join(x));
138 let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
139 let hosts_path = data_path.join("hosts");
140
141 if let Some(template_path) = template_path {
142 log::debug!(
143 "Copying template directory '{}' to '{}'",
144 template_path.display(),
145 data_path.display()
146 );
147
148 utility::copy_dir_all(&template_path, &data_path).with_context(|| {
150 format!(
151 "Failed to copy template directory '{}' to '{}'",
152 template_path.display(),
153 data_path.display()
154 )
155 })?;
156
157 let result = std::fs::create_dir(&hosts_path);
159 if let Err(e) = result {
160 if e.kind() != std::io::ErrorKind::AlreadyExists {
161 return Err(e).context(format!(
162 "Failed to create hosts directory '{}'",
163 hosts_path.display()
164 ));
165 }
166 }
167 } else {
168 std::fs::create_dir(&data_path).with_context(|| {
170 format!("Failed to create data directory '{}'", data_path.display())
171 })?;
172 std::fs::create_dir(&hosts_path).with_context(|| {
173 format!(
174 "Failed to create hosts directory '{}'",
175 hosts_path.display(),
176 )
177 })?;
178 }
179
180 let config_out_filename = data_path.join("processed-config.yaml");
182 let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
183 format!("Failed to create file '{}'", config_out_filename.display())
184 })?;
185
186 serde_yaml::to_writer(config_out_file, &config).with_context(|| {
187 format!(
188 "Failed to write processed config yaml to file '{}'",
189 config_out_filename.display()
190 )
191 })?;
192
193 let meminfo_file =
194 std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;
195
196 let emulate_cpuid = {
198 let supports_rdrand = unsafe { asm_util::cpuid::supports_rdrand() };
200 let supports_rdseed = unsafe { asm_util::cpuid::supports_rdseed() };
201 if !(supports_rdrand || supports_rdseed) {
202 debug!(
204 "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
205 );
206 false
207 } else {
208 let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
216 match res {
217 Ok(_) => {
218 unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
220 .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
221 debug!(
222 "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
223 );
224 true
225 }
226 Err(e) => {
227 warn!(
228 "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
229 );
230 false
231 }
232 }
233 }
234 };
235
236 let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
237 log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
238 native_preemption_config: if config.native_preemption_enabled() {
239 FfiOption::Some(NativePreemptionConfig {
240 native_duration: config.native_preemption_native_interval()?,
241 sim_duration: config.native_preemption_sim_interval(),
242 })
243 } else {
244 FfiOption::None
245 },
246 emulate_cpuid,
247 });
248
249 Ok(Self {
250 manager_config: Some(manager_config),
251 controller,
252 config,
253 raw_frequency,
254 native_tsc_frequency,
255 end_time,
256 data_path,
257 hosts_path,
258 preload_paths: Arc::new(preload_paths),
259 check_fd_usage: true,
260 check_mem_usage: true,
261 meminfo_file,
262 shmem,
263 })
264 }
265
    /// Runs the simulation to completion and returns the number of plugin errors.
    ///
    /// The manager builds all hosts, registers them with the DNS, installs the global worker
    /// state, and then repeatedly executes every host up to the end of the current execution
    /// window. After each window it reports the earliest next event time to the controller, which
    /// returns the next window (or `None` to stop). Finally the hosts are shut down, global
    /// statistics are logged, and the statistics are written to 'sim-stats.json' in the data
    /// directory.
    ///
    /// # Errors
    ///
    /// Returns an error if a host cannot be registered with the DNS, if the DNS cannot be built,
    /// if a host cannot be built, or if the simulation statistics cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if an unwrapped configuration option is unset, if the routing info has no smallest
    /// latency, or if a host has no IPv4 address.
    pub fn run(
        mut self,
        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
    ) -> anyhow::Result<u32> {
        // take ownership of the manager config; it is only needed during this call
        let mut manager_config = self.manager_config.take().unwrap();

        // the optional runahead from the config, converted to a simulation time
        let min_runahead_config: Option<Duration> = self
            .config
            .experimental
            .runahead
            .flatten()
            .map(|x| x.into());
        let min_runahead_config: Option<SimulationTime> =
            min_runahead_config.map(|x| x.try_into().unwrap());

        // the bootstrap end time is configured relative to the start of the simulation
        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;

        let smallest_latency = SimulationTime::from_nanos(
            manager_config
                .routing_info
                .get_smallest_latency_ns()
                .unwrap(),
        );

        // a configured parallelism of 0 means "use the number of physical cores"
        let parallelism: usize = match self.config.general.parallelism.unwrap() {
            0 => {
                let cores = cpu::count_physical_cores().try_into().unwrap();
                log::info!("The parallelism option was 0, so using parallelism={cores}");
                cores
            }
            x => x.try_into().unwrap(),
        };

        let mut dns_builder = DnsBuilder::new();

        // assign each host an id equal to its index in the configured host list
        let host_init: Vec<(&HostInfo, HostId)> = manager_config
            .hosts
            .iter()
            .enumerate()
            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
            .collect();

        // register every host with the DNS before any host is built
        for (info, id) in &host_init {
            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
                unreachable!("IPv6 not supported");
            };

            dns_builder
                .register(*id, addr, info.name.clone())
                .with_context(|| {
                    format!(
                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
                        *id, addr, info.name
                    )
                })?;
        }

        let dns = dns_builder.into_dns()?;

        // build all hosts, stopping at the first failure
        let mut hosts: Vec<_> = host_init
            .iter()
            .map(|(info, id)| {
                self.build_host(*id, info)
                    .with_context(|| format!("Failed to build host '{}'", info.name))
            })
            .collect::<anyhow::Result<_>>()?;

        // shuffle the hosts using the manager's random source
        // (presumably so the scheduler's host-to-thread assignment is randomized; TODO confirm)
        hosts.shuffle(&mut manager_config.random);

        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();

        // an endless iterator yielding `Some(cpu)` when pinning is enabled and `None` otherwise
        let cpu_iter =
            std::iter::from_fn(|| {
                Some(use_cpu_pinning.then(|| {
                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
                }))
            });

        // never use more threads than there are hosts
        let parallelism = std::cmp::min(parallelism, hosts.len());

        // the entries are either all `Some` or all `None`
        // NOTE(review): `cpus[0]` panics if `parallelism` is 0, i.e. when there are no hosts
        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
        if cpus[0].is_some() {
            log::debug!("Pinning to cpus: {cpus:?}");
            assert!(cpus.iter().all(|x| x.is_some()));
        } else {
            log::debug!("Not pinning to CPUs");
            assert!(cpus.iter().all(|x| x.is_none()));
        }
        assert_eq!(cpus.len(), parallelism);

        // install the global state shared by all workers; it is removed again below with `take()`
        worker::WORKER_SHARED
            .borrow_mut()
            .replace(worker::WorkerShared {
                ip_assignment: manager_config.ip_assignment,
                routing_info: manager_config.routing_info,
                host_bandwidths: manager_config.host_bandwidths,
                dns,
                num_plugin_errors: AtomicU32::new(0),
                status_logger_state: status_logger_state.map(Arc::clone),
                runahead: Runahead::new(
                    self.config.experimental.use_dynamic_runahead.unwrap(),
                    smallest_latency,
                    min_runahead_config,
                ),
                child_pid_watcher: ChildPidWatcher::new(),
                event_queues: hosts
                    .iter()
                    .map(|x| (x.id(), x.event_queue().clone()))
                    .collect(),
                bootstrap_end_time,
                sim_end_time: self.end_time,
            });

        // scope that owns the scheduler for the duration of the simulation
        {
            // create the scheduler selected in the config; it takes ownership of the hosts
            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
                configuration::Scheduler::ThreadPerHost => {
                    std::thread_local! {
                        // per-thread slot that is handed to the thread-per-host scheduler
                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
                    }
                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
                        &cpus,
                        &SCHED_HOST_STORAGE,
                        hosts,
                    ))
                }
                configuration::Scheduler::ThreadPerCore => {
                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
                        &cpus,
                        hosts,
                        self.config.experimental.use_worker_spinning.unwrap(),
                    ))
                }
            };

            // initialize a worker on each scheduler thread
            scheduler.scope(|s| {
                s.run(|thread_id| {
                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
                });
            });

            // the first execution window covers one nanosecond from the simulation start
            let mut window = Some((
                EmulatedTime::SIMULATION_START,
                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
            ));

            // one slot per scheduler thread in which that thread records its next event time
            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
                vec![AtomicRefCell::new(None); scheduler.parallelism()];

            // the optional heartbeat interval from the config
            let heartbeat_interval = self
                .config
                .general
                .heartbeat_interval
                .flatten()
                .map(|x| Duration::from(x).try_into().unwrap());

            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
            let mut time_of_last_usage_check = std::time::Instant::now();

            // the main loop: one iteration per execution window
            while let Some((window_start, window_end)) = window {
                // report the earlier of the two window bounds to the status logger
                let display_time = std::cmp::min(window_start, window_end);
                worker::WORKER_SHARED
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .update_status_logger(|state| {
                        state.current = display_time;
                    });

                scheduler.scope(|s| {
                    // execute every host up to the end of the window; each thread writes the
                    // earliest next event time it saw into its slot
                    s.run_with_data(
                        &thread_next_event_times,
                        move |_, hosts, next_event_time| {
                            let mut next_event_time = next_event_time.borrow_mut();

                            worker::Worker::reset_next_event_time();
                            worker::Worker::set_round_end_time(window_end);

                            for_each_host(hosts, |host| {
                                let host_next_event_time = {
                                    host.lock_shmem();
                                    host.execute(window_end);
                                    let host_next_event_time = host.next_event_time();
                                    host.unlock_shmem();
                                    host_next_event_time
                                };
                                // keep the minimum of the times that are `Some`
                                *next_event_time = [*next_event_time, host_next_event_time]
                                    .into_iter()
                                    .flatten()
                                    .reduce(std::cmp::min);
                            });

                            // also account for the next event time tracked by the worker itself
                            let packet_next_event_time = worker::Worker::get_next_event_time();

                            *next_event_time = [*next_event_time, packet_next_event_time]
                                .into_iter()
                                .flatten()
                                .reduce(std::cmp::min);
                        },
                    );

                    // NOTE(review): the bookkeeping below sits inside the scheduler scope,
                    // presumably so it can overlap with the worker threads; confirm against the
                    // scheduler API

                    // log a heartbeat once more than `heartbeat_interval` of simulated time has
                    // passed since the previous one
                    if let Some(heartbeat_interval) = heartbeat_interval {
                        if window_start > last_heartbeat + heartbeat_interval {
                            last_heartbeat = window_start;
                            self.log_heartbeat(window_start);
                        }
                    }

                    // check resource usage at most once every 30 seconds of wall-clock time
                    let current_time = std::time::Instant::now();
                    if current_time.duration_since(time_of_last_usage_check)
                        > Duration::from_secs(30)
                    {
                        time_of_last_usage_check = current_time;
                        self.check_resource_usage();
                    }
                });

                // take each thread's recorded time (resetting its slot to `None`) and find the
                // minimum; `EmulatedTime::MAX` means no thread has a next event
                let min_next_event_time = thread_next_event_times
                    .iter()
                    .filter_map(|x| x.borrow_mut().take())
                    .reduce(std::cmp::min)
                    .unwrap_or(EmulatedTime::MAX);

                log::debug!(
                    "Finished execution window [{}--{}], next event at {}",
                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
                );

                // the controller decides the next window; `None` ends the loop
                window = self
                    .controller
                    .manager_finished_current_round(min_next_event_time);
            }

            // the simulation is over: free the applications and shut every host down, with the
            // worker's current time set to the simulation end time
            scheduler.scope(|s| {
                s.run_with_hosts(move |_, hosts| {
                    for_each_host(hosts, |host| {
                        worker::Worker::set_current_time(self.end_time);
                        host.free_all_applications();
                        host.shutdown();
                        worker::Worker::clear_current_time();
                    });
                });
            });

            // add each worker's statistics to the global simulation statistics
            scheduler.scope(|s| {
                s.run(|_| {
                    worker::Worker::add_to_global_sim_stats();
                });
            });

            scheduler.join();
        }

        // report the simulation end time to the status logger
        worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .update_status_logger(|state| {
                state.current = self.end_time;
            });

        // read the plugin error count before the shared worker state is removed
        let num_plugin_errors = worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .plugin_error_count();

        // remove the global shared worker state that was installed above
        worker::WORKER_SHARED.borrow_mut().take();

        worker::with_global_sim_stats(|stats| {
            if self.config.experimental.use_syscall_counters.unwrap() {
                log::info!(
                    "Global syscall counts: {}",
                    stats.syscall_counts.lock().unwrap()
                );
            }
            if self.config.experimental.use_object_counters.unwrap() {
                let alloc_counts = stats.alloc_counts.lock().unwrap();
                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
                log::info!("Global allocated object counts: {alloc_counts}");
                log::info!("Global deallocated object counts: {dealloc_counts}");

                // differing counts mean some counted objects were never deallocated
                if *alloc_counts == *dealloc_counts {
                    log::info!("We allocated and deallocated the same number of objects :)");
                } else {
                    log::warn!("Memory leak detected");
                }
            }

            // write the statistics next to the other simulation output
            let stats_filename = self.data_path.clone().join("sim-stats.json");
            sim_stats::write_stats_to_file(&stats_filename, stats)
        })?;

        Ok(num_plugin_errors)
    }
611
612 fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
613 let hostname = CString::new(&*host_info.name).unwrap();
614
615 let host = {
617 let params = HostParameters {
618 id: host_id,
620 cpu_frequency: self.raw_frequency,
622 node_seed: host_info.seed,
623 hostname,
624 node_id: host_info.network_node_id,
625 ip_addr: match host_info.ip_addr.unwrap() {
626 std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
627 std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
629 },
630 sim_end_time: self.end_time,
631 requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
632 requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
633 cpu_threshold: host_info.cpu_threshold,
634 cpu_precision: host_info.cpu_precision,
635 log_level: host_info
636 .log_level
637 .map(|x| x.to_c_loglevel())
638 .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
639 pcap_config: host_info.pcap_config,
640 qdisc: host_info.qdisc,
641 init_sock_recv_buf_size: host_info.recv_buf_size,
642 autotune_recv_buf: host_info.autotune_recv_buf,
643 init_sock_send_buf_size: host_info.send_buf_size,
644 autotune_send_buf: host_info.autotune_send_buf,
645 native_tsc_frequency: self.native_tsc_frequency,
646 model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
647 max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
648 unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
649 unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
650 strace_logging_options: self.config.strace_logging_mode(),
651 shim_log_level: host_info
652 .log_level
653 .unwrap_or_else(|| self.config.general.log_level.unwrap())
654 .to_c_loglevel(),
655 use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
656 use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
657 use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
658 };
659
660 Box::new(Host::new(
661 params,
662 &self.hosts_path,
663 self.raw_frequency,
664 self.shmem(),
665 self.preload_paths.clone(),
666 ))
667 };
668
669 host.lock_shmem();
670
671 for proc in &host_info.processes {
672 let plugin_path =
673 CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
674 let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
675 let pause_for_debugging = host_info.pause_for_debugging;
676
677 let argv: Vec<CString> = proc
678 .args
679 .iter()
680 .map(|x| CString::new(x.as_bytes()).unwrap())
681 .collect();
682
683 let envv: Vec<CString> = proc
684 .env
685 .clone()
686 .into_iter()
687 .map(|(x, y)| {
688 let mut x: OsString = String::from(x).into();
689 x.push("=");
690 x.push(y);
691 CString::new(x.as_bytes()).unwrap()
692 })
693 .collect();
694
695 host.continue_execution_timer();
696
697 host.add_application(
698 proc.start_time,
699 proc.shutdown_time,
700 proc.shutdown_signal,
701 plugin_name,
702 plugin_path,
703 argv,
704 envv,
705 pause_for_debugging,
706 proc.expected_final_state,
707 );
708
709 host.stop_execution_timer();
710 }
711
712 host.unlock_shmem();
713
714 Ok(host)
715 }
716
717 fn log_heartbeat(&mut self, now: EmulatedTime) {
718 let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
719 if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
720 let err = nix::errno::Errno::last();
721 log::warn!("Unable to get shadow's resource usage: {err}");
722 return;
723 }
724
725 let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
727
728 let max_memory = (resources.ru_maxrss as f64) / 1048576.0; let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
731 let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
732
733 log::info!(
737 "Process resource usage at simtime {} reported by getrusage(): \
738 ru_maxrss={:.03} GiB, \
739 ru_utime={:.03} minutes, \
740 ru_stime={:.03} minutes, \
741 ru_nvcsw={}, \
742 ru_nivcsw={}",
743 (now - EmulatedTime::SIMULATION_START).as_nanos(),
744 max_memory,
745 user_time_minutes,
746 system_time_minutes,
747 resources.ru_nvcsw,
748 resources.ru_nivcsw,
749 );
750
751 log::info!(
755 "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
756 (now - EmulatedTime::SIMULATION_START).as_nanos(),
757 serde_json::to_string(&mem_info).unwrap(),
758 );
759 }
760
761 fn check_resource_usage(&mut self) {
762 if self.check_fd_usage {
763 match self.fd_usage() {
764 Ok((usage, limit)) if usage > limit * 90 / 100 => {
766 log::warn!(
767 "Using more than 90% ({usage}/{limit}) of available file descriptors"
768 );
769 self.check_fd_usage = false;
770 }
771 Err(e) => {
772 log::warn!("Unable to check fd usage: {e}");
773 self.check_fd_usage = false;
774 }
775 Ok(_) => {}
776 }
777 }
778
779 if self.check_mem_usage {
780 match self.memory_remaining() {
781 Ok(remaining) if remaining < 500 * 1024 * 1024 => {
783 log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
784 self.check_mem_usage = false;
785 }
786 Err(e) => {
787 log::warn!("Unable to check memory usage: {e}");
788 self.check_mem_usage = false;
789 }
790 Ok(_) => {}
791 }
792 }
793 }
794
795 fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
797 let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
798
799 let mut fd_count: u64 = 0;
800 for entry in dir {
801 entry.context("Failed to read entry in '/proc/self/fd'")?;
803 fd_count += 1;
804 }
805
806 let (soft_limit, _) =
807 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
808 .context("Failed to get the fd limit")?;
809
810 Ok((fd_count, soft_limit))
811 }
812
813 fn memory_remaining(&mut self) -> anyhow::Result<u64> {
815 let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
816 .context("Failed to get the page size")?
817 .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
818
819 let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
820 .context("Failed to get the number of available pages of physical memory")?
821 .ok_or_else(|| {
822 anyhow::anyhow!(
823 "Failed to get the number of available pages of physical memory (no errno)"
824 )
825 })?;
826
827 let page_size: u64 = page_size.try_into().unwrap();
828 let avl_pages: u64 = avl_pages.try_into().unwrap();
829
830 Ok(page_size * avl_pages)
831 }
832
833 pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
834 &self.shmem
835 }
836}
837
/// The simulation-specific inputs that a `Manager` consumes when it runs.
pub struct ManagerConfig {
    /// Source of randomness for the manager; used to shuffle the hosts before scheduling.
    pub random: Xoshiro256PlusPlus,

    /// IP address assignment; handed over to the shared worker state.
    pub ip_assignment: IpAssignment<u32>,

    /// Routing information; provides the smallest latency and is handed over to the shared
    /// worker state.
    pub routing_info: RoutingInfo<u32>,

    /// Bandwidths keyed by IP address; handed over to the shared worker state.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// The hosts to build. A host's index in this list becomes its host id.
    pub hosts: Vec<HostInfo>,
}
854
855fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
857 host_iter.for_each(|host| {
858 worker::Worker::set_active_host(host);
859 worker::Worker::with_active_host(|host| {
860 f(host);
861 })
862 .unwrap();
863 worker::Worker::take_active_host()
864 });
865}
866
867fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
869 const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
870 let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
871 Ok(khz * 1000)
872}
873
874fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
875 let libname_c = CString::new(libname).unwrap();
876 let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
877
878 let libpath = if !libpath_c.is_null() {
880 let libpath = unsafe { CStr::from_ptr(libpath_c) };
881 let libpath = OsStr::from_bytes(libpath.to_bytes());
882 Some(PathBuf::from(libpath.to_os_string()))
883 } else {
884 None
885 };
886
887 unsafe { libc::free(libpath_c as *mut libc::c_void) };
888
889 let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
890
891 let bytes = libpath.as_os_str().as_bytes();
892 if bytes.iter().any(|c| *c == b' ' || *c == b':') {
893 anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
895 }
896
897 log::debug!(
898 "Found required preload library {} at path {}",
899 libname,
900 libpath.display(),
901 );
902
903 Ok(libpath)
904}