shadow_rs/network/router/
codel_queue.rs

//! An active queue management (AQM) algorithm implementing CoDel.
//! <https://tools.ietf.org/html/rfc8289>
//!
//! The "Flow Queue" variant (FQ-CoDel) is not implemented.
//! <https://tools.ietf.org/html/rfc8290>
//!
//! More info:
//!  - <https://en.wikipedia.org/wiki/CoDel>
//!  - <http://man7.org/linux/man-pages/man8/tc-codel.8.html>
//!  - <https://queue.acm.org/detail.cfm?id=2209336>
//!  - <https://queue.acm.org/appendices/codel.html>
12
13use std::{collections::VecDeque, time::Duration};
14
15use shadow_shim_helper_rs::{emulated_time::EmulatedTime, simulation_time::SimulationTime};
16
17use crate::cshadow as c;
18use crate::network::packet::{PacketRc, PacketStatus};
19
20/// The target minimum standing queue delay time, corresponding to the "TARGET"
21/// parameter in the RFC. This is recommended to be set to 5 milliseconds in
22/// internet routers, but in Shadow we increase it to 10 milliseconds.
23const TARGET: SimulationTime = SimulationTime::from_duration(Duration::from_millis(10));
24
25/// The most recent time interval over which the standing delay is computed,
26/// corresponding to the "INTERVAL" parameter in the RFC. This is recommended to
27/// be set to 100 milliseconds in internet routers.
28const INTERVAL: SimulationTime = SimulationTime::from_duration(Duration::from_millis(100));
29
30/// The maximum number of packets we will store, corresponding to the "limit"
31/// parameter in the codel man page. This is recommended to be 1000 in internet
32/// routers, but in Shadow we don't enforce a limit due to our batched sending.
33const LIMIT: usize = usize::MAX;
34
35/// Encodes if CoDel determines that the next available packet can be dropped.
36struct CoDelPopItem {
37    packet: PacketRc,
38    ok_to_drop: bool,
39}
40
/// Represents the possible states of the CoDel algorithm.
///
/// This is a fieldless state tag, so it is `Copy` and `Eq` and can be
/// compared or read by value.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CoDelMode {
    /// Under good conditions, we store and forward packets.
    Store,
    /// Under bad conditions, we occasionally drop packets.
    Drop,
}
49
50/// An entry in the CoDel queque.
51struct CoDelElement {
52    packet: PacketRc,
53    enqueue_ts: EmulatedTime,
54}
55
56/// A packet queue implementing the CoDel active queue management (AQM)
57/// algorithm, suitable for use in network routers.
58///
59/// Currently, the memory capacity of the queue for storing elements is
60/// monitonically increasing since we do not shrink the queue's capacity on
61/// `pop()` operations. We think this is OK since we only use one queue per
62/// host. However, if memory overhead becomes problematic, we can consider
63/// occasionally shrinking the queue's capacity or using a backing that is more
64/// memory-efficient (e.g. a LinkedList).
65pub struct CoDelQueue {
66    /// A queue holding packets and insertion times.
67    elements: VecDeque<CoDelElement>,
68    /// The running sum of the sizes of packets stored in the queue.
69    total_bytes_stored: usize,
70    /// The state indicating if we are dropping or storing packets.
71    mode: CoDelMode,
72    /// If Some, this is an interval worth of time after which packet delays
73    /// started to exceed the target delay.
74    interval_end: Option<EmulatedTime>,
75    /// If Some, the next time we should drop a packet.
76    drop_next: Option<EmulatedTime>,
77    /// The number of packets dropped since entering drop mode.
78    current_drop_count: usize,
79    /// The number of packets dropped the last time we were in drop mode.
80    previous_drop_count: usize,
81}
82
83impl CoDelQueue {
84    /// Creates a new empty packet queue.
85    pub fn new() -> CoDelQueue {
86        CoDelQueue {
87            elements: VecDeque::new(),
88            total_bytes_stored: 0,
89            mode: CoDelMode::Store,
90            interval_end: None,
91            drop_next: None,
92            current_drop_count: 0,
93            previous_drop_count: 0,
94        }
95    }
96
97    /// Returns the total number of packets stored in the queue.
98    #[cfg(test)]
99    pub fn len(&self) -> usize {
100        self.elements.len()
101    }
102
103    /// Returns true if the queue is holding zero packets, false otherwise.
104    #[cfg(test)]
105    pub fn is_empty(&self) -> bool {
106        self.len() == 0
107    }
108
109    /// Returns the packet at the front of the queue, or None if the queue is
110    /// empty. Note that there is no gurantee that a subsequent `pop()`
111    /// operation will return the same packet, since it could be dropped by the
112    /// queue between the `peek()` and `pop()` operations.
113    #[cfg(test)]
114    pub fn peek(&self) -> Option<&PacketRc> {
115        self.elements.front().map(|x| &x.packet)
116    }
117
118    /// Returns the next packet in the queue that conforms to the CoDel standing
119    /// delay requirements, or None if the queue is empty before a conforming
120    /// packet is found. The CoDel packet dropping logic is applied during this
121    /// operation, which could result in packets being dropped before a packet
122    /// that conforms to the standing delay requirements is returned.
123    /// Requires the current time as an argument to avoid calling into the
124    /// worker module internally.
125    pub fn pop(&mut self, now: EmulatedTime) -> Option<PacketRc> {
126        let maybe_packet = match self.codel_pop(&now) {
127            Some(item) => match item.ok_to_drop {
128                true => match self.mode {
129                    CoDelMode::Store => self.drop_from_store_mode(&now, item.packet),
130                    CoDelMode::Drop => self.drop_from_drop_mode(&now, item.packet),
131                },
132                false => {
133                    // Always set Store mode when standing delay below TARGET.
134                    self.mode = CoDelMode::Store;
135                    Some(item.packet)
136                }
137            },
138            None => {
139                // Always set Store mode when the queue is empty.
140                self.mode = CoDelMode::Store;
141                None
142            }
143        };
144
145        maybe_packet.inspect(|p| {
146            p.add_status(PacketStatus::RouterDequeued);
147        })
148    }
149
150    fn drop_from_store_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
151        debug_assert_eq!(self.mode, CoDelMode::Store);
152
153        // Drop one packet and move to drop mode.
154        self.drop_packet(packet);
155        let next_item = self.codel_pop(now);
156        self.mode = CoDelMode::Drop;
157
158        // Reset to the drop rate that was known to control the queue.
159        let delta = self
160            .current_drop_count
161            .saturating_sub(self.previous_drop_count);
162        self.current_drop_count = match self.was_dropping_recently(now) && delta > 1 {
163            true => delta,
164            false => 1,
165        };
166        self.drop_next = Some(CoDelQueue::apply_control_law(now, self.current_drop_count));
167        self.previous_drop_count = self.current_drop_count;
168
169        next_item.map(|x| x.packet)
170    }
171
172    fn drop_from_drop_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
173        debug_assert_eq!(self.mode, CoDelMode::Drop);
174
175        let mut item = Some(CoDelPopItem {
176            packet,
177            ok_to_drop: true,
178        });
179
180        // Drop as many packets as the control law dictates.
181        while item.is_some() && self.mode == CoDelMode::Drop && self.should_drop(now) {
182            self.drop_packet(item.unwrap().packet);
183            self.current_drop_count += 1;
184
185            item = self.codel_pop(now);
186
187            match item.as_ref().is_some_and(|x| x.ok_to_drop) {
188                true => {
189                    // Set the next drop time based on CoDel control law.
190                    // `self.drop_next` is already set in `drop_from_store_mode()`
191                    self.drop_next = Some(CoDelQueue::apply_control_law(
192                        &self.drop_next.unwrap(),
193                        self.current_drop_count,
194                    ));
195                }
196                false => self.mode = CoDelMode::Store,
197            }
198        }
199
200        item.map(|x| x.packet)
201    }
202
203    // Corresponds to the `dodequeue` function in the RFC.
204    fn codel_pop(&mut self, now: &EmulatedTime) -> Option<CoDelPopItem> {
205        match self.elements.pop_front() {
206            Some(element) => {
207                // Found a packet.
208                debug_assert!(element.packet.len() <= self.total_bytes_stored);
209                self.total_bytes_stored =
210                    self.total_bytes_stored.saturating_sub(element.packet.len());
211
212                debug_assert!(now >= &element.enqueue_ts);
213                let standing_delay = now.saturating_duration_since(&element.enqueue_ts);
214                let ok_to_drop = self.process_standing_delay(now, standing_delay);
215
216                Some(CoDelPopItem {
217                    packet: element.packet,
218                    ok_to_drop,
219                })
220            }
221            None => {
222                // Queue is empty, so we cannot be above target.
223                self.interval_end = None;
224                None
225            }
226        }
227    }
228
229    /// Update our state based on the given standing delay. Returns true if the
230    /// packet associated with this delay can be dropped, false otherwise.
231    fn process_standing_delay(
232        &mut self,
233        now: &EmulatedTime,
234        standing_delay: SimulationTime,
235    ) -> bool {
236        if standing_delay < TARGET || self.total_bytes_stored <= c::CONFIG_MTU.try_into().unwrap() {
237            // We are in a good state, i.e., below the target delay. We reset
238            // the interval expiration, so that we wait for at least one full
239            // interval if the delay exceeds the target again.
240            self.interval_end = None;
241            false
242        } else {
243            // We are in a bad state, i.e., at or above the target delay.
244            match self.interval_end {
245                Some(end) => {
246                    // We were already in a bad state, and now we stayed in it.
247
248                    // if we have been in a bad state for a full interval worth
249                    // of time, drop this packet.
250                    now >= &end
251                }
252                None => {
253                    // None means we were in a good state, but now we just
254                    // entered a bad state. If we stay in the bad state for a
255                    // full interval, we will need to enter drop mode later.
256                    // Mark the end of the interval now so we can track it.
257                    self.interval_end = Some(now.saturating_add(INTERVAL));
258                    false
259                }
260            }
261        }
262    }
263
264    /// Returns true if now exceeds our drop_next threshold, false otherwise.
265    fn should_drop(&self, now: &EmulatedTime) -> bool {
266        match self.drop_next {
267            Some(next) => now >= &next,
268            None => false, // Have not yet set the drop threshold
269        }
270    }
271
272    /// Returns true if now is within 16 intervals of drop_next, false otherwise.
273    fn was_dropping_recently(&self, now: &EmulatedTime) -> bool {
274        match self.drop_next {
275            Some(drop_next) => {
276                // now < drop_next + interval*16
277                now.saturating_duration_since(&drop_next) < INTERVAL.saturating_mul(16)
278            }
279            None => false, // Have not yet dropped a packet
280        }
281    }
282
283    /// Apply the CoDel control law using the inverse sqrt of the drop count,
284    /// i.e., `time + (INTERVAL / sqrt(count));`.
285    fn apply_control_law(time: &EmulatedTime, count: usize) -> EmulatedTime {
286        let increment = {
287            let interval = INTERVAL.as_nanos_f64();
288            let sqrt_count = match count {
289                0 => 1f64,
290                _ => (count as f64).sqrt(),
291            };
292            let div = interval / sqrt_count;
293            SimulationTime::from_nanos(div.round() as u64)
294        };
295        let original = time.to_abs_simtime();
296        let adjusted = original.saturating_add(increment);
297        EmulatedTime::from_abs_simtime(adjusted)
298    }
299
300    /// Append a packet to the end of the queue.
301    /// Requires the current time as an argument to avoid calling into the
302    /// worker module internally.
303    pub fn push(&mut self, packet: PacketRc, now: EmulatedTime) {
304        if self.elements.len() < LIMIT {
305            packet.add_status(PacketStatus::RouterEnqueued);
306            self.total_bytes_stored += packet.len();
307            self.elements.push_back(CoDelElement {
308                packet,
309                enqueue_ts: now,
310            });
311        } else {
312            // Section 5.4 in the RFC notes that "packets arriving at a full
313            // buffer will be dropped, but these drops are not counted towards
314            // CoDel's computations".
315            self.drop_packet(packet);
316        }
317    }
318
319    fn drop_packet(&self, packet: PacketRc) {
320        packet.add_status(PacketStatus::RouterDropped);
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::tests::mock_time_millis;

    // Some of the tests here don't run in miri because they cause c::packet*
    // functions to be called during the test.

    #[test]
    fn empty() {
        let now = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();
        assert_eq!(cdq.len(), 0);
        assert!(cdq.is_empty());
        assert!(cdq.peek().is_none());
        assert!(cdq.pop(now).is_none());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn push_pop_simple() {
        let now = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();

        const N: usize = 10;

        for i in 1..=N {
            assert_eq!(cdq.len(), i - 1);
            cdq.push(PacketRc::new_ipv4_udp_mock(), now);
            assert_eq!(cdq.len(), i);
        }
        for i in 1..=N {
            assert_eq!(cdq.len(), N - i + 1);
            assert!(cdq.pop(now).is_some());
            assert_eq!(cdq.len(), N - i);
        }
        assert_eq!(cdq.len(), 0);
        assert!(cdq.is_empty());
        assert!(cdq.pop(now).is_none());
    }

    #[test]
    fn control_law() {
        let now = mock_time_millis(1000);

        // For counts of 0 and 1, the increment is a full interval.
        for i in 0..2 {
            assert_eq!(
                CoDelQueue::apply_control_law(&now, i).duration_since(&now),
                INTERVAL
            );
        }

        // For larger counts, the increment shrinks with the inverse square
        // root of the count: INTERVAL / sqrt(count).
        for i in 2..20 {
            assert_eq!(
                CoDelQueue::apply_control_law(&now, i).duration_since(&now),
                SimulationTime::from_nanos(
                    (INTERVAL.as_nanos_f64() / (i as f64).sqrt()).round() as u64
                )
            );
        }
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn interval() {
        let one = SimulationTime::try_from_millis(1).unwrap();

        let start = mock_time_millis(1000);

        let mut cdq = CoDelQueue::new();
        for _ in 0..5 {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());

        // Standing delays must remain at or above TARGET for INTERVAL amount
        // of time in order to enter the drop state.

        // Just below target, so the interval is not set.
        assert!(cdq.interval_end.is_none());
        let now = start + TARGET - one;
        assert!(!cdq.process_standing_delay(&now, TARGET - one));
        assert!(cdq.interval_end.is_none());

        // Reached target: the interval is set, but it is not yet ok to drop.
        let now = start + TARGET;
        assert!(!cdq.process_standing_delay(&now, TARGET));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // The delay stayed at or above target until interval_end, so it is ok
        // to drop.
        let now = start + TARGET + INTERVAL;
        assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // Still above target past interval_end: still ok to drop, and
        // interval_end is unchanged.
        let now = start + TARGET + INTERVAL * 2u32;
        assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL * 2u32));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // Delay back to low: the interval resets and it is not ok to drop.
        let now = start + TARGET + INTERVAL * 2u32;
        assert!(!cdq.process_standing_delay(&now, one));
        assert!(cdq.interval_end.is_none());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn mode() {
        let one = SimulationTime::try_from_millis(1).unwrap();

        let start = mock_time_millis(1000);

        let mut cdq = CoDelQueue::new();
        const N: usize = 6;
        for _ in 0..N {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());
        assert_eq!(cdq.len(), N);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Standing delays must remain at or above TARGET for INTERVAL amount
        // of time in order to enter the drop state.

        // We didn't reach target yet.
        cdq.pop(start + TARGET - one);
        assert_eq!(cdq.len(), N - 1);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // We now reached target.
        cdq.pop(start + TARGET);
        assert_eq!(cdq.len(), N - 2);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Still not at or above target for a full interval.
        cdq.pop(start + TARGET + INTERVAL - one);
        assert_eq!(cdq.len(), N - 3);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Now at or above target for a full interval: we enter drop mode,
        // drop one packet, and return the next one (two packets removed).
        cdq.pop(start + TARGET + INTERVAL);
        assert_eq!(cdq.len(), N - 5);
        assert_eq!(cdq.mode, CoDelMode::Drop);

        // Now if we wait another interval, we get another drop and then
        // low-delay packets should put us back into store mode.
        for _ in 0..3 {
            // Add some low-delay packets
            cdq.push(
                PacketRc::new_ipv4_udp_mock(),
                start + TARGET + INTERVAL * 2u32 - one,
            );
        }
        cdq.pop(start + TARGET + INTERVAL * 2u32);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }

    #[test]
    fn drop_empty() {
        let start = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();
        cdq.mode = CoDelMode::Drop;
        cdq.pop(start);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn drop_many() {
        let start = mock_time_millis(1000);
        let end = mock_time_millis(1000000);

        let mut cdq = CoDelQueue::new();
        const N: usize = 20;
        for _ in 0..N {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        assert_eq!(cdq.len(), N);

        // Start in Store mode.
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Sets the interval.
        cdq.pop(start + TARGET);
        assert_eq!(cdq.len(), N - 1);
        assert_eq!(cdq.current_drop_count, 0);
        assert_eq!(cdq.previous_drop_count, 0);
        assert!(!cdq.was_dropping_recently(&(start + TARGET)));
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Enters Drop mode, drops 1 packet and sets drop_next
        cdq.pop(start + TARGET + INTERVAL);
        assert_eq!(cdq.len(), N - 3);
        assert_eq!(cdq.current_drop_count, 1);
        assert_eq!(cdq.previous_drop_count, 1);
        assert!(cdq.drop_next.is_some());
        assert!(cdq.was_dropping_recently(&(start + TARGET + INTERVAL)));
        assert_eq!(cdq.mode, CoDelMode::Drop);

        // In Drop mode, we have repeated drops, but then it leaves Drop mode
        // before the queue is empty.
        assert!(cdq.should_drop(&end));
        cdq.pop(end);
        assert_eq!(cdq.len(), 1);
        assert_eq!(cdq.current_drop_count, N - 4);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }
}