shadow_rs/host/network/
queuing.rs

1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashSet, VecDeque};
3use std::fmt::Debug;
4use std::hash::Hash;
5
6/// Tracks the order in which items are pushed into the queue, which is useful for some disciplines.
7type PushOrder = u64;
8
9/// An item wrapper that allows us to implement a min-heap WRT item priority. Our implementation of
10/// the `Ord` trait is such that the item containing the smallest priority (across all items) will
11/// be dequeued first, where ties are broken by preferring the item that was pushed into the queue
12/// first. This wrapper guarantees that item priority does not change while the item is queued; to
13/// change an item's priority, it must be dequeued and then re-enqueued.
14struct Prioritized<T> {
15    item: T,
16    priority: u64,
17    push_order: PushOrder,
18}
19
20impl<T> Ord for Prioritized<T> {
21    fn cmp(&self, other: &Self) -> Ordering {
22        // This ordering is intended to implement a min-heap based on item priority, where smaller
23        // values have precedence over larger values. When items are equal, we break ties using the
24        // order in which items were pushed into the queue (which should always be unique in our
25        // implementation).
26        let ordering = match self.priority.cmp(&other.priority) {
27            Ordering::Less => Ordering::Greater,
28            Ordering::Greater => Ordering::Less,
29            // `reverse()` so that items pushed first (with a smaller `push_order`) are preferred.
30            Ordering::Equal => self.push_order.cmp(&other.push_order).reverse(),
31        };
32        assert_ne!(ordering, Ordering::Equal);
33        ordering
34    }
35}
36
37impl<T> PartialOrd for Prioritized<T> {
38    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
39        // We want PartialOrd to be consistent with Ord.
40        Some(self.cmp(other))
41    }
42}
43
44impl<T> Eq for Prioritized<T> {}
45
46impl<T> PartialEq for Prioritized<T> {
47    fn eq(&self, other: &Self) -> bool {
48        // PartialEq must be consistent with PartialOrd. The `Prioritized` object is designed to
49        // never be equal so that its ordering is never ambiguous, preventing non-determinism.
50        let is_equal = self.partial_cmp(other) == Some(Ordering::Equal);
51        assert!(!is_equal);
52        is_equal
53    }
54}
55
56/// The kinds of queuing disciplines the `NetworkQueue` currently supports.
57pub enum NetworkQueueKind {
58    /// A queue where items are sorted and dequeued based on a priority given at enqueue time.
59    /// Queues created with this discipline MUST provide `Some` priorities to
60    /// `NetworkQueue::push()`.
61    MinPriority,
62    /// A queue where items are sorted and dequeued based on the order in which they are enqueued.
63    /// Queues created with this discipline MAY provide `None` priorities to `NetworkQueue::push()`.
64    FirstInFirstOut,
65}
66
67/// The queuing discipline to use to define the order in which enqueued items are dequeued. We use
68/// different data structures to help us realize different queuing strategies. The supported
69/// variants match those described in `NetworkQueueKind`.
70///
71/// Note that the `NetworkQueue` will live for the lifetime of the simulation and these qdisc data
72/// structures will only ever grow and not shrink. Thus, if there's a burst of packets, the queue
73/// will grow to accommodate them but will never shrink again to reclaim memory.
74enum QueuingDiscipline<T> {
75    /// See `NetworkQueueKind::MinPriority`.
76    MinPriority((BinaryHeap<Prioritized<T>>, PushOrder)),
77    /// See `NetworkQueueKind::FirstInFirstOut`.
78    FirstInFirstOut(VecDeque<T>),
79}
80
81#[derive(Debug, Copy, Clone, PartialEq)]
82pub enum PushError {
83    /// Indicates that the item being added to the queue is already in the queue.
84    AlreadyQueued,
85    /// A priority value was not provided when one is required.
86    NonePriority,
87}
88
89/// A queue that can hold a group of items (e.g., sockets or packets) and that provides some control
90/// over the queuing discipline and behavior. The queue stores two copies of every enqueued item:
91/// the items and their clones must be such that they are considered equal. Thus, it may be useful
92/// to wrap items with a `std::rc::Rc` or `std::sync::Arc` before enqueuing.
93///
94/// The queue implementation requires that inserted items are not modified in such a way that an
95/// item's hash, as determined by the `Hash` trait, or its equality, as determined by the `Eq`
96/// trait, changes while it is in the queue. These requirements are consistent with those of a
97/// [`HashSet`](https://doc.rust-lang.org/std/collections/struct.HashSet.html).
98pub struct NetworkQueue<T: Clone + Debug + Eq + Hash> {
99    /// The set of items that currently exist in the queue.
100    membership: HashSet<T>,
101    /// A cloned set of items organized according to the configured queuing discipline.
102    queue: QueuingDiscipline<T>,
103}
104
105impl<T: Clone + Debug + Eq + Hash> NetworkQueue<T> {
106    /// Create a new queue that will organize items according to the given `NetworkQueueKind`.
107    pub fn new(kind: NetworkQueueKind) -> Self {
108        let queue = match kind {
109            NetworkQueueKind::MinPriority => QueuingDiscipline::MinPriority((BinaryHeap::new(), 0)),
110            NetworkQueueKind::FirstInFirstOut => {
111                QueuingDiscipline::FirstInFirstOut(VecDeque::new())
112            }
113        };
114        Self {
115            membership: HashSet::new(),
116            queue,
117        }
118    }
119
120    /// Remove and drop all items from the queue.
121    pub fn clear(&mut self) {
122        match &mut self.queue {
123            QueuingDiscipline::MinPriority((heap, _)) => heap.clear(),
124            QueuingDiscipline::FirstInFirstOut(deque) => deque.clear(),
125        }
126        self.membership.clear();
127    }
128
129    /// Returns true if the queue contains the item, and false otherwise.
130    pub fn contains(&self, item: &T) -> bool {
131        self.membership.contains(item)
132    }
133
134    /// Returns true if the queue does not contain any items.
135    #[allow(dead_code)]
136    pub fn is_empty(&self) -> bool {
137        self.len() == 0
138    }
139
140    /// Returns the number of queued items.
141    pub fn len(&self) -> usize {
142        self.membership.len()
143    }
144
145    /// Returns the next available item if one exists. This function also drops the duplicate cloned
146    /// handle that was created in `NetworkQueue::try_push()`.
147    pub fn pop(&mut self) -> Option<T> {
148        match &mut self.queue {
149            QueuingDiscipline::MinPriority((heap, _)) => heap.pop().map(|x| x.item),
150            QueuingDiscipline::FirstInFirstOut(deque) => deque.pop_front(),
151        }
152        .inspect(|x| {
153            assert!(self.membership.remove(x));
154        })
155    }
156
157    /// Pushes an item into the queue, internally cloning it to support membership checks. It may be
158    /// useful to wrap items with a `std::rc::Rc` or `std::sync::Arc` before calling this function.
159    ///
160    /// The item should not be modified while in the queue such that it's hash or equality changes.
161    ///
162    /// # Panics
163    ///
164    /// This function panics if the queue was configured with `NetworkQueueKind::MinPriority` and
165    /// `maybe_priority` is `None` or if the item already exists in the queue.
166    pub fn push(&mut self, item: T, maybe_priority: Option<u64>) {
167        self.try_push(item, maybe_priority).unwrap()
168    }
169
170    /// Tries to push an item into the queue. If successful, the item is internally cloned to
171    /// support membership checks, so it may be useful to wrap items with a `std::rc::Rc` or
172    /// `std::sync::Arc` before calling this function. If unsuccessful, a `PushError` is returned
173    /// encoding the reason for the failure.
174    ///
175    /// The item should not be modified while in the queue such that it's hash or equality changes.
176    pub fn try_push(&mut self, item: T, maybe_priority: Option<u64>) -> Result<(), PushError> {
177        if self.contains(&item) {
178            Err(PushError::AlreadyQueued)
179        } else {
180            assert!(self.membership.insert(item.clone()));
181            match &mut self.queue {
182                QueuingDiscipline::MinPriority((heap, counter)) => {
183                    if let Some(priority) = maybe_priority {
184                        let push_order = *counter;
185                        *counter += 1;
186                        heap.push(Prioritized {
187                            item,
188                            priority,
189                            push_order,
190                        })
191                    } else {
192                        return Err(PushError::NonePriority);
193                    }
194                }
195                QueuingDiscipline::FirstInFirstOut(deque) => deque.push_back(item),
196            };
197            Ok(())
198        }
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use std::rc::Rc;
205
206    use super::*;
207
208    fn len_helper<T: Clone + Debug + Eq + Hash>(q: &NetworkQueue<T>, len: usize) {
209        match &q.queue {
210            QueuingDiscipline::MinPriority((heap, _)) => assert_eq!(heap.len(), len),
211            QueuingDiscipline::FirstInFirstOut(deque) => assert_eq!(deque.len(), len),
212        };
213        assert_eq!(q.membership.len(), len);
214        assert_eq!(q.len(), len);
215        if len == 0 {
216            assert!(q.is_empty());
217        } else {
218            assert!(!q.is_empty());
219        }
220    }
221
222    fn new_helper(kind: NetworkQueueKind) -> NetworkQueue<String> {
223        let q: NetworkQueue<String> = NetworkQueue::new(kind);
224        len_helper(&q, 0);
225        q
226    }
227
228    #[test]
229    fn new() {
230        new_helper(NetworkQueueKind::MinPriority);
231        new_helper(NetworkQueueKind::FirstInFirstOut);
232    }
233
234    fn push_helper(kind: NetworkQueueKind) -> NetworkQueue<String> {
235        let mut q = new_helper(kind);
236
237        q.push(String::from("First:Max"), Some(3));
238        len_helper(&q, 1);
239        assert!(q.contains(&String::from("First:Max")));
240
241        q.push(String::from("Second:Mid"), Some(2));
242        len_helper(&q, 2);
243        assert!(q.contains(&String::from("First:Max")));
244        assert!(q.contains(&String::from("Second:Mid")));
245
246        q.push(String::from("Third:Min"), Some(1));
247        len_helper(&q, 3);
248        assert!(q.contains(&String::from("First:Max")));
249        assert!(q.contains(&String::from("Second:Mid")));
250        assert!(q.contains(&String::from("Third:Min")));
251        q
252    }
253
254    #[test]
255    fn push() {
256        push_helper(NetworkQueueKind::MinPriority);
257        push_helper(NetworkQueueKind::FirstInFirstOut);
258    }
259
260    #[test]
261    fn push_equal_priority() {
262        let mut q = new_helper(NetworkQueueKind::MinPriority);
263
264        const NUM_ITEMS: i32 = 10;
265
266        // Push a bunch of items with the same priority.
267        for i in 0..NUM_ITEMS {
268            q.push(format!("One:{i}"), Some(1));
269            q.push(format!("Two:{i}"), Some(2));
270            q.push(format!("Zero:{i}"), Some(0));
271        }
272
273        // Make sure they exist.
274        for i in 0..NUM_ITEMS {
275            assert!(q.contains(&format!("One:{i}")));
276            assert!(q.contains(&format!("Two:{i}")));
277            assert!(q.contains(&format!("Zero:{i}")));
278        }
279
280        // Lower priority first, then the ones pushed first should be popped first.
281        for i in 0..NUM_ITEMS {
282            assert_eq!(q.pop(), Some(format!("Zero:{i}")));
283        }
284        for i in 0..NUM_ITEMS {
285            assert_eq!(q.pop(), Some(format!("One:{i}")));
286        }
287        for i in 0..NUM_ITEMS {
288            assert_eq!(q.pop(), Some(format!("Two:{i}")));
289        }
290    }
291
292    #[test]
293    fn pop() {
294        let mut q = push_helper(NetworkQueueKind::MinPriority);
295        len_helper(&q, 3);
296        assert!(q.contains(&String::from("First:Max")));
297        assert!(q.contains(&String::from("Second:Mid")));
298        assert!(q.contains(&String::from("Third:Min")));
299        assert_eq!(q.pop(), Some(String::from("Third:Min")));
300        len_helper(&q, 2);
301        assert!(q.contains(&String::from("First:Max")));
302        assert!(q.contains(&String::from("Second:Mid")));
303        assert!(!q.contains(&String::from("Third:Min")));
304        assert_eq!(q.pop(), Some(String::from("Second:Mid")));
305        len_helper(&q, 1);
306        assert!(q.contains(&String::from("First:Max")));
307        assert!(!q.contains(&String::from("Second:Mid")));
308        assert!(!q.contains(&String::from("Third:Min")));
309        assert_eq!(q.pop(), Some(String::from("First:Max")));
310        len_helper(&q, 0);
311        assert!(!q.contains(&String::from("First:Max")));
312        assert!(!q.contains(&String::from("Second:Mid")));
313        assert!(!q.contains(&String::from("Third:Min")));
314
315        let mut q = push_helper(NetworkQueueKind::FirstInFirstOut);
316        len_helper(&q, 3);
317        assert!(q.contains(&String::from("First:Max")));
318        assert!(q.contains(&String::from("Second:Mid")));
319        assert!(q.contains(&String::from("Third:Min")));
320        assert_eq!(q.pop(), Some(String::from("First:Max")));
321        len_helper(&q, 2);
322        assert!(!q.contains(&String::from("First:Max")));
323        assert!(q.contains(&String::from("Second:Mid")));
324        assert!(q.contains(&String::from("Third:Min")));
325        assert_eq!(q.pop(), Some(String::from("Second:Mid")));
326        len_helper(&q, 1);
327        assert!(!q.contains(&String::from("First:Max")));
328        assert!(!q.contains(&String::from("Second:Mid")));
329        assert!(q.contains(&String::from("Third:Min")));
330        assert_eq!(q.pop(), Some(String::from("Third:Min")));
331        len_helper(&q, 0);
332        assert!(!q.contains(&String::from("First:Max")));
333        assert!(!q.contains(&String::from("Second:Mid")));
334        assert!(!q.contains(&String::from("Third:Min")));
335    }
336
337    #[test]
338    fn none_priority() {
339        let mut q = new_helper(NetworkQueueKind::MinPriority);
340        assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
341        assert_eq!(
342            q.try_push(String::from("Item2"), None),
343            Err(PushError::NonePriority)
344        );
345
346        let mut q = new_helper(NetworkQueueKind::FirstInFirstOut);
347        assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
348        assert_eq!(q.try_push(String::from("Item2"), None), Ok(()));
349    }
350
351    #[test]
352    fn already_queued() {
353        let mut q = new_helper(NetworkQueueKind::MinPriority);
354        assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
355        assert_eq!(
356            q.try_push(String::from("Item1"), Some(2)),
357            Err(PushError::AlreadyQueued)
358        );
359
360        let mut q = new_helper(NetworkQueueKind::FirstInFirstOut);
361        assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
362        assert_eq!(
363            q.try_push(String::from("Item1"), Some(2)),
364            Err(PushError::AlreadyQueued)
365        );
366    }
367
368    #[test]
369    fn rc_wrapped_items() {
370        // This test is very specific to the current implementation which clones an item on push,
371        // and it will have to be changed if we use an Rc or Arc internally rather than relying and
372        // the caller to use it.
373        let item = Rc::new(String::from("Item"));
374        assert_eq!(Rc::strong_count(&item), 1);
375
376        let mut q = NetworkQueue::new(NetworkQueueKind::MinPriority);
377        q.push(item.clone(), Some(1));
378        assert!(q.contains(&item));
379        assert_eq!(Rc::strong_count(&item), 3);
380
381        assert_eq!(
382            q.try_push(item.clone(), Some(2)),
383            Err(PushError::AlreadyQueued)
384        );
385        assert!(q.contains(&item));
386        assert_eq!(Rc::strong_count(&item), 3);
387
388        assert_eq!(q.pop(), Some(item.clone()));
389        assert!(!q.contains(&item));
390        assert_eq!(Rc::strong_count(&item), 1);
391    }
392}