1#![forbid(unsafe_code)]
23use crossbeam::queue::ArrayQueue;
45/// A set of `n` logical processors.
6pub struct LogicalProcessors {
7 lps: Vec<LogicalProcessor>,
8}
910impl LogicalProcessors {
11pub fn new(processors: &[Option<u32>], num_workers: usize) -> Self {
12let mut lps = Vec::new();
1314for cpu_id in processors {
15 lps.push(LogicalProcessor {
16 cpu_id: *cpu_id,
17// each queue must be large enough to store all the workers
18ready_workers: ArrayQueue::new(num_workers),
19 done_workers: ArrayQueue::new(num_workers),
20 });
21 }
2223Self { lps }
24 }
2526/// Add a worker id to be run on processor `lpi`.
27pub fn add_worker(&self, lpi: usize, worker: usize) {
28self.lps[lpi].ready_workers.push(worker).unwrap();
29 }
3031/// Get a worker id to run on processor `lpi`. Returns `None` if there are no more workers to run.
32pub fn next_worker(&self, lpi: usize) -> Option<(usize, usize)> {
33// Start with workers that last ran on `lpi`; if none are available steal from another in
34 // round-robin order.
35for (from_lpi, from_lp) in self
36.lps
37 .iter()
38 .enumerate()
39 .cycle()
40 .skip(lpi)
41 .take(self.lps.len())
42 {
43if let Some(worker) = from_lp.ready_workers.pop() {
44// Mark the worker as "done"; push the worker to `lpi`, not the processor that it
45 // was stolen from.
46self.lps[lpi].done_workers.push(worker).unwrap();
4748return Some((worker, from_lpi));
49 }
50 }
5152None
53}
5455/// Call after finishing running a task on all workers to mark all workers ready to run again.
56pub fn reset(&mut self) {
57for lp in &mut self.lps {
58assert!(lp.ready_workers.is_empty(), "Not all workers were used");
59 std::mem::swap(&mut lp.ready_workers, &mut lp.done_workers);
60 }
61 }
6263/// Returns the cpu id that should be used with [`libc::sched_setaffinity`] to run a thread on
64 /// `lpi`. Returns `None` if no cpu id was assigned to `lpi`.
65pub fn cpu_id(&self, lpi: usize) -> Option<u32> {
66self.lps[lpi].cpu_id
67 }
6869/// Returns an iterator of logical processor indexes.
70pub fn iter(&self) -> impl std::iter::ExactSizeIterator<Item = usize> + Clone {
710..self.lps.len()
72 }
73}
7475pub struct LogicalProcessor {
76 cpu_id: Option<u32>,
77 ready_workers: ArrayQueue<usize>,
78 done_workers: ArrayQueue<usize>,
79}