use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::{CStr, CString, OsStr, OsString};
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;

use anyhow::Context;
use atomic_refcell::AtomicRefCell;
use log::warn;
use rand::seq::SliceRandom;
use rand_xoshiro::Xoshiro256PlusPlus;
use scheduler::thread_per_core::ThreadPerCoreSched;
use scheduler::thread_per_host::ThreadPerHostSched;
use scheduler::{HostIter, Scheduler};
use shadow_shim_helper_rs::HostId;
use shadow_shim_helper_rs::emulated_time::EmulatedTime;
use shadow_shim_helper_rs::option::FfiOption;
use shadow_shim_helper_rs::shim_shmem::{ManagerShmem, NativePreemptionConfig};
use shadow_shim_helper_rs::simulation_time::SimulationTime;
use shadow_shmem::allocator::ShMemBlock;

use crate::core::configuration::{self, ConfigOptions, Flatten};
use crate::core::controller::{Controller, ShadowStatusBarState, SimController};
use crate::core::cpu;
use crate::core::resource_usage;
use crate::core::runahead::Runahead;
use crate::core::sim_config::{Bandwidth, HostInfo};
use crate::core::sim_stats;
use crate::core::worker;
use crate::cshadow as c;
use crate::host::host::{Host, HostParameters};
use crate::network::dns::DnsBuilder;
use crate::network::graph::{IpAssignment, RoutingInfo};
use crate::utility;
use crate::utility::childpid_watcher::ChildPidWatcher;
use crate::utility::status_bar::Status;

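/// Manages the simulation of the hosts: owns the simulation's data and hosts
/// directories, the preload library paths, and the manager's shared-memory
/// block, and drives the scheduler through each round of events until the
/// simulation ends.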
pub struct Manager<'a> {
    manager_config: Option<ManagerConfig>,
    controller: &'a Controller<'a>,
    config: &'a ConfigOptions,

    raw_frequency: u64,
    native_tsc_frequency: u64,
    end_time: EmulatedTime,

    data_path: PathBuf,
    hosts_path: PathBuf,

    preload_paths: Arc<Vec<PathBuf>>,

    check_fd_usage: bool,
    check_mem_usage: bool,

    meminfo_file: std::fs::File,
    shmem: ShMemBlock<'static, ManagerShmem>,
}

impl<'a> Manager<'a> {
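    /// Builds a new manager: determines the CPU and TSC frequencies, locates
    /// the required preload libraries, sets up the data and hosts directories,
    /// writes the processed config to disk, and allocates the manager's
    /// shared-memory block.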
    pub fn new(
        manager_config: ManagerConfig,
        controller: &'a Controller<'a>,
        config: &'a ConfigOptions,
        end_time: EmulatedTime,
    ) -> anyhow::Result<Self> {
        let raw_frequency = get_raw_cpu_frequency_hz().unwrap_or_else(|e| {
            let default_freq = 2_500_000_000;
            log::debug!(
                "Failed to get raw CPU frequency, using {} Hz instead: {}",
                default_freq,
                e
            );
            default_freq
        });

        let native_tsc_frequency = if let Some(f) = shadow_tsc::Tsc::native_cycles_per_second() {
            f
        } else {
            warn!(
                "Couldn't find native TSC frequency. Emulated rdtsc may use a rate different than managed code expects"
            );
            raw_frequency
        };

        let mut preload_paths = Vec::new();

        // the injector preload library is always required
        const PRELOAD_INJECTOR_LIB: &str = "libshadow_injector.so";
        preload_paths.push(
            get_required_preload_path(PRELOAD_INJECTOR_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_INJECTOR_LIB}'")
            })?,
        );

        // the remaining preload libraries are optional and can be disabled in the config
        const PRELOAD_LIBC_LIB: &str = "libshadow_libc.so";
        if config.experimental.use_preload_libc.unwrap() {
            let path = get_required_preload_path(PRELOAD_LIBC_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_LIBC_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the libc library is disabled");
        };

        const PRELOAD_OPENSSL_RNG_LIB: &str = "libshadow_openssl_rng.so";
        if config.experimental.use_preload_openssl_rng.unwrap() {
            let path = get_required_preload_path(PRELOAD_OPENSSL_RNG_LIB).with_context(|| {
                format!("Failed to get path to preload library '{PRELOAD_OPENSSL_RNG_LIB}'")
            })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl rng library is disabled");
        };

        const PRELOAD_OPENSSL_CRYPTO_LIB: &str = "libshadow_openssl_crypto.so";
        if config.experimental.use_preload_openssl_crypto.unwrap() {
            let path =
                get_required_preload_path(PRELOAD_OPENSSL_CRYPTO_LIB).with_context(|| {
                    format!("Failed to get path to preload library '{PRELOAD_OPENSSL_CRYPTO_LIB}'")
                })?;
            preload_paths.push(path);
        } else {
            log::info!("Preloading the openssl crypto library is disabled");
        };

        // determine the paths for the simulation's output data
        let cwd = std::env::current_dir()?;
        let template_path = config
            .general
            .template_directory
            .flatten_ref()
            .map(|x| cwd.clone().join(x));
        let data_path = cwd.join(config.general.data_directory.as_ref().unwrap());
        let hosts_path = data_path.join("hosts");

        if let Some(template_path) = template_path {
            log::debug!(
                "Copying template directory '{}' to '{}'",
                template_path.display(),
                data_path.display()
            );

            utility::copy_dir_all(&template_path, &data_path).with_context(|| {
                format!(
                    "Failed to copy template directory '{}' to '{}'",
                    template_path.display(),
                    data_path.display()
                )
            })?;

            // the template directory may already have contained a hosts directory
            let result = std::fs::create_dir(&hosts_path);
            if let Err(e) = result {
                if e.kind() != std::io::ErrorKind::AlreadyExists {
                    return Err(e).context(format!(
                        "Failed to create hosts directory '{}'",
                        hosts_path.display()
                    ));
                }
            }
        } else {
            std::fs::create_dir(&data_path).with_context(|| {
                format!("Failed to create data directory '{}'", data_path.display())
            })?;
            std::fs::create_dir(&hosts_path).with_context(|| {
                format!(
                    "Failed to create hosts directory '{}'",
                    hosts_path.display(),
                )
            })?;
        }

        // save the processed config to the data directory
        let config_out_filename = data_path.join("processed-config.yaml");
        let config_out_file = std::fs::File::create(&config_out_filename).with_context(|| {
            format!("Failed to create file '{}'", config_out_filename.display())
        })?;

        serde_yaml::to_writer(config_out_file, &config).with_context(|| {
            format!(
                "Failed to write processed config yaml to file '{}'",
                config_out_filename.display()
            )
        })?;

        let meminfo_file =
            std::fs::File::open("/proc/meminfo").context("Failed to open '/proc/meminfo'")?;

        let shmem = shadow_shmem::allocator::shmalloc(ManagerShmem {
            log_start_time_micros: unsafe { c::logger_get_global_start_time_micros() },
            native_preemption_config: if config.native_preemption_enabled() {
                FfiOption::Some(NativePreemptionConfig {
                    native_duration: config.native_preemption_native_interval()?,
                    sim_duration: config.native_preemption_sim_interval(),
                })
            } else {
                FfiOption::None
            },
        });

        Ok(Self {
            manager_config: Some(manager_config),
            controller,
            config,
            raw_frequency,
            native_tsc_frequency,
            end_time,
            data_path,
            hosts_path,
            preload_paths: Arc::new(preload_paths),
            check_fd_usage: true,
            check_mem_usage: true,
            meminfo_file,
            shmem,
        })
    }

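    /// Runs the simulation: executes rounds of events until the controller
    /// reports that no events remain, then shuts down all hosts and writes the
    /// simulation statistics. Returns the number of managed-process (plugin)
    /// errors observed.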
    pub fn run(
        mut self,
        status_logger_state: Option<&Arc<Status<ShadowStatusBarState>>>,
    ) -> anyhow::Result<u32> {
        let mut manager_config = self.manager_config.take().unwrap();

        let min_runahead_config: Option<Duration> = self
            .config
            .experimental
            .runahead
            .flatten()
            .map(|x| x.into());
        let min_runahead_config: Option<SimulationTime> =
            min_runahead_config.map(|x| x.try_into().unwrap());

        let bootstrap_end_time: Duration = self.config.general.bootstrap_end_time.unwrap().into();
        let bootstrap_end_time: SimulationTime = bootstrap_end_time.try_into().unwrap();
        let bootstrap_end_time = EmulatedTime::SIMULATION_START + bootstrap_end_time;

        let smallest_latency = SimulationTime::from_nanos(
            manager_config
                .routing_info
                .get_smallest_latency_ns()
                .unwrap(),
        );

        let parallelism: usize = match self.config.general.parallelism.unwrap() {
            0 => {
                let cores = cpu::count_physical_cores().try_into().unwrap();
                log::info!("The parallelism option was 0, so using parallelism={cores}");
                cores
            }
            x => x.try_into().unwrap(),
        };

        let mut dns_builder = DnsBuilder::new();

        // assign a HostId to each host in the order they were configured
        let host_init: Vec<(&HostInfo, HostId)> = manager_config
            .hosts
            .iter()
            .enumerate()
            .map(|(i, info)| (info, HostId::from(u32::try_from(i).unwrap())))
            .collect();

        for (info, id) in &host_init {
            let std::net::IpAddr::V4(addr) = info.ip_addr.unwrap() else {
                unreachable!("IPv6 not supported");
            };

            dns_builder
                .register(*id, addr, info.name.clone())
                .with_context(|| {
                    format!(
                        "Failed to register a host with id='{:?}', addr='{}', and name='{}' in the DNS module",
                        *id, addr, info.name
                    )
                })?;
        }

        let dns = dns_builder.into_dns()?;

        let mut hosts: Vec<_> = host_init
            .iter()
            .map(|(info, id)| {
                self.build_host(*id, info)
                    .with_context(|| format!("Failed to build host '{}'", info.name))
            })
            .collect::<anyhow::Result<_>>()?;

        // shuffle the hosts so that their order is not correlated with the config order
        hosts.shuffle(&mut manager_config.random);

        let use_cpu_pinning = self.config.experimental.use_cpu_pinning.unwrap();

        // an infinite iterator that yields the CPU to pin each worker thread to,
        // or `None` if CPU pinning is disabled
        let cpu_iter = std::iter::from_fn(|| {
            Some(use_cpu_pinning.then(|| {
                u32::try_from(unsafe { c::affinity_getGoodWorkerAffinity() }).unwrap()
            }))
        });

        // don't use more worker threads than there are hosts
        let parallelism = std::cmp::min(parallelism, hosts.len());

        let cpus: Vec<Option<u32>> = cpu_iter.take(parallelism).collect();
        if cpus[0].is_some() {
            log::debug!("Pinning to cpus: {:?}", cpus);
            assert!(cpus.iter().all(|x| x.is_some()));
        } else {
            log::debug!("Not pinning to CPUs");
            assert!(cpus.iter().all(|x| x.is_none()));
        }
        assert_eq!(cpus.len(), parallelism);

        // state that is shared between all worker threads
        worker::WORKER_SHARED
            .borrow_mut()
            .replace(worker::WorkerShared {
                ip_assignment: manager_config.ip_assignment,
                routing_info: manager_config.routing_info,
                host_bandwidths: manager_config.host_bandwidths,
                dns,
                num_plugin_errors: AtomicU32::new(0),
                status_logger_state: status_logger_state.map(Arc::clone),
                runahead: Runahead::new(
                    self.config.experimental.use_dynamic_runahead.unwrap(),
                    smallest_latency,
                    min_runahead_config,
                ),
                child_pid_watcher: ChildPidWatcher::new(),
                event_queues: hosts
                    .iter()
                    .map(|x| (x.id(), x.event_queue().clone()))
                    .collect(),
                bootstrap_end_time,
                sim_end_time: self.end_time,
            });

        {
            let mut scheduler = match self.config.experimental.scheduler.unwrap() {
                configuration::Scheduler::ThreadPerHost => {
                    std::thread_local! {
                        static SCHED_HOST_STORAGE: RefCell<Option<Box<Host>>> = const { RefCell::new(None) };
                    }
                    Scheduler::ThreadPerHost(ThreadPerHostSched::new(
                        &cpus,
                        &SCHED_HOST_STORAGE,
                        hosts,
                    ))
                }
                configuration::Scheduler::ThreadPerCore => {
                    Scheduler::ThreadPerCore(ThreadPerCoreSched::new(
                        &cpus,
                        hosts,
                        self.config.experimental.use_worker_spinning.unwrap(),
                    ))
                }
            };

            // initialize the thread-local Worker on each of the scheduler's threads
            scheduler.scope(|s| {
                s.run(|thread_id| {
                    worker::Worker::new_for_this_thread(worker::WorkerThreadID(thread_id as u32))
                });
            });

            // the simulated-time window for the current scheduling round
            let mut window = Some((
                EmulatedTime::SIMULATION_START,
                EmulatedTime::SIMULATION_START + SimulationTime::NANOSECOND,
            ));

            // each thread's earliest next event time, updated once per round
            let thread_next_event_times: Vec<AtomicRefCell<Option<EmulatedTime>>> =
                vec![AtomicRefCell::new(None); scheduler.parallelism()];

            let heartbeat_interval = self
                .config
                .general
                .heartbeat_interval
                .flatten()
                .map(|x| Duration::from(x).try_into().unwrap());

            let mut last_heartbeat = EmulatedTime::SIMULATION_START;
            let mut time_of_last_usage_check = std::time::Instant::now();

            // run rounds of events until the controller has no more rounds to run
            while let Some((window_start, window_end)) = window {
                // update the status logger with the time of the current round
                let display_time = std::cmp::min(window_start, window_end);
                worker::WORKER_SHARED
                    .borrow()
                    .as_ref()
                    .unwrap()
                    .update_status_logger(|state| {
                        state.current = display_time;
                    });

                // run the hosts' events for this round on the worker threads
                scheduler.scope(|s| {
                    s.run_with_data(
                        &thread_next_event_times,
                        move |_, hosts, next_event_time| {
                            let mut next_event_time = next_event_time.borrow_mut();

                            worker::Worker::reset_next_event_time();
                            worker::Worker::set_round_end_time(window_end);

                            for_each_host(hosts, |host| {
                                let host_next_event_time = {
                                    host.lock_shmem();
                                    host.execute(window_end);
                                    let host_next_event_time = host.next_event_time();
                                    host.unlock_shmem();
                                    host_next_event_time
                                };
                                *next_event_time = [*next_event_time, host_next_event_time]
                                    .into_iter()
                                    .flatten()
                                    .reduce(std::cmp::min);
                            });

                            let packet_next_event_time = worker::Worker::get_next_event_time();

                            *next_event_time = [*next_event_time, packet_next_event_time]
                                .into_iter()
                                .flatten()
                                .reduce(std::cmp::min);
                        },
                    );

                    // log a heartbeat message at most once per heartbeat interval
                    if let Some(heartbeat_interval) = heartbeat_interval {
                        if window_start > last_heartbeat + heartbeat_interval {
                            last_heartbeat = window_start;
                            self.log_heartbeat(window_start);
                        }
                    }

                    // check resource usage at most once every 30 wall-clock seconds
                    let current_time = std::time::Instant::now();
                    if current_time.duration_since(time_of_last_usage_check)
                        > Duration::from_secs(30)
                    {
                        time_of_last_usage_check = current_time;
                        self.check_resource_usage();
                    }
                });

                // the earliest next event time across all threads
                let min_next_event_time = thread_next_event_times
                    .iter()
                    .filter_map(|x| x.borrow_mut().take())
                    .reduce(std::cmp::min)
                    .unwrap_or(EmulatedTime::MAX);

                log::debug!(
                    "Finished execution window [{}--{}], next event at {}",
                    (window_start - EmulatedTime::SIMULATION_START).as_nanos(),
                    (window_end - EmulatedTime::SIMULATION_START).as_nanos(),
                    (min_next_event_time - EmulatedTime::SIMULATION_START).as_nanos(),
                );

                // ask the controller for the next round's time window
                window = self
                    .controller
                    .manager_finished_current_round(min_next_event_time);
            }

            // no more events; shut down all of the hosts
            scheduler.scope(|s| {
                s.run_with_hosts(move |_, hosts| {
                    for_each_host(hosts, |host| {
                        worker::Worker::set_current_time(self.end_time);
                        host.free_all_applications();
                        host.shutdown();
                        worker::Worker::clear_current_time();
                    });
                });
            });

            // add each worker thread's local statistics to the global statistics
            scheduler.scope(|s| {
                s.run(|_| {
                    worker::Worker::add_to_global_sim_stats();
                });
            });

            scheduler.join();
        }

        // the simulation is over; update the status logger with the end time
        worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .update_status_logger(|state| {
                state.current = self.end_time;
            });

        let num_plugin_errors = worker::WORKER_SHARED
            .borrow()
            .as_ref()
            .unwrap()
            .plugin_error_count();

        // drop the state shared by the worker threads
        worker::WORKER_SHARED.borrow_mut().take();

        worker::with_global_sim_stats(|stats| {
            if self.config.experimental.use_syscall_counters.unwrap() {
                log::info!(
                    "Global syscall counts: {}",
                    stats.syscall_counts.lock().unwrap()
                );
            }
            if self.config.experimental.use_object_counters.unwrap() {
                let alloc_counts = stats.alloc_counts.lock().unwrap();
                let dealloc_counts = stats.dealloc_counts.lock().unwrap();
                log::info!("Global allocated object counts: {}", alloc_counts);
                log::info!("Global deallocated object counts: {}", dealloc_counts);

                if *alloc_counts == *dealloc_counts {
                    log::info!("We allocated and deallocated the same number of objects :)");
                } else {
                    log::warn!("Memory leak detected");
                }
            }

            let stats_filename = self.data_path.clone().join("sim-stats.json");
            sim_stats::write_stats_to_file(&stats_filename, stats)
        })?;

        Ok(num_plugin_errors)
    }

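    /// Builds a single host from its configuration and adds each of the host's
    /// configured processes to it.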
    fn build_host(&self, host_id: HostId, host_info: &HostInfo) -> anyhow::Result<Box<Host>> {
        let hostname = CString::new(&*host_info.name).unwrap();

        let host = {
            let params = HostParameters {
                id: host_id,
                cpu_frequency: self.raw_frequency,
                node_seed: host_info.seed,
                hostname,
                node_id: host_info.network_node_id,
                ip_addr: match host_info.ip_addr.unwrap() {
                    std::net::IpAddr::V4(ip) => u32::to_be(ip.into()),
                    std::net::IpAddr::V6(_) => unreachable!("IPv6 not supported"),
                },
                sim_end_time: self.end_time,
                requested_bw_down_bits: host_info.bandwidth_down_bits.unwrap(),
                requested_bw_up_bits: host_info.bandwidth_up_bits.unwrap(),
                cpu_threshold: host_info.cpu_threshold,
                cpu_precision: host_info.cpu_precision,
                log_level: host_info
                    .log_level
                    .map(|x| x.to_c_loglevel())
                    .unwrap_or(logger::_LogLevel_LOGLEVEL_UNSET),
                pcap_config: host_info.pcap_config,
                qdisc: host_info.qdisc,
                init_sock_recv_buf_size: host_info.recv_buf_size,
                autotune_recv_buf: host_info.autotune_recv_buf,
                init_sock_send_buf_size: host_info.send_buf_size,
                autotune_send_buf: host_info.autotune_send_buf,
                native_tsc_frequency: self.native_tsc_frequency,
                model_unblocked_syscall_latency: self.config.model_unblocked_syscall_latency(),
                max_unapplied_cpu_latency: self.config.max_unapplied_cpu_latency(),
                unblocked_syscall_latency: self.config.unblocked_syscall_latency(),
                unblocked_vdso_latency: self.config.unblocked_vdso_latency(),
                strace_logging_options: self.config.strace_logging_mode(),
                shim_log_level: host_info
                    .log_level
                    .unwrap_or_else(|| self.config.general.log_level.unwrap())
                    .to_c_loglevel(),
                use_new_tcp: self.config.experimental.use_new_tcp.unwrap(),
                use_mem_mapper: self.config.experimental.use_memory_manager.unwrap(),
                use_syscall_counters: self.config.experimental.use_syscall_counters.unwrap(),
            };

            Box::new(Host::new(
                params,
                &self.hosts_path,
                self.raw_frequency,
                self.shmem(),
                self.preload_paths.clone(),
            ))
        };

        host.lock_shmem();

        for proc in &host_info.processes {
            let plugin_path =
                CString::new(proc.plugin.clone().into_os_string().as_bytes()).unwrap();
            let plugin_name = CString::new(proc.plugin.file_name().unwrap().as_bytes()).unwrap();
            let pause_for_debugging = host_info.pause_for_debugging;

            let argv: Vec<CString> = proc
                .args
                .iter()
                .map(|x| CString::new(x.as_bytes()).unwrap())
                .collect();

            // build the environment as a list of "NAME=VALUE" strings
            let envv: Vec<CString> = proc
                .env
                .clone()
                .into_iter()
                .map(|(x, y)| {
                    let mut x: OsString = String::from(x).into();
                    x.push("=");
                    x.push(y);
                    CString::new(x.as_bytes()).unwrap()
                })
                .collect();

            host.continue_execution_timer();

            host.add_application(
                proc.start_time,
                proc.shutdown_time,
                proc.shutdown_signal,
                plugin_name,
                plugin_path,
                argv,
                envv,
                pause_for_debugging,
                proc.expected_final_state,
            );

            host.stop_execution_timer();
        }

        host.unlock_shmem();

        Ok(host)
    }

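    /// Logs Shadow's resource usage (from getrusage()) and the system's memory
    /// usage (from /proc/meminfo) at the given simulation time.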
    fn log_heartbeat(&mut self, now: EmulatedTime) {
        let mut resources: libc::rusage = unsafe { std::mem::zeroed() };
        if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut resources) } != 0 {
            let err = nix::errno::Errno::last();
            log::warn!("Unable to get shadow's resource usage: {}", err);
            return;
        }

        let mem_info = resource_usage::meminfo(&mut self.meminfo_file).unwrap();

        // ru_maxrss is reported in KiB on Linux; convert it to GiB, and times to minutes
        let max_memory = (resources.ru_maxrss as f64) / 1048576.0;
        let user_time_minutes = (resources.ru_utime.tv_sec as f64) / 60.0;
        let system_time_minutes = (resources.ru_stime.tv_sec as f64) / 60.0;

        log::info!(
            "Process resource usage at simtime {} reported by getrusage(): \
            ru_maxrss={:.03} GiB, \
            ru_utime={:.03} minutes, \
            ru_stime={:.03} minutes, \
            ru_nvcsw={}, \
            ru_nivcsw={}",
            (now - EmulatedTime::SIMULATION_START).as_nanos(),
            max_memory,
            user_time_minutes,
            system_time_minutes,
            resources.ru_nvcsw,
            resources.ru_nivcsw,
        );

        log::info!(
            "System memory usage in bytes at simtime {} ns reported by /proc/meminfo: {}",
            (now - EmulatedTime::SIMULATION_START).as_nanos(),
            serde_json::to_string(&mem_info).unwrap(),
        );
    }

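    /// Warns (at most once each) if the process is using more than 90% of its
    /// file descriptor limit, or if less than 500 MiB of physical memory
    /// remains available.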
    fn check_resource_usage(&mut self) {
        if self.check_fd_usage {
            match self.fd_usage() {
                // warn if we're using more than 90% of the file descriptor limit
                Ok((usage, limit)) if usage > limit * 90 / 100 => {
                    log::warn!(
                        "Using more than 90% ({usage}/{limit}) of available file descriptors"
                    );
                    self.check_fd_usage = false;
                }
                Err(e) => {
                    log::warn!("Unable to check fd usage: {e}");
                    self.check_fd_usage = false;
                }
                Ok(_) => {}
            }
        }

        if self.check_mem_usage {
            match self.memory_remaining() {
                // warn if less than 500 MiB of memory remains available
                Ok(remaining) if remaining < 500 * 1024 * 1024 => {
                    log::warn!("Only {} MiB of memory available", remaining / 1024 / 1024);
                    self.check_mem_usage = false;
                }
                Err(e) => {
                    log::warn!("Unable to check memory usage: {e}");
                    self.check_mem_usage = false;
                }
                Ok(_) => {}
            }
        }
    }

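    /// Returns the number of file descriptors currently open by this process
    /// and the soft limit on the number of open file descriptors.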
    fn fd_usage(&mut self) -> anyhow::Result<(u64, u64)> {
        let dir = std::fs::read_dir("/proc/self/fd").context("Failed to open '/proc/self/fd'")?;

        let mut fd_count: u64 = 0;
        for entry in dir {
            entry.context("Failed to read entry in '/proc/self/fd'")?;
            fd_count += 1;
        }

        let (soft_limit, _) =
            nix::sys::resource::getrlimit(nix::sys::resource::Resource::RLIMIT_NOFILE)
                .context("Failed to get the fd limit")?;

        Ok((fd_count, soft_limit))
    }

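    /// Returns the amount of available physical memory, in bytes.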
    fn memory_remaining(&mut self) -> anyhow::Result<u64> {
        let page_size = nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
            .context("Failed to get the page size")?
            .ok_or_else(|| anyhow::anyhow!("Failed to get the page size (no errno)"))?;

        let avl_pages = nix::unistd::sysconf(nix::unistd::SysconfVar::_AVPHYS_PAGES)
            .context("Failed to get the number of available pages of physical memory")?
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Failed to get the number of available pages of physical memory (no errno)"
                )
            })?;

        let page_size: u64 = page_size.try_into().unwrap();
        let avl_pages: u64 = avl_pages.try_into().unwrap();

        Ok(page_size * avl_pages)
    }

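    /// Returns the manager's shared-memory block.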
    pub fn shmem(&self) -> &ShMemBlock<ManagerShmem> {
        &self.shmem
    }
}

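/// Configuration needed by the manager, derived from the simulation's config
/// and network graph.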
pub struct ManagerConfig {
    /// Random source for this manager.
    pub random: Xoshiro256PlusPlus,

    /// The IP addresses assigned to hosts in the simulated network.
    pub ip_assignment: IpAssignment<u32>,

    /// The routing information between nodes of the network graph.
    pub routing_info: RoutingInfo<u32>,

    /// The bandwidth of each host, keyed by its IP address.
    pub host_bandwidths: HashMap<std::net::IpAddr, Bandwidth>,

    /// The hosts to simulate.
    pub hosts: Vec<HostInfo>,
}

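/// Runs `f` on each host in the iterator, with each host installed as the
/// worker's active host for the duration of the call.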
fn for_each_host(host_iter: &mut HostIter<Box<Host>>, mut f: impl FnMut(&Host)) {
    host_iter.for_each(|host| {
        worker::Worker::set_active_host(host);
        worker::Worker::with_active_host(|host| {
            f(host);
        })
        .unwrap();
        worker::Worker::take_active_host()
    });
}

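/// Returns the maximum frequency of CPU 0 in Hz, as reported by sysfs.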
fn get_raw_cpu_frequency_hz() -> anyhow::Result<u64> {
    const CONFIG_CPU_MAX_FREQ_FILE: &str = "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq";
    // the file contains the frequency in kHz followed by a trailing newline
    let khz: u64 = std::fs::read_to_string(CONFIG_CPU_MAX_FREQ_FILE)?.trim().parse()?;
    Ok(khz * 1000)
}

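/// Searches Shadow's rpath for the given preload library and returns its path.
/// Fails if the library can't be found or its path can't be used in LD_PRELOAD.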
fn get_required_preload_path(libname: &str) -> anyhow::Result<PathBuf> {
    let libname_c = CString::new(libname).unwrap();
    let libpath_c = unsafe { c::scanRpathForLib(libname_c.as_ptr()) };

    let libpath = if !libpath_c.is_null() {
        let libpath = unsafe { CStr::from_ptr(libpath_c) };
        let libpath = OsStr::from_bytes(libpath.to_bytes());
        Some(PathBuf::from(libpath.to_os_string()))
    } else {
        None
    };

    unsafe { libc::free(libpath_c as *mut libc::c_void) };

    let libpath = libpath.ok_or_else(|| anyhow::anyhow!("Could not find library in rpath"))?;

    // LD_PRELOAD uses spaces and colons as separators, so the path must not contain them
    let bytes = libpath.as_os_str().as_bytes();
    if bytes.iter().any(|c| *c == b' ' || *c == b':') {
        anyhow::bail!("Preload path contains LD_PRELOAD-incompatible characters: {libpath:?}");
    }

    log::debug!(
        "Found required preload library {} at path {}",
        libname,
        libpath.display(),
    );

    Ok(libpath)
}