1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ffi::{CStr, CString, OsStr, OsString};
4use std::os::unix::ffi::OsStrExt;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU32;
8use std::time::Duration;
9
10use anyhow::Context;
11use atomic_refcell::AtomicRefCell;
12use linux_api::prctl::ArchPrctlOp;
13use log::{debug, warn};
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16use scheduler::thread_per_core::ThreadPerCoreSched;
17use scheduler::thread_per_host::ThreadPerHostSched;
18use scheduler::{HostIter, Scheduler};
19use shadow_shim_helper_rs::HostId;
20use shadow_shim_helper_rs::emulated_time::EmulatedTime;
21use shadow_shim_helper_rs::option::FfiOption;
22use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
23use shadow_shim_helper_rs::simulation_time::SimulationTime;
24use shadow_shmem::allocator::ShMemBlock;
25
26use crate::core::configuration::{self, ConfigOptions, Flatten};
27use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
28use crate::core::cpu;
29use crate::core::resource_usage;
30use crate::core::runahead::Runahead;
31use crate::core::sim_config::{Bandwidth, HostInfo};
32use crate::core::sim_stats;
33use crate::core::worker;
34use crate::cshadow as c;
35use crate::host::host::{Host, HostParameters};
36use crate::network::dns::DnsBuilder;
37use crate::network::graph::{IpAssignment, RoutingInfo};
38use crate::utility;
39use crate::utility::childpid_watcher::ChildPidWatcher;
40use crate::utility::status_bar::Status;
41
/// Holds the state needed to set up a simulation (paths, frequencies, preload libraries, shared
/// memory) and to run it to completion via [`Manager::run`].
pub struct Manager<'a> {
    /// Simulation inputs. Wrapped in an `Option` so that `run()` can move them out with `take()`.
    manager_config: Option<ManagerConfig>,
    /// Asked for the next execution window after every round in `run()`.
    controller: &'a Controller<'a>,
    /// The simulation's configuration options.
    config: &'a ConfigOptions,

    /// CPU frequency in Hz handed to every host: read from sysfs at construction, or 2.5 GHz if
    /// that fails.
    raw_frequency: u64,
    /// Native TSC rate handed to every host; falls back to `raw_frequency` when it cannot be
    /// determined.
    native_tsc_frequency: u64,
    /// Passed to hosts and workers as the simulation end time.
    end_time: EmulatedTime,

    /// The configured data directory, resolved against the working directory at construction.
    data_path: PathBuf,
    /// `<data_path>/hosts`; passed to each `Host` when it is created.
    hosts_path: PathBuf,

    /// Preload libraries located at construction; shared with every host.
    preload_paths: Arc<Vec<PathBuf>>,

    /// Cleared after the first fd-usage warning or failure, so it is reported at most once.
    check_fd_usage: bool,
    /// Cleared after the first memory warning or failure, so it is reported at most once.
    check_mem_usage: bool,

    /// Open handle to `/proc/meminfo`, read when logging a heartbeat.
    meminfo_file: std::fs::File,
    /// Shared-memory block holding the `ManagerShmem` built at construction.
    shmem: ShMemBlock<'static, ManagerShmem>,
}
62
63impl<'a> Manager<'a> {
64 pub fn new(
65 manager_config: ManagerConfig,
66 controller: &'a Controller<'a>,
67 config: &'a ConfigOptions,
68 end_time: EmulatedTime,
69 ) -> anyhow::Result<Self> {
70 let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
72 let default_freq = 2_500_000_000; log::debug!(
74 "Failed to get raw CPU frequency, using {} Hz instead: {}",
75 default_freq,
76 e
77 );
78 default_freq
79 });
80
81 let native_tsc_frequency = if let Some(f) = asm_util::tsc::Tsc::native_cycles_per_second() {
82 f
83 } else {
84 warn!(
85 "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
86 );
87 raw_frequency
88 };
89
90 let mut preload_paths = Vec::new();
91
92 const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
95 preload_paths.push(
96 get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
97 format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
98 })?,
99 );
100
101 const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
103 if config.experimental.use_preload_libc.unwrap() {
104 let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
105 format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
106 })?;
107 preload_paths.push(path);
108 } else {
109 log::info!("Preloading the libc library is disabled");
110 };
111
112 const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
114 if config.experimental.use_preload_openssl_rng.unwrap() {
115 let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
116 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
117 })?;
118 preload_paths.push(path);
119 } else {
120 log::info!("Preloading the openssl rng library is disabled");
121 };
122
123 const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
125 if config.experimental.use_preload_openssl_crypto.unwrap() {
126 let path =
127 get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
128 format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
129 })?;
130 preload_paths.push(path);
131 } else {
132 log::info!("Preloading the openssl crypto library is disabled");
133 };
134
135 let cwd = std::env::current_dir()?;
137 let template_path = config
138 .general
139 .template_directory
140 .flatten_ref()
141 .map(|x| cwd.clone().join(x));
142 let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
143 let hosts_path = data_path.join("hosts");
144
145 if let Some(template_path) = template_path {
146 log::debug!(
147 "Copying template directory '{}' to '{}'",
148 template_path.display(),
149 data_path.display()
150 );
151
152 utility::copy_dir_all(&template_path, &data_path).with_context(|| {
154 format!(
155 "Failed to copy template directory '{}' to '{}'",
156 template_path.display(),
157 data_path.display()
158 )
159 })?;
160
161 let result = std::fs::create_dir(&hosts_path);
163 if let Err(e) = result {
164 if e.kind() != std::io::ErrorKind::AlreadyExists {
165 return Err(e).context(format!(
166 "Failed to create hosts directory '{}'",
167 hosts_path.display()
168 ));
169 }
170 }
171 } else {
172 std::fs::create_dir(&data_path).with_context(|| {
174 format!("Failed to create data directory '{}'", data_path.display())
175 })?;
176 std::fs::create_dir(&hosts_path).with_context(|| {
177 format!(
178 "Failed to create hosts directory '{}'",
179 hosts_path.display(),
180 )
181 })?;
182 }
183
184 let config_out_filename = data_path.join("processed-config.yaml");
186 let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
187 format!("Failed to create file '{}'", config_out_filename.display())
188 })?;
189
190 serde_yaml::to_writer(config_out_file, &config).with_context(|| {
191 format!(
192 "Failed to write processed config yaml to file '{}'",
193 config_out_filename.display()
194 )
195 })?;
196
197 let meminfo_file =
198 std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;
199
200 let emulate_cpuid = {
202 let supports_rdrand = unsafe { asm_util::cpuid::supports_rdrand() };
204 let supports_rdseed = unsafe { asm_util::cpuid::supports_rdseed() };
205 if !(supports_rdrand || supports_rdseed) {
206 debug!(
208 "No rdrand nor rdseed support. cpuid emulation is unnecessary, so skipping."
209 );
210 false
211 } else {
212 let res = unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 0) };
220 match res {
221 Ok(_) => {
222 unsafe { linux_api::prctl::arch_prctl(ArchPrctlOp::ARCH_SET_CPUID, 1) }
224 .unwrap_or_else(|e| panic!("Couldn't re-enable cpuid: {e:?}"));
225 debug!(
226 "CPU supports rdrand and/or rdseed, and platform supports intercepting cpuid. Enabling cpuid emulation."
227 );
228 true
229 }
230 Err(e) => {
231 warn!(
232 "CPU appears to support rdrand and/or rdseed, but platform doesn't support emulating cpuid ({e:?}). This may break determinism."
233 );
234 false
235 }
236 }
237 }
238 };
239
240 let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
241 log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
242 native_preemption_config: if config.native_preemption_enabled() {
243 FfiOption::Some(NativePreemptionConfig {
244 native_duration: config.native_preemption_native_interval()?,
245 sim_duration: config.native_preemption_sim_interval(),
246 })
247 } else {
248 FfiOption::None
249 },
250 emulate_cpuid,
251 });
252
253 Ok(Self {
254 manager_config: Some(manager_config),
255 controller,
256 config,
257 raw_frequency,
258 native_tsc_frequency,
259 end_time,
260 data_path,
261 hosts_path,
262 preload_paths: Arc::new(preload_paths),
263 check_fd_usage: true,
264 check_mem_usage: true,
265 meminfo_file,
266 shmem,
267 })
268 }
269
    /// Runs the simulation to completion and returns the number of plugin errors recorded by the
    /// workers.
    ///
    /// Builds the DNS table and the hosts, installs the state shared by all workers, executes
    /// scheduling rounds until the controller stops handing out execution windows, shuts the
    /// hosts down, and finally logs and writes the global simulation statistics.
    ///
    /// # Errors
    /// Returns an error if a host cannot be registered with the DNS builder, if the DNS table or
    /// a host cannot be built, or if the statistics file cannot be written.
    ///
    /// # Panics
    /// Panics if `manager_config` was already taken, or if one of the config options unwrapped
    /// here is unset.
    pub fn run(
        mut self,
        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
    ) -> anyhow::Result<u32> {
        let mut manager_config = self.manager_config.take().unwrap();

        // Optional runahead from the config, converted to simulation time.
        let min_runahead_config: Option<Duration> = self
            .config
            .experimental
            .runahead
            .flatten()
            .map(|x| x.into());
        let min_runahead_config: Option<SimulationTime> =
            min_runahead_config.map(|x| x.try_into().unwrap());

        // The bootstrap end is configured as a duration; turn it into an absolute emulated time.
        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;

        let smallest_latency = SimulationTime::from_nanos(
            manager_config
                .routing_info
                .get_smallest_latency_ns()
                .unwrap(),
        );

        // A configured parallelism of 0 means "use the number of physical cores".
        let parallelism: usize = match self.config.general.parallelism.unwrap() {
            0 => {
                let cores = cpu::count_physical_cores().try_into().unwrap();
                log::info!("The parallelism option was 0, so using parallelism={cores}");
                cores
            }
            x => x.try_into().unwrap(),
        };

        let mut dns_builder = DnsBuilder::new();

        // Host ids are the hosts' positions in the configured host list.
        let host_init: Vec<(&HostInfo, HostId)> = manager_config
            .hosts
            .iter()
            .enumerate()
            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
            .collect();

        // Register every host with the DNS builder before any host is built.
        for (info, id) in &host_init {
            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
                unreachable!("IPv6 not supported");
            };

            dns_builder
                .register(*id, addr, info.name.clone())
                .with_context(|| {
                    format!(
                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
                        *id, addr, info.name
                    )
                })?;
        }

        let dns = dns_builder.into_dns()?;

        // Build all hosts; the first failure aborts the run.
        let mut hosts: Vec<_> = host_init
            .iter()
            .map(|(info, id)| {
                self.build_host(*id, info)
                    .with_context(|| format!("Failed to build host '{}'", info.name))
            })
            .collect::<anyhow::Result<_>>()?;

        // Shuffle the host order with the manager's RNG before handing them to the scheduler.
        hosts.shuffle(&mut manager_config.random);

        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();

        // Endless iterator yielding `Some(cpu)` per worker when pinning is enabled, else `None`.
        let cpu_iter =
            std::iter::from_fn(|| {
                Some(use_cpu_pinning.then(|| {
                    u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
                }))
            });

        // Never use more threads than there are hosts.
        let parallelism = std::cmp::min(parallelism, hosts.len());

        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
        // Either every entry is `Some` or every entry is `None`.
        // NOTE(review): `cpus[0]` panics if `parallelism` is 0 (i.e. there are no hosts) —
        // confirm that an empty host list is rejected before this point.
        if cpus[0].is_some() {
            log::debug!("Pinning to cpus: {:?}", cpus);
            assert!(cpus.iter().all(|x| x.is_some()));
        } else {
            log::debug!("Not pinning to CPUs");
            assert!(cpus.iter().all(|x| x.is_none()));
        }
        assert_eq!(cpus.len(), parallelism);

        // Install the state shared by all workers; it is removed again once scheduling is done.
        worker::WORKER_SHARED
            .borrow_mut()
            .replace(worker::WorkerShared {
                ip_assignment: manager_config.ip_assignment,
                routing_info: manager_config.routing_info,
                host_bandwidths: manager_config.host_bandwidths,
                dns,
                num_plugin_errors: AtomicU32::new(0),
                status_logger_state: status_logger_state.map(Arc::clone),
                runahead: Runahead::new(
                    self.config.experimental.use_dynamic_runahead.unwrap(),
                    smallest_latency,
                    min_runahead_config,
                ),
                child_pid_watcher: ChildPidWatcher::new(),
                event_queues: hosts
                    .iter()
                    .map(|x| (x.id(), x.event_queue().clone()))
                    .collect(),
                bootstrap_end_time,
                sim_end_time: self.end_time,
            });

        // The scheduler lives only inside this block; it is joined before the block ends.
        {
            // Pick the scheduler implementation requested by the config; it takes the hosts.
            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
                configuration::Scheduler::ThreadPerHost => {
                    std::thread_local! {
                        /// Per-thread slot handed to the thread-per-host scheduler.
                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
                    }
                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
                        &cpus,
                        &SCHED_HOST_STORAGE,
                        hosts,
                    ))
                }
                configuration::Scheduler::ThreadPerCore => {
                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
                        &cpus,
                        hosts,
                        self.config.experimental.use_worker_spinning.unwrap(),
                    ))
                }
            };

            // Initialize a worker on every scheduler thread.
            scheduler.scope(|s| {
                s.run(|thread_id| {
                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
                });
            });

            // The first execution window is the first nanosecond of the simulation.
            let mut window = Some((
                EmulatedTime::SIMULATION_START,
                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
            ));

            // One slot per scheduler thread for the earliest next event that thread found.
            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
                vec![AtomicRefCell::new(None); scheduler.parallelism()];

            let heartbeat_interval = self
                .config
                .general
                .heartbeat_interval
                .flatten()
                .map(|x| Duration::from(x).try_into().unwrap());

            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
            let mut time_of_last_usage_check = std::time::Instant::now();

            // One iteration per scheduling round, until the controller returns no further window.
            while let Some((window_start, window_end)) = window {
                // Publish the round's time to the status logger.
                let display_time = std::cmp::min(window_start, window_end);
                worker::WORKER_SHARED
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .update_status_logger(|state| {
                        state.current = display_time;
                    });

                scheduler.scope(|s| {
                    // Execute every host up to `window_end`, recording per thread the earliest
                    // time at which anything needs to run next.
                    s.run_with_data(
                        &thread_next_event_times,
                        move |_, hosts, next_event_time| {
                            let mut next_event_time = next_event_time.borrow_mut();

                            worker::Worker::reset_next_event_time();
                            worker::Worker::set_round_end_time(window_end);

                            for_each_host(hosts, |host| {
                                let host_next_event_time = {
                                    host.lock_shmem();
                                    host.execute(window_end);
                                    let host_next_event_time = host.next_event_time();
                                    host.unlock_shmem();
                                    host_next_event_time
                                };
                                // Keep the minimum of the `Some` values (or `None` if neither).
                                *next_event_time = [*next_event_time, host_next_event_time]
                                    .into_iter()
                                    .flatten()
                                    .reduce(std::cmp::min);
                            });

                            // Also fold in the next event time tracked by the worker itself.
                            let packet_next_event_time = worker::Worker::get_next_event_time();

                            *next_event_time = [*next_event_time, packet_next_event_time]
                                .into_iter()
                                .flatten()
                                .reduce(std::cmp::min);
                        },
                    );

                    // NOTE(review): the bookkeeping below sits inside the scheduler scope,
                    // presumably so it can overlap with the workers' execution — confirm against
                    // the scheduler's scope semantics.
                    if let Some(heartbeat_interval) = heartbeat_interval {
                        if window_start > last_heartbeat + heartbeat_interval {
                            last_heartbeat = window_start;
                            self.log_heartbeat(window_start);
                        }
                    }

                    // Check fd/memory usage at most once every 30 seconds of wall-clock time.
                    let current_time = std::time::Instant::now();
                    if current_time.duration_since(time_of_last_usage_check)
                        > Duration::from_secs(30)
                    {
                        time_of_last_usage_check = current_time;
                        self.check_resource_usage();
                    }
                });

                // Take (and thereby reset) each thread's result and keep the smallest; if no
                // thread reported an event, use the maximum time.
                let min_next_event_time = thread_next_event_times
                    .iter()
                    .filter_map(|x| x.borrow_mut().take())
                    .reduce(std::cmp::min)
                    .unwrap_or(EmulatedTime::MAX);

                log::debug!(
                    "Finished execution window [{}--{}], next event at {}",
                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
                );

                // Ask the controller for the next window; `None` ends the loop.
                window = self
                    .controller
                    .manager_finished_current_round(min_next_event_time);
            }

            // Shut every host down, with the worker's current time set to the simulation end.
            scheduler.scope(|s| {
                s.run_with_hosts(move |_, hosts| {
                    for_each_host(hosts, |host| {
                        worker::Worker::set_current_time(self.end_time);
                        host.free_all_applications();
                        host.shutdown();
                        worker::Worker::clear_current_time();
                    });
                });
            });

            // Merge each worker's statistics into the global statistics.
            scheduler.scope(|s| {
                s.run(|_| {
                    worker::Worker::add_to_global_sim_stats();
                });
            });

            scheduler.join();
        }

        // Report the simulation end time to the status logger.
        worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .update_status_logger(|state| {
                state.current = self.end_time;
            });

        // Read the error count before the shared state is dropped.
        let num_plugin_errors = worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .plugin_error_count();

        // Remove the shared worker state.
        worker::WORKER_SHARED.borrow_mut().take();

        // Log the enabled counters and write the statistics file into the data directory.
        worker::with_global_sim_stats(|stats| {
            if self.config.experimental.use_syscall_counters.unwrap() {
                log::info!(
                    "Global syscall counts: {}",
                    stats.syscall_counts.lock().unwrap()
                );
            }
            if self.config.experimental.use_object_counters.unwrap() {
                let alloc_counts = stats.alloc_counts.lock().unwrap();
                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
                log::info!("Global allocated object counts: {}", alloc_counts);
                log::info!("Global deallocated object counts: {}", dealloc_counts);

                if *alloc_counts == *dealloc_counts {
                    log::info!("We allocated and deallocated the same number of objects :)");
                } else {
                    log::warn!("Memory leak detected");
                }
            }

            let stats_filename = self.data_path.clone().join("sim-stats.json");
            sim_stats::write_stats_to_file(&stats_filename, stats)
        })?;

        Ok(num_plugin_errors)
    }
