scheduler/lib.rs
//! Scheduler for Shadow discrete-event simulations.
//!
//! In Shadow, each host has a queue of events it must process, and within a given scheduling round
//! the host can process these events independently of all other hosts. This means that Shadow can
//! process each host in parallel.
//!
//! For a given list of hosts, the scheduler must tell each host to run and process its events. This
//! must occur in parallel with minimal overhead. With a typical thread pool you might create a new
//! task for each host and run all of the tasks on the thread pool, but this is too slow for Shadow
//! and results in a huge runtime performance loss (simulation run time increases by over 10x). Most
//! thread pools also don't have a method of specifying which task (and therefore which host) runs
//! on which CPU core, which is an important performance optimization on NUMA architectures.
//!
//! The scheduler in this library uses a thread pool optimized for running the same task across all
//! threads. This means that the scheduler takes a single function/closure and runs it on each
//! thread simultaneously (and sometimes repeatedly) until all of the hosts have been processed. The
//! implementation details depend on which scheduler is in use ([`ThreadPerCoreSched`] or
//! [`ThreadPerHostSched`]), but all schedulers share a common interface so that they can easily be
//! switched out.
//!
//! The [`Scheduler`] provides a simple wrapper to make it easier to support both schedulers, which
//! is useful if you want to choose one at runtime. The schedulers use a "[scoped
//! threads][std::thread::scope]" design to simplify the calling code. This helps the calling code
//! share data with the scheduler without requiring the caller to use locking or "unsafe" to do so.
//!
//! ```
//! # use scheduler::thread_per_core::ThreadPerCoreSched;
//! # use std::sync::atomic::{AtomicU32, Ordering};
//! # #[derive(Debug)]
//! # struct Host(u16);
//! # impl Host {
//! #     pub fn new(id: u16) -> Self { Self(id) }
//! #     pub fn id(&self) -> u16 { self.0 }
//! #     pub fn run_events(&mut self) {}
//! # }
//! // a simulation with three hosts
//! let hosts = [Host::new(0), Host::new(1), Host::new(2)];
//!
//! // a scheduler with two threads (no cpu pinning) and three hosts
//! let mut sched: ThreadPerCoreSched<Host> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//!
//! // the counter is owned by this main thread with a non-static lifetime, but
//! // because of the "scoped threads" design it can be accessed by the task in
//! // the scheduler's threads
//! let counter = AtomicU32::new(0);
//!
//! // run one round of the scheduler
//! sched.scope(|s| {
//!     s.run_with_hosts(|thread_idx, hosts| {
//!         hosts.for_each(|mut host| {
//!             println!("Running host {} on thread {thread_idx}", host.id());
//!             host.run_events();
//!             counter.fetch_add(1, Ordering::Relaxed);
//!             host
//!         });
//!     });
//!
//!     // we can do other processing here in the main thread while we wait for the
//!     // above task to finish running
//!     println!("Waiting for the task to finish on all threads");
//! });
//!
//! println!("Finished processing the hosts");
//!
//! // the `counter.fetch_add(1)` was run once for each host
//! assert_eq!(counter.load(Ordering::Relaxed), 3);
//!
//! // we're done with the scheduler, so join all of its threads
//! sched.join();
//! ```
//!
//! The [`ThreadPerCoreSched`] scheduler is generally much faster and should be preferred over the
//! [`ThreadPerHostSched`] scheduler. If no one finds a situation where the `ThreadPerHostSched` is
//! faster, then it should probably be removed sometime in the future.
//!
//! It's probably good to [`box`][Box] the host since the schedulers move the host frequently, and it's
//! faster to move a pointer than the entire host object.
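//!
//! For example, a sketch only (reusing the doc-only `Host` stand-in from the example above, and
//! assuming a boxed host still satisfies the schedulers' host trait bounds):
//!
//! ```ignore
//! // moving a `Box<Host>` between threads and queues only copies a pointer,
//! // not the whole host object
//! let hosts = [Box::new(Host::new(0)), Box::new(Host::new(1))];
//! let mut sched: ThreadPerCoreSched<Box<Host>> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//! ```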
//!
//! Unsafe code should only be written in the thread pools. The schedulers themselves should be
//! written in only safe code using the safe interfaces provided by the thread pools. If new
//! features are needed in the scheduler, it's recommended to try to add them to the scheduler
//! itself and not modify any of the thread pools. The thread pools are complicated and have
//! delicate lifetime [sub-typing/variance][variance] handling, which is easy to break and would
//! enable the user of the scheduler to invoke undefined behaviour.
//!
//! [variance]: https://doc.rust-lang.org/nomicon/subtyping.html
//!
//! If the scheduler uses CPU pinning, the task can get the CPU it's pinned to using
//! [`core_affinity`].
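//!
//! For example, a sketch only (using the same doc-only `Host` stand-in as in the example above;
//! since this scheduler is built without pinning, [`core_affinity`] returns `None`):
//!
//! ```no_run
//! # use scheduler::thread_per_core::ThreadPerCoreSched;
//! # #[derive(Debug)]
//! # struct Host(u16);
//! # impl Host {
//! #     pub fn new(id: u16) -> Self { Self(id) }
//! # }
//! # let hosts = [Host::new(0)];
//! let mut sched: ThreadPerCoreSched<Host> =
//!     ThreadPerCoreSched::new(&[None, None], hosts, false);
//!
//! sched.scope(|s| {
//!     s.run(|thread_idx| {
//!         // `None` here since the scheduler above does not pin its threads
//!         println!("Thread {thread_idx} affinity: {:?}", scheduler::core_affinity());
//!     });
//! });
//!
//! sched.join();
//! ```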

// https://github.com/rust-lang/rfcs/blob/master/text/2585-unsafe-block-in-unsafe-fn.md
#![deny(unsafe_op_in_unsafe_fn)]

pub mod thread_per_core;
pub mod thread_per_host;

mod logical_processor;
mod pools;
mod sync;

use std::cell::Cell;

#[cfg(doc)]
use {thread_per_core::ThreadPerCoreSched, thread_per_host::ThreadPerHostSched};

// any scheduler implementation can read/write the thread-local directly, but external modules can
// only read it using `core_affinity()`

std::thread_local! {
    /// The core affinity of the current thread, as set by the active scheduler.
    static CORE_AFFINITY: Cell<Option<u32>> = const { Cell::new(None) };
}

/// Get the core affinity of the current thread, as set by the active scheduler. Will be `None` if
/// the scheduler is not using CPU pinning, or if called from a thread not owned by the scheduler.
pub fn core_affinity() -> Option<u32> {
    CORE_AFFINITY.with(|x| x.get())
}

// the enum supports hosts that satisfy the trait bounds of each scheduler variant
pub trait Host: thread_per_core::Host + thread_per_host::Host {}
impl<T> Host for T where T: thread_per_core::Host + thread_per_host::Host {}

/// A wrapper for different host schedulers. It would have been nice to make this a trait, but that
/// would require support for GATs.
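///
/// A sketch of wrapping the faster scheduler and driving it through the common interface (`cpus`
/// and `hosts` are assumed to be defined elsewhere, with a host type satisfying [`Host`]):
///
/// ```ignore
/// // wrap the concrete scheduler so later code only deals with `Scheduler`
/// let mut sched = Scheduler::ThreadPerCore(ThreadPerCoreSched::new(&cpus, hosts, false));
///
/// sched.scope(|s| {
///     s.run_with_hosts(|_thread_idx, hosts| {
///         hosts.for_each(|mut host| {
///             host.run_events();
///             host
///         });
///     });
/// });
///
/// sched.join();
/// ```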
pub enum Scheduler<HostType: Host> {
    ThreadPerHost(thread_per_host::ThreadPerHostSched<HostType>),
    ThreadPerCore(thread_per_core::ThreadPerCoreSched<HostType>),
}

impl<HostType: Host> Scheduler<HostType> {
    /// The maximum number of threads that will ever be run in parallel. The number of threads
    /// created by the scheduler may be higher.
    pub fn parallelism(&self) -> usize {
        match self {
            Self::ThreadPerHost(sched) => sched.parallelism(),
            Self::ThreadPerCore(sched) => sched.parallelism(),
        }
    }

    /// Create a scope for any task run on the scheduler. The current thread will block at the end
    /// of the scope until the task has completed.
    pub fn scope<'scope>(
        &'scope mut self,
        f: impl for<'a, 'b> FnOnce(SchedulerScope<'a, 'b, 'scope, HostType>) + 'scope,
    ) {
        match self {
            Self::ThreadPerHost(sched) => sched.scope(move |s| f(SchedulerScope::ThreadPerHost(s))),
            Self::ThreadPerCore(sched) => sched.scope(move |s| f(SchedulerScope::ThreadPerCore(s))),
        }
    }

    /// Join all threads started by the scheduler.
    pub fn join(self) {
        match self {
            Self::ThreadPerHost(sched) => sched.join(),
            Self::ThreadPerCore(sched) => sched.join(),
        }
    }
}

/// A scope for any task run on the scheduler.
pub enum SchedulerScope<'sched, 'pool, 'scope, HostType: Host> {
    ThreadPerHost(thread_per_host::SchedulerScope<'pool, 'scope, HostType>),
    ThreadPerCore(thread_per_core::SchedulerScope<'sched, 'pool, 'scope, HostType>),
}

// there are multiple named lifetimes, so let's just be explicit about them rather than hide them
#[allow(clippy::needless_lifetimes)]
impl<'sched, 'pool, 'scope, HostType: Host> SchedulerScope<'sched, 'pool, 'scope, HostType> {
    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread.
    pub fn run(self, f: impl Fn(usize) + Sync + Send + 'scope) {
        match self {
            Self::ThreadPerHost(scope) => scope.run(f),
            Self::ThreadPerCore(scope) => scope.run(f),
        }
    }

    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread and a host iterator.
    ///
    /// The closure must iterate over the provided `HostIter` to completion (until `next()` returns
    /// `None`), otherwise this may panic. The host iterator is not a real [`std::iter::Iterator`],
    /// but rather a fake iterator that behaves like a streaming iterator.
    pub fn run_with_hosts(self, f: impl Fn(usize, &mut HostIter<HostType>) + Send + Sync + 'scope) {
        match self {
            Self::ThreadPerHost(scope) => scope.run_with_hosts(move |idx, iter| {
                let mut iter = HostIter::ThreadPerHost(iter);
                f(idx, &mut iter)
            }),
            Self::ThreadPerCore(scope) => scope.run_with_hosts(move |idx, iter| {
                let mut iter = HostIter::ThreadPerCore(iter);
                f(idx, &mut iter)
            }),
        }
    }

    /// Run the closure on all threads. The closure is given an index of the currently running
    /// thread, a host iterator, and an element of `data`.
    ///
    /// The closure must iterate over the provided `HostIter` to completion (until `next()` returns
    /// `None`), otherwise this may panic. The host iterator is not a real [`std::iter::Iterator`],
    /// but rather a fake iterator that behaves like a streaming iterator.
    ///
    /// Each call of the closure will be given an element of `data`, and this element will not be
    /// given to any other thread while this closure is running, which means you should not expect
    /// any contention on this element if using interior mutability. The provided slice **must**
    /// have a length of at least [`Scheduler::parallelism`]. If the data needs to be initialized,
    /// it should be initialized before calling this function and not at the beginning of the
    /// closure. The element may be given to multiple threads, but never two threads at the same
    /// time.
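    ///
    /// A sketch of per-thread scratch data (assuming `sched` is a [`Scheduler`] built elsewhere;
    /// `AtomicU32` is used here only because the element type must be `Sync`):
    ///
    /// ```ignore
    /// // one scratch element per worker thread; the slice length must be at
    /// // least `sched.parallelism()`
    /// let scratch: Vec<AtomicU32> = (0..sched.parallelism()).map(|_| AtomicU32::new(0)).collect();
    ///
    /// sched.scope(|s| {
    ///     s.run_with_data(&scratch, |_thread_idx, hosts, counter| {
    ///         hosts.for_each(|host| {
    ///             // `counter` is not shared with any other thread while this closure runs
    ///             counter.fetch_add(1, Ordering::Relaxed);
    ///             host
    ///         });
    ///     });
    /// });
    ///
    /// // total number of hosts processed across all threads
    /// let total: u32 = scratch.iter().map(|c| c.load(Ordering::Relaxed)).sum();
    /// ```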
    pub fn run_with_data<T>(
        self,
        data: &'scope [T],
        f: impl Fn(usize, &mut HostIter<HostType>, &T) + Send + Sync + 'scope,
    ) where
        T: Sync,
    {
        match self {
            Self::ThreadPerHost(scope) => scope.run_with_data(data, move |idx, iter, elem| {
                let mut iter = HostIter::ThreadPerHost(iter);
                f(idx, &mut iter, elem)
            }),
            Self::ThreadPerCore(scope) => scope.run_with_data(data, move |idx, iter, elem| {
                let mut iter = HostIter::ThreadPerCore(iter);
                f(idx, &mut iter, elem)
            }),
        }
    }
}

/// Supports iterating over all hosts assigned to this thread.
pub enum HostIter<'a, 'b, HostType: Host> {
    ThreadPerHost(&'a mut thread_per_host::HostIter<HostType>),
    ThreadPerCore(&'a mut thread_per_core::HostIter<'b, HostType>),
}

impl<HostType: Host> HostIter<'_, '_, HostType> {
    /// For each [`Host`], calls `f` with the host. The `Host` must be returned by the closure. The
    /// ownership of the `Host` is transferred in and out of the closure rather than using a mutable
    /// reference since Shadow needs to put the host in a global with `'static` lifetime (the
    /// worker).
    pub fn for_each<F>(&mut self, f: F)
    where
        F: FnMut(HostType) -> HostType,
    {
        match self {
            Self::ThreadPerHost(x) => x.for_each(f),
            Self::ThreadPerCore(x) => x.for_each(f),
        }
    }
}
254}