scheduler/
logical_processor.rs

1#![forbid(unsafe_code)]
2
3use crossbeam::queue::ArrayQueue;
4
5/// A set of `n` logical processors.
6pub struct LogicalProcessors {
7    lps: Vec<LogicalProcessor>,
8}
9
10impl LogicalProcessors {
11    pub fn new(processors: &[Option<u32>], num_workers: usize) -> Self {
12        let mut lps = Vec::new();
13
14        for cpu_id in processors {
15            lps.push(LogicalProcessor {
16                cpu_id: *cpu_id,
17                // each queue must be large enough to store all the workers
18                ready_workers: ArrayQueue::new(num_workers),
19                done_workers: ArrayQueue::new(num_workers),
20            });
21        }
22
23        Self { lps }
24    }
25
26    /// Add a worker id to be run on processor `lpi`.
27    pub fn add_worker(&self, lpi: usize, worker: usize) {
28        self.lps[lpi].ready_workers.push(worker).unwrap();
29    }
30
31    /// Get a worker id to run on processor `lpi`. Returns `None` if there are no more workers to run.
32    pub fn next_worker(&self, lpi: usize) -> Option<(usize, usize)> {
33        // Start with workers that last ran on `lpi`; if none are available steal from another in
34        // round-robin order.
35        for (from_lpi, from_lp) in self
36            .lps
37            .iter()
38            .enumerate()
39            .cycle()
40            .skip(lpi)
41            .take(self.lps.len())
42        {
43            if let Some(worker) = from_lp.ready_workers.pop() {
44                // Mark the worker as "done"; push the worker to `lpi`, not the processor that it
45                // was stolen from.
46                self.lps[lpi].done_workers.push(worker).unwrap();
47
48                return Some((worker, from_lpi));
49            }
50        }
51
52        None
53    }
54
55    /// Call after finishing running a task on all workers to mark all workers ready to run again.
56    pub fn reset(&mut self) {
57        for lp in &mut self.lps {
58            assert!(lp.ready_workers.is_empty(), "Not all workers were used");
59            std::mem::swap(&mut lp.ready_workers, &mut lp.done_workers);
60        }
61    }
62
63    /// Returns the cpu id that should be used with [`libc::sched_setaffinity`] to run a thread on
64    /// `lpi`. Returns `None` if no cpu id was assigned to `lpi`.
65    pub fn cpu_id(&self, lpi: usize) -> Option<u32> {
66        self.lps[lpi].cpu_id
67    }
68
69    /// Returns an iterator of logical processor indexes.
70    pub fn iter(&self) -> impl std::iter::ExactSizeIterator<Item = usize> + Clone {
71        0..self.lps.len()
72    }
73}
74
75pub struct LogicalProcessor {
76    cpu_id: Option<u32>,
77    ready_workers: ArrayQueue<usize>,
78    done_workers: ArrayQueue<usize>,
79}