615
616 fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
617 let hostname = CString::new(&*host_info.name).unwrap();
618
619 let host = {
621 let params = HostParameters {
622 id: host_id,
624 cpu_frequency: self.raw_frequency,
626 node_seed: host_info.seed,
627 hostname,
628 node_id: host_info.network_node_id,
629 ip_addr: match host_info.ip_addr.unwrap() {
630 std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
631 std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
633 },
634 sim_end_time: self.end_time,
635 requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
636 requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
637 cpu_threshold: host_info.cpu_threshold,
638 cpu_precision: host_info.cpu_precision,
639 log_level: host_info
640 .log_level
641 .map(|x| x.to_c_loglevel())
642 .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
643 pcap_config: host_info.pcap_config,
644 qdisc: host_info.qdisc,
645 init_sock_recv_buf_size: host_info.recv_buf_size,
646 autotune_recv_buf: host_info.autotune_recv_buf,
647 init_sock_send_buf_size: host_info.send_buf_size,
648 autotune_send_buf: host_info.autotune_send_buf,
649 native_tsc_frequency: self.native_tsc_frequency,
650 model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
651 max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
652 unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
653 unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
654 strace_logging_options: self.config.strace_logging_mode(),
655 shim_log_level: host_info
656 .log_level
657 .unwrap_or_else(|| self.config.general.log_level.unwrap())
658 .to_c_loglevel(),
659 use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
660 use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
661 use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
662 };
663
664 Box::new(Host::new(
665 params,
666 &self.hosts_path,
667 self.raw_frequency,
668 self.shmem(),
669 self.preload_paths.clone(),
670 ))
671 };
672
673 host.lock_shmem();
674
675 for proc in &host_info.processes {
676 let plugin_path =
677 CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
678 let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
679 let pause_for_debugging = host_info.pause_for_debugging;
680
681 let argv: Vec<CString> = proc
682 .args
683 .iter()
684 .map(|x| CString::new(x.as_bytes()).unwrap())
685 .collect();
686
687 let envv: Vec<CString> = proc
688 .env
689 .clone()
690 .into_iter()
691 .map(|(x, y)| {
692 let mut x: OsString = String::from(x).into();
693 x.push("=");
694 x.push(y);
695 CString::new(x.as_bytes()).unwrap()
696 })
697 .collect();
698
699 host.continue_execution_timer();
700
701 host.add_application(
702 proc.start_time,
703 proc.shutdown_time,
704 proc.shutdown_signal,
705 plugin_name,
706 plugin_path,
707 argv,
708 envv,
709 pause_for_debugging,
710 proc.expected_final_state,
711 );
712
713 host.stop_execution_timer();
714 }
715
716 host.unlock_shmem();
717
718 Ok(host)
719 }
720
721 fn log_heartbeat(&mut self, now: EmulatedTime) {
722 let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
723 if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
724 let err = nix::errno::Errno::last();
725 log::warn!("Unable to get shadow's resource usage: {}", err);
726 return;
727 }
728
729 let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();
731
732 let max_memory = (resources.ru_maxrss as f64) / 1048576.0; let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
735 let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;
736
737 log::info!(
741 "Process resource usage at simtime {} reported by getrusage(): \
742 ru_maxrss={:.03} GiB, \
743 ru_utime={:.03} minutes, \
744 ru_stime={:.03} minutes, \
745 ru_nvcsw={}, \
746 ru_nivcsw={}",
747 (now - EmulatedTime::SIMULATION_START).as_nanos(),
748 max_memory,
749 user_time_minutes,
750 system_time_minutes,
751 resources.ru_nvcsw,
752 resources.ru_nivcsw,
753 );
754
755 log::info!(
759 "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
760 (now - EmulatedTime::SIMULATION_START).as_nanos(),
761 serde_json::to_string(&mem_info).unwrap(),
762 );
763 }
764
765 fn check_resource_usage(&mut self) {
766 if self.check_fd_usage {
767 match self.fd_usage() {
768 Ok((usage, limit)) if usage > limit * 90 / 100 => {
770 log::warn!(
771 "Using more than 90% ({usage}/{limit}) of available file descriptors"
772 );
773 self.check_fd_usage = false;
774 }
775 Err(e) => {
776 log::warn!("Unable to check fd usage: {e}");
777 self.check_fd_usage = false;
778 }
779 Ok(_) => {}
780 }
781 }
782
783 if self.check_mem_usage {
784 match self.memory_remaining() {
785 Ok(remaining) if remaining < 500 * 1024 * 1024 => {
787 log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
788 self.check_mem_usage = false;
789 }
790 Err(e) => {
791 log::warn!("Unable to check memory usage: {e}");
792 self.check_mem_usage = false;
793 }
794 Ok(_) => {}
795 }
796 }
797 }
798
799 fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
801 let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;
802
803 let mut fd_count: u64 = 0;
804 for entry in dir {
805 entry.context("Failed to read entry in '/proc/self/fd'")?;
807 fd_count += 1;
808 }
809
810 let (soft_limit, _) =
811 nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
812 .context("Failed to get the fd limit")?;
813
814 Ok((fd_count, soft_limit))
815 }
816
817 fn memory_remaining(&mut self) -> anyhow::Result<u64> {
819 let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
820 .context("Failed to get the page size")?
821 .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;
822
823 let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
824 .context("Failed to get the number of available pages of physical memory")?
825 .ok_or_else(|| {
826 anyhow::anyhow!(
827 "Failed to get the number of available pages of physical memory (no errno)"
828 )
829 })?;
830
831 let page_size: u64 = page_size.try_into().unwrap();
832 let avl_pages: u64 = avl_pages.try_into().unwrap();
833
834 Ok(page_size * avl_pages)
835 }
836
    /// Returns the manager's shared-memory block, which holds the `ManagerShmem` created in
    /// `new()`.
    pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
        &self.shmem
    }
