1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashSet, VecDeque};
3use std::fmt::Debug;
4use std::hash::Hash;
5
6type PushOrder = u64;
8
9struct Prioritized<T> {
15 item: T,
16 priority: u64,
17 push_order: PushOrder,
18}
19
20impl<T> Ord for Prioritized<T> {
21 fn cmp(&self, other: &Self) -> Ordering {
22 let ordering = match self.priority.cmp(&other.priority) {
27 Ordering::Less => Ordering::Greater,
28 Ordering::Greater => Ordering::Less,
29 Ordering::Equal => self.push_order.cmp(&other.push_order).reverse(),
31 };
32 assert_ne!(ordering, Ordering::Equal);
33 ordering
34 }
35}
36
37impl<T> PartialOrd for Prioritized<T> {
38 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
39 Some(self.cmp(other))
41 }
42}
43
44impl<T> Eq for Prioritized<T> {}
45
46impl<T> PartialEq for Prioritized<T> {
47 fn eq(&self, other: &Self) -> bool {
48 let is_equal = self.partial_cmp(other) == Some(Ordering::Equal);
51 assert!(!is_equal);
52 is_equal
53 }
54}
55
56pub enum NetworkQueueKind {
58 MinPriority,
62 FirstInFirstOut,
65}
66
67enum QueuingDiscipline<T> {
75 MinPriority((BinaryHeap<Prioritized<T>>, PushOrder)),
77 FirstInFirstOut(VecDeque<T>),
79}
80
81#[derive(Debug, Copy, Clone, PartialEq)]
82pub enum PushError {
83 AlreadyQueued,
85 NonePriority,
87}
88
89pub struct NetworkQueue<T: Clone + Debug + Eq + Hash> {
99 membership: HashSet<T>,
101 queue: QueuingDiscipline<T>,
103}
104
105impl<T: Clone + Debug + Eq + Hash> NetworkQueue<T> {
106 pub fn new(kind: NetworkQueueKind) -> Self {
108 let queue = match kind {
109 NetworkQueueKind::MinPriority => QueuingDiscipline::MinPriority((BinaryHeap::new(), 0)),
110 NetworkQueueKind::FirstInFirstOut => {
111 QueuingDiscipline::FirstInFirstOut(VecDeque::new())
112 }
113 };
114 Self {
115 membership: HashSet::new(),
116 queue,
117 }
118 }
119
120 pub fn clear(&mut self) {
122 match &mut self.queue {
123 QueuingDiscipline::MinPriority((heap, _)) => heap.clear(),
124 QueuingDiscipline::FirstInFirstOut(deque) => deque.clear(),
125 }
126 self.membership.clear();
127 }
128
129 pub fn contains(&self, item: &T) -> bool {
131 self.membership.contains(item)
132 }
133
134 #[allow(dead_code)]
136 pub fn is_empty(&self) -> bool {
137 self.len() == 0
138 }
139
140 pub fn len(&self) -> usize {
142 self.membership.len()
143 }
144
145 pub fn pop(&mut self) -> Option<T> {
148 match &mut self.queue {
149 QueuingDiscipline::MinPriority((heap, _)) => heap.pop().map(|x| x.item),
150 QueuingDiscipline::FirstInFirstOut(deque) => deque.pop_front(),
151 }
152 .inspect(|x| {
153 assert!(self.membership.remove(x));
154 })
155 }
156
157 pub fn push(&mut self, item: T, maybe_priority: Option<u64>) {
167 self.try_push(item, maybe_priority).unwrap()
168 }
169
170 pub fn try_push(&mut self, item: T, maybe_priority: Option<u64>) -> Result<(), PushError> {
177 if self.contains(&item) {
178 Err(PushError::AlreadyQueued)
179 } else {
180 assert!(self.membership.insert(item.clone()));
181 match &mut self.queue {
182 QueuingDiscipline::MinPriority((heap, counter)) => {
183 if let Some(priority) = maybe_priority {
184 let push_order = *counter;
185 *counter += 1;
186 heap.push(Prioritized {
187 item,
188 priority,
189 push_order,
190 })
191 } else {
192 return Err(PushError::NonePriority);
193 }
194 }
195 QueuingDiscipline::FirstInFirstOut(deque) => deque.push_back(item),
196 };
197 Ok(())
198 }
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use std::rc::Rc;
205
206 use super::*;
207
208 fn len_helper<T: Clone + Debug + Eq + Hash>(q: &NetworkQueue<T>, len: usize) {
209 match &q.queue {
210 QueuingDiscipline::MinPriority((heap, _)) => assert_eq!(heap.len(), len),
211 QueuingDiscipline::FirstInFirstOut(deque) => assert_eq!(deque.len(), len),
212 };
213 assert_eq!(q.membership.len(), len);
214 assert_eq!(q.len(), len);
215 if len == 0 {
216 assert!(q.is_empty());
217 } else {
218 assert!(!q.is_empty());
219 }
220 }
221
222 fn new_helper(kind: NetworkQueueKind) -> NetworkQueue<String> {
223 let q: NetworkQueue<String> = NetworkQueue::new(kind);
224 len_helper(&q, 0);
225 q
226 }
227
228 #[test]
229 fn new() {
230 new_helper(NetworkQueueKind::MinPriority);
231 new_helper(NetworkQueueKind::FirstInFirstOut);
232 }
233
234 fn push_helper(kind: NetworkQueueKind) -> NetworkQueue<String> {
235 let mut q = new_helper(kind);
236
237 q.push(String::from("First:Max"), Some(3));
238 len_helper(&q, 1);
239 assert!(q.contains(&String::from("First:Max")));
240
241 q.push(String::from("Second:Mid"), Some(2));
242 len_helper(&q, 2);
243 assert!(q.contains(&String::from("First:Max")));
244 assert!(q.contains(&String::from("Second:Mid")));
245
246 q.push(String::from("Third:Min"), Some(1));
247 len_helper(&q, 3);
248 assert!(q.contains(&String::from("First:Max")));
249 assert!(q.contains(&String::from("Second:Mid")));
250 assert!(q.contains(&String::from("Third:Min")));
251 q
252 }
253
254 #[test]
255 fn push() {
256 push_helper(NetworkQueueKind::MinPriority);
257 push_helper(NetworkQueueKind::FirstInFirstOut);
258 }
259
260 #[test]
261 fn push_equal_priority() {
262 let mut q = new_helper(NetworkQueueKind::MinPriority);
263
264 const NUM_ITEMS: i32 = 10;
265
266 for i in 0..NUM_ITEMS {
268 q.push(format!("One:{i}"), Some(1));
269 q.push(format!("Two:{i}"), Some(2));
270 q.push(format!("Zero:{i}"), Some(0));
271 }
272
273 for i in 0..NUM_ITEMS {
275 assert!(q.contains(&format!("One:{i}")));
276 assert!(q.contains(&format!("Two:{i}")));
277 assert!(q.contains(&format!("Zero:{i}")));
278 }
279
280 for i in 0..NUM_ITEMS {
282 assert_eq!(q.pop(), Some(format!("Zero:{i}")));
283 }
284 for i in 0..NUM_ITEMS {
285 assert_eq!(q.pop(), Some(format!("One:{i}")));
286 }
287 for i in 0..NUM_ITEMS {
288 assert_eq!(q.pop(), Some(format!("Two:{i}")));
289 }
290 }
291
292 #[test]
293 fn pop() {
294 let mut q = push_helper(NetworkQueueKind::MinPriority);
295 len_helper(&q, 3);
296 assert!(q.contains(&String::from("First:Max")));
297 assert!(q.contains(&String::from("Second:Mid")));
298 assert!(q.contains(&String::from("Third:Min")));
299 assert_eq!(q.pop(), Some(String::from("Third:Min")));
300 len_helper(&q, 2);
301 assert!(q.contains(&String::from("First:Max")));
302 assert!(q.contains(&String::from("Second:Mid")));
303 assert!(!q.contains(&String::from("Third:Min")));
304 assert_eq!(q.pop(), Some(String::from("Second:Mid")));
305 len_helper(&q, 1);
306 assert!(q.contains(&String::from("First:Max")));
307 assert!(!q.contains(&String::from("Second:Mid")));
308 assert!(!q.contains(&String::from("Third:Min")));
309 assert_eq!(q.pop(), Some(String::from("First:Max")));
310 len_helper(&q, 0);
311 assert!(!q.contains(&String::from("First:Max")));
312 assert!(!q.contains(&String::from("Second:Mid")));
313 assert!(!q.contains(&String::from("Third:Min")));
314
315 let mut q = push_helper(NetworkQueueKind::FirstInFirstOut);
316 len_helper(&q, 3);
317 assert!(q.contains(&String::from("First:Max")));
318 assert!(q.contains(&String::from("Second:Mid")));
319 assert!(q.contains(&String::from("Third:Min")));
320 assert_eq!(q.pop(), Some(String::from("First:Max")));
321 len_helper(&q, 2);
322 assert!(!q.contains(&String::from("First:Max")));
323 assert!(q.contains(&String::from("Second:Mid")));
324 assert!(q.contains(&String::from("Third:Min")));
325 assert_eq!(q.pop(), Some(String::from("Second:Mid")));
326 len_helper(&q, 1);
327 assert!(!q.contains(&String::from("First:Max")));
328 assert!(!q.contains(&String::from("Second:Mid")));
329 assert!(q.contains(&String::from("Third:Min")));
330 assert_eq!(q.pop(), Some(String::from("Third:Min")));
331 len_helper(&q, 0);
332 assert!(!q.contains(&String::from("First:Max")));
333 assert!(!q.contains(&String::from("Second:Mid")));
334 assert!(!q.contains(&String::from("Third:Min")));
335 }
336
337 #[test]
338 fn none_priority() {
339 let mut q = new_helper(NetworkQueueKind::MinPriority);
340 assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
341 assert_eq!(
342 q.try_push(String::from("Item2"), None),
343 Err(PushError::NonePriority)
344 );
345
346 let mut q = new_helper(NetworkQueueKind::FirstInFirstOut);
347 assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
348 assert_eq!(q.try_push(String::from("Item2"), None), Ok(()));
349 }
350
351 #[test]
352 fn already_queued() {
353 let mut q = new_helper(NetworkQueueKind::MinPriority);
354 assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
355 assert_eq!(
356 q.try_push(String::from("Item1"), Some(2)),
357 Err(PushError::AlreadyQueued)
358 );
359
360 let mut q = new_helper(NetworkQueueKind::FirstInFirstOut);
361 assert_eq!(q.try_push(String::from("Item1"), Some(1)), Ok(()));
362 assert_eq!(
363 q.try_push(String::from("Item1"), Some(2)),
364 Err(PushError::AlreadyQueued)
365 );
366 }
367
368 #[test]
369 fn rc_wrapped_items() {
370 let item = Rc::new(String::from("Item"));
374 assert_eq!(Rc::strong_count(&item), 1);
375
376 let mut q = NetworkQueue::new(NetworkQueueKind::MinPriority);
377 q.push(item.clone(), Some(1));
378 assert!(q.contains(&item));
379 assert_eq!(Rc::strong_count(&item), 3);
380
381 assert_eq!(
382 q.try_push(item.clone(), Some(2)),
383 Err(PushError::AlreadyQueued)
384 );
385 assert!(q.contains(&item));
386 assert_eq!(Rc::strong_count(&item), 3);
387
388 assert_eq!(q.pop(), Some(item.clone()));
389 assert!(!q.contains(&item));
390 assert_eq!(Rc::strong_count(&item), 1);
391 }
392}