shadow_rs/core/
controller.rs

1use std::io::IsTerminal;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Context;
6use rand::SeedableRng;
7use rand_xoshiro::Xoshiro256PlusPlus;
8use shadow_shim_helper_rs::emulated_time::EmulatedTime;
9use shadow_shim_helper_rs::simulation_time::SimulationTime;
10use shadow_shim_helper_rs::util::time::TimeParts;
11
12use crate::core::configuration::ConfigOptions;
13use crate::core::manager::{Manager, ManagerConfig};
14use crate::core::sim_config::SimConfig;
15use crate::core::worker;
16use crate::utility::status_bar::{self, StatusBar, StatusPrinter};
17
18pub struct Controller<'a> {
19    // general options and user configuration for the simulation
20    config: &'a ConfigOptions,
21    sim_config: Option<SimConfig>,
22
23    // the simulator should attempt to end immediately after this time
24    end_time: EmulatedTime,
25}
26
27impl<'a> Controller<'a> {
28    pub fn new(sim_config: SimConfig, config: &'a ConfigOptions) -> Self {
29        let end_time: Duration = config.general.stop_time.unwrap().into();
30        let end_time: SimulationTime = end_time.try_into().unwrap();
31        let end_time = EmulatedTime::SIMULATION_START + end_time;
32
33        Self {
34            config,
35            sim_config: Some(sim_config),
36            end_time,
37        }
38    }
39
40    pub fn run(mut self) -> anyhow::Result<()> {
41        let mut sim_config = self.sim_config.take().unwrap();
42
43        let status_logger = self.config.general.progress.unwrap().then(|| {
44            let state = ShadowStatusBarState::new(self.end_time);
45
46            if std::io::stderr().lock().is_terminal() {
47                let redraw_interval = Duration::from_millis(1000);
48                StatusLogger::Bar(StatusBar::new(state, redraw_interval))
49            } else {
50                StatusLogger::Printer(StatusPrinter::new(state))
51            }
52        });
53
54        let manager_config = ManagerConfig {
55            random: Xoshiro256PlusPlus::from_rng(&mut sim_config.random),
56            ip_assignment: sim_config.ip_assignment,
57            routing_info: sim_config.routing_info,
58            host_bandwidths: sim_config.host_bandwidths,
59            hosts: sim_config.hosts,
60        };
61
62        let manager = Manager::new(manager_config, &self, self.config, self.end_time)
63            .context("Failed to initialize the manager")?;
64
65        log::info!("Running simulation");
66        let num_plugin_errors = manager.run(status_logger.as_ref().map(|x| x.status()))?;
67        log::info!("Finished simulation");
68
69        if num_plugin_errors > 0 {
70            return Err(anyhow::anyhow!(
71                "{num_plugin_errors} managed processes in unexpected final state"
72            ));
73        }
74
75        Ok(())
76    }
77}
78
79/// Controller methods that are accessed by the manager.
80pub trait SimController {
81    fn manager_finished_current_round(
82        &self,
83        min_next_event_time: EmulatedTime,
84    ) -> Option<(EmulatedTime, EmulatedTime)>;
85}
86
87impl SimController for Controller<'_> {
88    fn manager_finished_current_round(
89        &self,
90        min_next_event_time: EmulatedTime,
91    ) -> Option<(EmulatedTime, EmulatedTime)> {
92        // TODO: once we get multiple managers, we have to block them here until they have all
93        // notified us that they are finished
94
95        let runahead = worker::WORKER_SHARED
96            .borrow()
97            .as_ref()
98            .unwrap()
99            .runahead
100            .get();
101        assert_ne!(runahead, SimulationTime::ZERO);
102
103        let new_start = min_next_event_time;
104
105        // update the new window end as one interval past the new window start, making sure we don't
106        // run over the experiment end time
107        let new_end = new_start.checked_add(runahead).unwrap_or(EmulatedTime::MAX);
108        let new_end = std::cmp::min(new_end, self.end_time);
109
110        let continue_running = new_start < new_end;
111        continue_running.then_some((new_start, new_end))
112    }
113}
114
115#[derive(Debug)]
116pub struct ShadowStatusBarState {
117    start: std::time::Instant,
118    pub current: EmulatedTime,
119    end: EmulatedTime,
120    pub num_failed_processes: u32,
121}
122
123impl std::fmt::Display for ShadowStatusBarState {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        let sim_current = self.current.duration_since(&EmulatedTime::SIMULATION_START);
126        let sim_end = self.end.duration_since(&EmulatedTime::SIMULATION_START);
127        let frac = sim_current.as_millis() as f32 / sim_end.as_millis() as f32;
128
129        let sim_current = TimeParts::from_nanos(sim_current.as_nanos());
130        let sim_end = TimeParts::from_nanos(sim_end.as_nanos());
131        let realtime = TimeParts::from_nanos(self.start.elapsed().as_nanos());
132
133        write!(
134            f,
135            "{}% — simulated: {}/{}, realtime: {}, processes failed: {}",
136            (frac * 100.0).round() as i8,
137            sim_current.fmt_hr_min_sec_milli(),
138            sim_end.fmt_hr_min_sec(),
139            realtime.fmt_hr_min_sec(),
140            self.num_failed_processes,
141        )
142    }
143}
144
145impl ShadowStatusBarState {
146    pub fn new(end: EmulatedTime) -> Self {
147        Self {
148            start: std::time::Instant::now(),
149            current: EmulatedTime::SIMULATION_START,
150            end,
151            num_failed_processes: 0,
152        }
153    }
154}
155
156enum StatusLogger<T: 'static + status_bar::StatusBarState> {
157    Printer(StatusPrinter<T>),
158    Bar(StatusBar<T>),
159}
160
161impl<T: 'static + status_bar::StatusBarState> StatusLogger<T> {
162    pub fn status(&self) -> &Arc<status_bar::Status<T>> {
163        match self {
164            Self::Printer(x) => x.status(),
165            Self::Bar(x) => x.status(),
166        }
167    }
168}