1//! An active queue management (AQM) algorithm implementing CoDel.
2//! <https://tools.ietf.org/html/rfc8289>
3//!
4//! The "Flow Queue" variant is not implemented.
5//! <https://tools.ietf.org/html/rfc8290>
6//!
7//! More info:
8//! - <https://en.wikipedia.org/wiki/CoDel>
9//! - <http://man7.org/linux/man-pages/man8/tc-codel.8.html>
10//! - <https://queue.acm.org/detail.cfm?id=2209336>
11//! - <https://queue.acm.org/appendices/codel.html>
1213use std::{collections::VecDeque, time::Duration};
1415use shadow_shim_helper_rs::{emulated_time::EmulatedTime, simulation_time::SimulationTime};
1617use crate::cshadow as c;
18use crate::network::packet::{PacketRc, PacketStatus};
1920/// The target minimum standing queue delay time, corresponding to the "TARGET"
21/// parameter in the RFC. This is recommended to be set to 5 milliseconds in
22/// internet routers, but in Shadow we increase it to 10 milliseconds.
23const TARGET: SimulationTime = SimulationTime::from_duration(Duration::from_millis(10));
2425/// The most recent time interval over which the standing delay is computed,
26/// corresponding to the "INTERVAL" parameter in the RFC. This is recommended to
27/// be set to 100 milliseconds in internet routers.
28const INTERVAL: SimulationTime = SimulationTime::from_duration(Duration::from_millis(100));
/// Upper bound on the number of packets the queue will hold, i.e., the
/// "limit" parameter from the tc-codel man page. Internet routers typically
/// use 1000, but Shadow does not enforce a limit because of its batched
/// sending, so the bound is the largest representable `usize`.
const LIMIT: usize = usize::MAX;
3435/// Encodes if CoDel determines that the next available packet can be dropped.
36struct CoDelPopItem {
37 packet: PacketRc,
38 ok_to_drop: bool,
39}
/// Represents the possible states of the CoDel algorithm.
///
/// This is a plain fieldless state flag, so it is cheap to copy and compare.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CoDelMode {
    /// Under good conditions, we store and forward packets.
    Store,
    /// Under bad conditions, we occasionally drop packets.
    Drop,
}
4950/// An entry in the CoDel queque.
51struct CoDelElement {
52 packet: PacketRc,
53 enqueue_ts: EmulatedTime,
54}
5556/// A packet queue implementing the CoDel active queue management (AQM)
57/// algorithm, suitable for use in network routers.
58///
59/// Currently, the memory capacity of the queue for storing elements is
60/// monitonically increasing since we do not shrink the queue's capacity on
61/// `pop()` operations. We think this is OK since we only use one queue per
62/// host. However, if memory overhead becomes problematic, we can consider
63/// occasionally shrinking the queue's capacity or using a backing that is more
64/// memory-efficient (e.g. a LinkedList).
65pub struct CoDelQueue {
66/// A queue holding packets and insertion times.
67elements: VecDeque<CoDelElement>,
68/// The running sum of the sizes of packets stored in the queue.
69total_bytes_stored: usize,
70/// The state indicating if we are dropping or storing packets.
71mode: CoDelMode,
72/// If Some, this is an interval worth of time after which packet delays
73 /// started to exceed the target delay.
74interval_end: Option<EmulatedTime>,
75/// If Some, the next time we should drop a packet.
76drop_next: Option<EmulatedTime>,
77/// The number of packets dropped since entering drop mode.
78current_drop_count: usize,
79/// The number of packets dropped the last time we were in drop mode.
80previous_drop_count: usize,
81}
8283impl CoDelQueue {
84/// Creates a new empty packet queue.
85pub fn new() -> CoDelQueue {
86 CoDelQueue {
87 elements: VecDeque::new(),
88 total_bytes_stored: 0,
89 mode: CoDelMode::Store,
90 interval_end: None,
91 drop_next: None,
92 current_drop_count: 0,
93 previous_drop_count: 0,
94 }
95 }
9697/// Returns the total number of packets stored in the queue.
98#[cfg(test)]
99pub fn len(&self) -> usize {
100self.elements.len()
101 }
102103/// Returns true if the queue is holding zero packets, false otherwise.
104#[cfg(test)]
105pub fn is_empty(&self) -> bool {
106self.len() == 0
107}
108109/// Returns the packet at the front of the queue, or None if the queue is
110 /// empty. Note that there is no gurantee that a subsequent `pop()`
111 /// operation will return the same packet, since it could be dropped by the
112 /// queue between the `peek()` and `pop()` operations.
113#[cfg(test)]
114pub fn peek(&self) -> Option<&PacketRc> {
115self.elements.front().map(|x| &x.packet)
116 }
117118/// Returns the next packet in the queue that conforms to the CoDel standing
119 /// delay requirements, or None if the queue is empty before a conforming
120 /// packet is found. The CoDel packet dropping logic is applied during this
121 /// operation, which could result in packets being dropped before a packet
122 /// that conforms to the standing delay requirements is returned.
123 /// Requires the current time as an argument to avoid calling into the
124 /// worker module internally.
125pub fn pop(&mut self, now: EmulatedTime) -> Option<PacketRc> {
126let maybe_packet = match self.codel_pop(&now) {
127Some(item) => match item.ok_to_drop {
128true => match self.mode {
129 CoDelMode::Store => self.drop_from_store_mode(&now, item.packet),
130 CoDelMode::Drop => self.drop_from_drop_mode(&now, item.packet),
131 },
132false => {
133// Always set Store mode when standing delay below TARGET.
134self.mode = CoDelMode::Store;
135Some(item.packet)
136 }
137 },
138None => {
139// Always set Store mode when the queue is empty.
140self.mode = CoDelMode::Store;
141None
142}
143 };
144145 maybe_packet.inspect(|p| {
146 p.add_status(PacketStatus::RouterDequeued);
147 })
148 }
149150fn drop_from_store_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
151debug_assert_eq!(self.mode, CoDelMode::Store);
152153// Drop one packet and move to drop mode.
154self.drop_packet(packet);
155let next_item = self.codel_pop(now);
156self.mode = CoDelMode::Drop;
157158// Reset to the drop rate that was known to control the queue.
159let delta = self
160.current_drop_count
161 .saturating_sub(self.previous_drop_count);
162self.current_drop_count = match self.was_dropping_recently(now) && delta > 1 {
163true => delta,
164false => 1,
165 };
166self.drop_next = Some(CoDelQueue::apply_control_law(now, self.current_drop_count));
167self.previous_drop_count = self.current_drop_count;
168169 next_item.map(|x| x.packet)
170 }
171172fn drop_from_drop_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
173debug_assert_eq!(self.mode, CoDelMode::Drop);
174175let mut item = Some(CoDelPopItem {
176 packet,
177 ok_to_drop: true,
178 });
179180// Drop as many packets as the control law dictates.
181while item.is_some() && self.mode == CoDelMode::Drop && self.should_drop(now) {
182self.drop_packet(item.unwrap().packet);
183self.current_drop_count += 1;
184185 item = self.codel_pop(now);
186187match item.as_ref().is_some_and(|x| x.ok_to_drop) {
188true => {
189// Set the next drop time based on CoDel control law.
190 // `self.drop_next` is already set in `drop_from_store_mode()`
191self.drop_next = Some(CoDelQueue::apply_control_law(
192&self.drop_next.unwrap(),
193self.current_drop_count,
194 ));
195 }
196false => self.mode = CoDelMode::Store,
197 }
198 }
199200 item.map(|x| x.packet)
201 }
202203// Corresponds to the `dodequeue` function in the RFC.
204fn codel_pop(&mut self, now: &EmulatedTime) -> Option<CoDelPopItem> {
205match self.elements.pop_front() {
206Some(element) => {
207// Found a packet.
208debug_assert!(element.packet.len() <= self.total_bytes_stored);
209self.total_bytes_stored =
210self.total_bytes_stored.saturating_sub(element.packet.len());
211212debug_assert!(now >= &element.enqueue_ts);
213let standing_delay = now.saturating_duration_since(&element.enqueue_ts);
214let ok_to_drop = self.process_standing_delay(now, standing_delay);
215216Some(CoDelPopItem {
217 packet: element.packet,
218 ok_to_drop,
219 })
220 }
221None => {
222// Queue is empty, so we cannot be above target.
223self.interval_end = None;
224None
225}
226 }
227 }
228229/// Update our state based on the given standing delay. Returns true if the
230 /// packet associated with this delay can be dropped, false otherwise.
231fn process_standing_delay(
232&mut self,
233 now: &EmulatedTime,
234 standing_delay: SimulationTime,
235 ) -> bool {
236if standing_delay < TARGET || self.total_bytes_stored <= c::CONFIG_MTU.try_into().unwrap() {
237// We are in a good state, i.e., below the target delay. We reset
238 // the interval expiration, so that we wait for at least one full
239 // interval if the delay exceeds the target again.
240self.interval_end = None;
241false
242} else {
243// We are in a bad state, i.e., at or above the target delay.
244match self.interval_end {
245Some(end) => {
246// We were already in a bad state, and now we stayed in it.
247248 // if we have been in a bad state for a full interval worth
249 // of time, drop this packet.
250now >= &end
251 }
252None => {
253// None means we were in a good state, but now we just
254 // entered a bad state. If we stay in the bad state for a
255 // full interval, we will need to enter drop mode later.
256 // Mark the end of the interval now so we can track it.
257self.interval_end = Some(now.saturating_add(INTERVAL));
258false
259}
260 }
261 }
262 }
263264/// Returns true if now exceeds our drop_next threshold, false otherwise.
265fn should_drop(&self, now: &EmulatedTime) -> bool {
266match self.drop_next {
267Some(next) => now >= &next,
268None => false, // Have not yet set the drop threshold
269}
270 }
271272/// Returns true if now is within 16 intervals of drop_next, false otherwise.
273fn was_dropping_recently(&self, now: &EmulatedTime) -> bool {
274match self.drop_next {
275Some(drop_next) => {
276// now < drop_next + interval*16
277now.saturating_duration_since(&drop_next) < INTERVAL.saturating_mul(16)
278 }
279None => false, // Have not yet dropped a packet
280}
281 }
282283/// Apply the CoDel control law using the inverse sqrt of the drop count,
284 /// i.e., `time + (INTERVAL / sqrt(count));`.
285fn apply_control_law(time: &EmulatedTime, count: usize) -> EmulatedTime {
286let increment = {
287let interval = INTERVAL.as_nanos_f64();
288let sqrt_count = match count {
2890 => 1f64,
290_ => (count as f64).sqrt(),
291 };
292let div = interval / sqrt_count;
293 SimulationTime::from_nanos(div.round() as u64)
294 };
295let original = time.to_abs_simtime();
296let adjusted = original.saturating_add(increment);
297 EmulatedTime::from_abs_simtime(adjusted)
298 }
299300/// Append a packet to the end of the queue.
301 /// Requires the current time as an argument to avoid calling into the
302 /// worker module internally.
303pub fn push(&mut self, packet: PacketRc, now: EmulatedTime) {
304if self.elements.len() < LIMIT {
305 packet.add_status(PacketStatus::RouterEnqueued);
306self.total_bytes_stored += packet.len();
307self.elements.push_back(CoDelElement {
308 packet,
309 enqueue_ts: now,
310 });
311 } else {
312// Section 5.4 in the RFC notes that "packets arriving at a full
313 // buffer will be dropped, but these drops are not counted towards
314 // CoDel's computations".
315self.drop_packet(packet);
316 }
317 }
318319fn drop_packet(&self, packet: PacketRc) {
320 packet.add_status(PacketStatus::RouterDropped);
321 }
322}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::tests::mock_time_millis;

    // Some of the tests here don't run in miri because they cause c::packet*
    // functions to be called during the test.

    /// A freshly created queue reports no packets and pops nothing.
    #[test]
    fn empty() {
        let now = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();
        assert_eq!(cdq.len(), 0);
        assert!(cdq.is_empty());
        assert!(cdq.peek().is_none());
        assert!(cdq.pop(now).is_none());
    }

    /// Packets pushed and popped at the same instant have zero standing delay,
    /// so every pushed packet comes back out and the length tracks each step.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn push_pop_simple() {
        let now = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();

        const N: usize = 10;

        for i in 1..=N {
            assert_eq!(cdq.len(), i - 1);
            cdq.push(PacketRc::new_ipv4_udp_mock(), now);
            assert_eq!(cdq.len(), i);
        }
        for i in 1..=N {
            assert_eq!(cdq.len(), N - i + 1);
            assert!(cdq.pop(now).is_some());
            assert_eq!(cdq.len(), N - i);
        }
        assert_eq!(cdq.len(), 0);
        assert!(cdq.is_empty());
        assert!(cdq.pop(now).is_none());
    }

    /// Checks the increment produced by the control law for a range of counts.
    #[test]
    fn control_law() {
        let now = mock_time_millis(1000);

        // For counts 0 and 1, the increment should be a full interval.
        for i in 0..2 {
            assert_eq!(
                CoDelQueue::apply_control_law(&now, i).duration_since(&now),
                INTERVAL
            );
        }

        // For larger counts, the increment should shrink in proportion to
        // 1/sqrt(count).
        for i in 2..20 {
            assert_eq!(
                CoDelQueue::apply_control_law(&now, i).duration_since(&now),
                SimulationTime::from_nanos(
                    (INTERVAL.as_nanos_f64() / (i as f64).sqrt()).round() as u64
                )
            );
        }
    }

    /// Exercises `process_standing_delay` directly: the interval end is set
    /// the first time the delay reaches TARGET, packets become ok to drop once
    /// that interval has elapsed, and a low delay clears the interval again.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn interval() {
        let one = SimulationTime::try_from_millis(1).unwrap();

        let start = mock_time_millis(1000);

        let mut cdq = CoDelQueue::new();
        for _ in 0..5 {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        // Keep more than one MTU queued so the byte-count shortcut in
        // `process_standing_delay` does not mask the delay logic.
        assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());

        // Standing delays must remain above the TARGET for INTERVAL amount of
        // time in order to enter the drop state.

        // Not above target so interval is not set.
        assert!(cdq.interval_end.is_none());
        let now = start + TARGET - one;
        assert!(!cdq.process_standing_delay(&now, TARGET - one));
        assert!(cdq.interval_end.is_none());

        // Reached target, interval is set but still not ok to drop.
        let now = start + TARGET;
        assert!(!cdq.process_standing_delay(&now, TARGET));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // Now we exceed target+interval, so ok to drop.
        let now = start + TARGET + INTERVAL;
        assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // Still ok to drop later on, and the interval end does not move.
        let now = start + TARGET + INTERVAL * 2u32;
        assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL * 2u32));
        assert!(cdq.interval_end.is_some());
        assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);

        // Delay back to low, interval resets, not ok to drop.
        let now = start + TARGET + INTERVAL * 2u32;
        assert!(!cdq.process_standing_delay(&now, one));
        assert!(cdq.interval_end.is_none());
    }

    /// Drives `pop()` through the Store -> Drop -> Store mode transitions.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn mode() {
        let one = SimulationTime::try_from_millis(1).unwrap();

        let start = mock_time_millis(1000);

        let mut cdq = CoDelQueue::new();
        const N: usize = 6;
        for _ in 0..N {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());
        assert_eq!(cdq.len(), N);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Standing delays must remain above the TARGET for INTERVAL amount of
        // time in order to enter the drop state.

        // We didn't reach target yet.
        cdq.pop(start + TARGET - one);
        assert_eq!(cdq.len(), N - 1);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // We now reached target.
        cdq.pop(start + TARGET);
        assert_eq!(cdq.len(), N - 2);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Still not above target for a full interval.
        cdq.pop(start + TARGET + INTERVAL - one);
        assert_eq!(cdq.len(), N - 3);
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Now above target for interval, should enter drop mode and drop one packet.
        // (One pop removes two packets: the dropped one and the returned one.)
        cdq.pop(start + TARGET + INTERVAL);
        assert_eq!(cdq.len(), N - 5);
        assert_eq!(cdq.mode, CoDelMode::Drop);

        // Now if we wait another interval, we get another drop and then
        // low-delay packets should put us back into store mode.
        for _ in 0..3 {
            // Add some low-delay packets
            cdq.push(
                PacketRc::new_ipv4_udp_mock(),
                start + TARGET + INTERVAL * 2u32 - one,
            );
        }
        cdq.pop(start + TARGET + INTERVAL * 2u32);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }

    /// Popping an empty queue while in drop mode returns us to store mode.
    #[test]
    fn drop_empty() {
        let start = mock_time_millis(1000);
        let mut cdq = CoDelQueue::new();
        cdq.mode = CoDelMode::Drop;
        cdq.pop(start);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }

    /// Checks the drop counters and `drop_next` across entering drop mode,
    /// and that a single late `pop()` can drop many packets in a row.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn drop_many() {
        let start = mock_time_millis(1000);
        let end = mock_time_millis(1000000);

        let mut cdq = CoDelQueue::new();
        const N: usize = 20;
        for _ in 0..N {
            cdq.push(PacketRc::new_ipv4_udp_mock(), start);
        }
        assert_eq!(cdq.len(), N);

        // Start in Store mode.
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Sets the interval.
        cdq.pop(start + TARGET);
        assert_eq!(cdq.len(), N - 1);
        assert_eq!(cdq.current_drop_count, 0);
        assert_eq!(cdq.previous_drop_count, 0);
        assert!(!cdq.was_dropping_recently(&(start + TARGET)));
        assert_eq!(cdq.mode, CoDelMode::Store);

        // Enters Drop mode, drops 1 packet and sets drop_next
        cdq.pop(start + TARGET + INTERVAL);
        assert_eq!(cdq.len(), N - 3);
        assert_eq!(cdq.current_drop_count, 1);
        assert_eq!(cdq.previous_drop_count, 1);
        assert!(cdq.drop_next.is_some());
        assert!(cdq.was_dropping_recently(&(start + TARGET + INTERVAL)));
        assert_eq!(cdq.mode, CoDelMode::Drop);

        // In Drop mode, we have repeated drops, but then it leaves Drop mode
        // before the queue is empty.
        assert!(cdq.should_drop(&end));
        cdq.pop(end);
        assert_eq!(cdq.len(), 1);
        assert_eq!(cdq.current_drop_count, N - 4);
        assert_eq!(cdq.mode, CoDelMode::Store);
    }
}