840}
841
/// The simulation inputs handed to a [`Manager`] and consumed by `Manager::run()`.
pub struct ManagerConfig {
    /// Random source; `run()` uses it to shuffle the order of the hosts.
    pub random: Xoshiro256PlusPlus,

    /// IP address assignment; moved into the state shared by all workers.
    pub ip_assignment: IpAssignment<u32>,

    /// Routing information; supplies the smallest latency and is then moved into the state
    /// shared by all workers.
    pub routing_info: RoutingInfo<u32>,

    /// Bandwidth per IP address; moved into the state shared by all workers.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// The hosts to build. A host's id is its index in this list.
    pub hosts: Vec<HostInfo>,
}
858
859fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
861 host_iter.for_each(|host| {
862 worker::Worker::set_active_host(host);
863 worker::Worker::with_active_host(|host| {
864 f(host);
865 })
866 .unwrap();
867 worker::Worker::take_active_host()
868 });
869}
870
871fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
873 const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
874 let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.parse()?;
875 Ok(khz * 1000)
876}
877
878fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
879 let libname_c = CString::new(libname).unwrap();
880 let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };
881
882 let libpath = if !libpath_c.is_null() {
884 let libpath = unsafe { CStr::from_ptr(libpath_c) };
885 let libpath = OsStr::from_bytes(libpath.to_bytes());
886 Some(PathBuf::from(libpath.to_os_string()))
887 } else {
888 None
889 };
890
891 unsafe { libc::free(libpath_c as *mut libc::c_void) };
892
893 let libpath = libpath.ok_or_else(|| anyhow::anyhow!(format!("Could not library in rpath")))?;
894
895 let bytes = libpath.as_os_str().as_bytes();
896 if bytes.iter().any(|c| *c == b' ' || *c == b':') {
897 anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
899 }
900
901 log::debug!(
902 "Found required preload library {} at path {}",
903 libname,
904 libpath.display(),
905 );
906
907 Ok(libpath)
908}