scheduler/
lib.rs

//! Scheduler for Shadow discrete-event simulations.
//!
//! In Shadow, each host has a queue of events it must process, and within a given scheduling round
//! the host can process these events independently of all other hosts. This means that Shadow can
//! process each host in parallel.
//!
//! For a given list of hosts, the scheduler must tell each host to run and process its events. This
//! must occur in parallel with minimal overhead. With a typical thread pool you might create a new
//! task for each host and run all of the tasks on the thread pool, but this is too slow for Shadow
//! and results in a huge runtime performance loss (simulation run time increases by over 10x). Most
//! thread pools also don't have a method of specifying which task (and therefore which host) runs
//! on which CPU core, which is an important performance optimization on NUMA architectures.
//!
//! The scheduler in this library uses a thread pool optimized for running the same task across all
//! threads. This means that the scheduler takes a single function/closure and runs it on each
//! thread simultaneously (and sometimes repeatedly) until all of the hosts have been processed. The
//! implementation details depend on which scheduler is in use ([`ThreadPerCoreSched`] or
//! [`ThreadPerHostSched`]), but all schedulers share a common interface so that they can easily be
//! switched out.
//!
//! The [`Scheduler`] provides a simple wrapper to make it easier to support both schedulers, which
//! is useful if you want to choose one at runtime. The schedulers use a "[scoped
//! threads][std::thread::scope]" design to simplify the calling code. This helps the calling code
//! share data with the scheduler without requiring the caller to use locking or "unsafe" to do so.
//!
//! ```
//! # use scheduler::thread_per_core::ThreadPerCoreSched;
//! # use std::sync::atomic::{AtomicU32, Ordering};
//! # #[derive(Debug)]
//! # struct Host(u16);
//! # impl Host {
//! #     pub fn new(id: u16) -> Self { Self(id) }
//! #     pub fn id(&self) -> u16 { self.0 }
//! #     pub fn run_events(&mut self) {}
//! # }
//! // a simulation with three hosts
//! let hosts = [Host::new(0), Host::new(1), Host::new(2)];
//!
//! // a scheduler with two threads (no cpu pinning) and three hosts
//! let mut sched: ThreadPerCoreSched<Host> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//!
//! // the counter is owned by this main thread with a non-static lifetime, but
//! // because of the "scoped threads" design it can be accessed by the task in
//! // the scheduler's threads
//! let counter = AtomicU32::new(0);
//!
//! // run one round of the scheduler
//! sched.scope(|s| {
//!     s.run_with_hosts(|thread_idx, hosts| {
//!         hosts.for_each(|mut host| {
//!             println!("Running host {} on thread {thread_idx}", host.id());
//!             host.run_events();
//!             counter.fetch_add(1, Ordering::Relaxed);
//!             host
//!         });
//!     });
//!
//!     // we can do other processing here in the main thread while we wait for the
//!     // above task to finish running
//!     println!("Waiting for the task to finish on all threads");
//! });
//!
//! println!("Finished processing the hosts");
//!
//! // the `counter.fetch_add(1)` was run once for each host
//! assert_eq!(counter.load(Ordering::Relaxed), 3);
//!
//! // we're done with the scheduler, so join all of its threads
//! sched.join();
//! ```
//!
//! The [`ThreadPerCoreSched`] scheduler is generally much faster and should be preferred over the
//! [`ThreadPerHostSched`] scheduler. If no one finds a situation where the `ThreadPerHostSched` is
//! faster, then it should probably be removed sometime in the future.
//!
//! It's probably a good idea to [`box`][Box] the host, since the schedulers move the host
//! frequently and moving a pointer is faster than moving the entire host object.
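//!
//! For example, boxing changes only the scheduler's host type parameter (a sketch, reusing the
//! `Host` type from the example above and assuming `Box<Host>` satisfies the same trait bounds
//! as `Host`):
//!
//! ```ignore
//! // only the `Box` pointer moves when the scheduler shuffles hosts around
//! let hosts = vec![Box::new(Host::new(0)), Box::new(Host::new(1))];
//! let mut sched: ThreadPerCoreSched<Box<Host>> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//! ```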
//!
//! Unsafe code should only be written in the thread pools. The schedulers themselves should be
//! written in only safe code using the safe interfaces provided by the thread pools. If new
//! features are needed in the scheduler, it's recommended to try to add them to the scheduler
//! itself and not modify any of the thread pools. The thread pools are complicated and have
//! delicate lifetime [sub-typing/variance][variance] handling, which is easy to break and would
//! enable the user of the scheduler to invoke undefined behaviour.
//!
//! [variance]: https://doc.rust-lang.org/nomicon/subtyping.html
//!
//! If the scheduler uses CPU pinning, the task can get the CPU it's pinned to using
//! [`core_affinity`].
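//!
//! For example (a minimal sketch, reusing the setup from the example above; the scheduler here
//! does not pin its threads, so the affinity is `None`):
//!
//! ```
//! # use scheduler::thread_per_core::ThreadPerCoreSched;
//! # #[derive(Debug)]
//! # struct Host(u16);
//! let hosts = [Host(0)];
//! let mut sched: ThreadPerCoreSched<Host> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//!
//! sched.scope(|s| {
//!     s.run(|_thread_idx| {
//!         // `None` since this scheduler was built without pinning; a scheduler created
//!         // with pinned cpu ids (e.g. `&[Some(0), Some(1)]`) would report `Some(core)`
//!         assert_eq!(scheduler::core_affinity(), None);
//!     });
//! });
//!
//! sched.join();
//! ```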

// https://github.com/rust-lang/rfcs/blob/master/text/2585-unsafe-block-in-unsafe-fn.md
#![deny(unsafe_op_in_unsafe_fn)]

pub mod thread_per_core;
pub mod thread_per_host;

mod logical_processor;
mod pools;
mod sync;

use std::cell::Cell;

#[cfg(doc)]
use {thread_per_core::ThreadPerCoreSched, thread_per_host::ThreadPerHostSched};

// any scheduler implementation can read/write the thread-local directly, but external modules can
// only read it using `core_affinity()`

std::thread_local! {
    /// The core affinity of the current thread, as set by the active scheduler.
    static CORE_AFFINITY: Cell<Option<u32>> = const { Cell::new(None) };
}

/// Get the core affinity of the current thread, as set by the active scheduler. Will be `None` if
/// the scheduler is not using CPU pinning, or if called from a thread not owned by the scheduler.
pub fn core_affinity() -> Option<u32> {
    CORE_AFFINITY.with(|x| x.get())
}

// the enum supports hosts that satisfy the trait bounds of each scheduler variant
pub trait Host: thread_per_core::Host + thread_per_host::Host {}
impl<T> Host for T where T: thread_per_core::Host + thread_per_host::Host {}

/// A wrapper for different host schedulers. It would have been nice to make this a trait, but that
/// would require support for GATs.
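///
/// A sketch of choosing a scheduler variant at runtime (`use_thread_per_host` is a hypothetical
/// configuration flag, and the [`ThreadPerHostSched`] constructor arguments are elided since they
/// differ from [`ThreadPerCoreSched`]):
///
/// ```ignore
/// let mut sched = if use_thread_per_host {
///     Scheduler::ThreadPerHost(ThreadPerHostSched::new(/* ... */))
/// } else {
///     Scheduler::ThreadPerCore(ThreadPerCoreSched::new(&[None, None], hosts, false))
/// };
///
/// // the calling code is now independent of the chosen variant
/// sched.scope(|s| {
///     s.run_with_hosts(|_thread_idx, hosts| {
///         hosts.for_each(|host| host);
///     });
/// });
/// sched.join();
/// ```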
pub enum Scheduler<HostType: Host> {
    ThreadPerHost(thread_per_host::ThreadPerHostSched<HostType>),
    ThreadPerCore(thread_per_core::ThreadPerCoreSched<HostType>),
}

impl<HostType: Host> Scheduler<HostType> {
    /// The maximum number of threads that will ever be run in parallel. The number of threads
    /// created by the scheduler may be higher.
    pub fn parallelism(&self) -> usize {
        match self {
            Self::ThreadPerHost(sched) => sched.parallelism(),
            Self::ThreadPerCore(sched) => sched.parallelism(),
        }
    }

    /// Create a scope for any task run on the scheduler. The current thread will block at the end
    /// of the scope until the task has completed.
    pub fn scope<'scope>(
        &'scope mut self,
        f: impl for<'a, 'b> FnOnce(SchedulerScope<'a, 'b, 'scope, HostType>) + 'scope,
    ) {
        match self {
            Self::ThreadPerHost(sched) => sched.scope(move |s| f(SchedulerScope::ThreadPerHost(s))),
            Self::ThreadPerCore(sched) => sched.scope(move |s| f(SchedulerScope::ThreadPerCore(s))),
        }
    }

    /// Join all threads started by the scheduler.
    pub fn join(self) {
        match self {
            Self::ThreadPerHost(sched) => sched.join(),
            Self::ThreadPerCore(sched) => sched.join(),
        }
    }
}

/// A scope for any task run on the scheduler.
pub enum SchedulerScope<'sched, 'pool, 'scope, HostType: Host> {
    ThreadPerHost(thread_per_host::SchedulerScope<'pool, 'scope, HostType>),
    ThreadPerCore(thread_per_core::SchedulerScope<'sched, 'pool, 'scope, HostType>),
}

// there are multiple named lifetimes, so let's just be explicit about them rather than hide them
#[allow(clippy::needless_lifetimes)]
impl<'sched, 'pool, 'scope, HostType: Host> SchedulerScope<'sched, 'pool, 'scope, HostType> {
    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread.
    pub fn run(self, f: impl Fn(usize) + Sync + Send + 'scope) {
        match self {
            Self::ThreadPerHost(scope) => scope.run(f),
            Self::ThreadPerCore(scope) => scope.run(f),
        }
    }

    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread and a host iterator.
    ///
    /// The closure must iterate over the provided `HostIter` to completion (until `next()` returns
    /// `None`), otherwise this may panic. The host iterator is not a real [`std::iter::Iterator`],
    /// but rather a fake iterator that behaves like a streaming iterator.
    pub fn run_with_hosts(self, f: impl Fn(usize, &mut HostIter<HostType>) + Send + Sync + 'scope) {
        match self {
            Self::ThreadPerHost(scope) => scope.run_with_hosts(move |idx, iter| {
                let mut iter = HostIter::ThreadPerHost(iter);
                f(idx, &mut iter)
            }),
            Self::ThreadPerCore(scope) => scope.run_with_hosts(move |idx, iter| {
                let mut iter = HostIter::ThreadPerCore(iter);
                f(idx, &mut iter)
            }),
        }
    }

    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread, a host iterator, and an element of `data`.
    ///
    /// The closure must iterate over the provided `HostIter` to completion (until `next()` returns
    /// `None`), otherwise this may panic. The host iterator is not a real [`std::iter::Iterator`],
    /// but rather a fake iterator that behaves like a streaming iterator.
    ///
    /// Each call of the closure is given an element of `data`. An element may be given to multiple
    /// threads over time, but never to two threads at the same time, so you should not expect any
    /// contention on an element if using interior mutability. The provided slice **must** have a
    /// length of at least [`Scheduler::parallelism`]. If the data needs to be initialized, it
    /// should be initialized before calling this function and not at the beginning of the closure.
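    ///
    /// A sketch of per-thread scratch state (`sched` and a `Host` type with an `id()` method are
    /// assumed from the surrounding code; `Mutex` satisfies the `T: Sync` bound, and its lock is
    /// always uncontended since an element is never lent to two threads at once):
    ///
    /// ```ignore
    /// use std::sync::Mutex;
    ///
    /// // one scratch buffer per parallel thread, initialized before the scope and
    /// // sized to the scheduler's parallelism
    /// let scratch: Vec<Mutex<Vec<u16>>> = (0..sched.parallelism())
    ///     .map(|_| Mutex::new(Vec::new()))
    ///     .collect();
    ///
    /// sched.scope(|s| {
    ///     s.run_with_data(&scratch, |_thread_idx, hosts, elem| {
    ///         let mut buf = elem.lock().unwrap();
    ///         hosts.for_each(|host| {
    ///             buf.push(host.id());
    ///             host
    ///         });
    ///     });
    /// });
    /// ```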
    pub fn run_with_data<T>(
        self,
        data: &'scope [T],
        f: impl Fn(usize, &mut HostIter<HostType>, &T) + Send + Sync + 'scope,
    ) where
        T: Sync,
    {
        match self {
            Self::ThreadPerHost(scope) => scope.run_with_data(data, move |idx, iter, elem| {
                let mut iter = HostIter::ThreadPerHost(iter);
                f(idx, &mut iter, elem)
            }),
            Self::ThreadPerCore(scope) => scope.run_with_data(data, move |idx, iter, elem| {
                let mut iter = HostIter::ThreadPerCore(iter);
                f(idx, &mut iter, elem)
            }),
        }
    }
}

/// Supports iterating over all hosts assigned to this thread.
pub enum HostIter<'a, 'b, HostType: Host> {
    ThreadPerHost(&'a mut thread_per_host::HostIter<HostType>),
    ThreadPerCore(&'a mut thread_per_core::HostIter<'b, HostType>),
}

impl<HostType: Host> HostIter<'_, '_, HostType> {
    /// Calls `f` once for each [`Host`], and the closure must return the host. Ownership of the
    /// host is transferred into and out of the closure, rather than passing a mutable reference,
    /// since Shadow needs to put the host in a global with a `'static` lifetime (the worker).
    pub fn for_each<F>(&mut self, f: F)
    where
        F: FnMut(HostType) -> HostType,
    {
        match self {
            Self::ThreadPerHost(x) => x.for_each(f),
            Self::ThreadPerCore(x) => x.for_each(f),
        }
    }
}