shadow_rs/network/router/
codel_queue.rs1use std::{collections::VecDeque, time::Duration};
14
15use shadow_shim_helper_rs::{emulated_time::EmulatedTime, simulation_time::SimulationTime};
16
17use crate::cshadow as c;
18use crate::network::packet::{PacketRc, PacketStatus};
19
20const TARGET: SimulationTime = SimulationTime::from_duration(Duration::from_millis(10));
24
25const INTERVAL: SimulationTime = SimulationTime::from_duration(Duration::from_millis(100));
29
30const LIMIT: usize = usize::MAX;
34
35struct CoDelPopItem {
37 packet: PacketRc,
38 ok_to_drop: bool,
39}
40
41#[derive(PartialEq, Debug)]
43enum CoDelMode {
44 Store,
46 Drop,
48}
49
50struct CoDelElement {
52 packet: PacketRc,
53 enqueue_ts: EmulatedTime,
54}
55
56pub struct CoDelQueue {
66 elements: VecDeque<CoDelElement>,
68 total_bytes_stored: usize,
70 mode: CoDelMode,
72 interval_end: Option<EmulatedTime>,
75 drop_next: Option<EmulatedTime>,
77 current_drop_count: usize,
79 previous_drop_count: usize,
81}
82
83impl CoDelQueue {
84 pub fn new() -> CoDelQueue {
86 CoDelQueue {
87 elements: VecDeque::new(),
88 total_bytes_stored: 0,
89 mode: CoDelMode::Store,
90 interval_end: None,
91 drop_next: None,
92 current_drop_count: 0,
93 previous_drop_count: 0,
94 }
95 }
96
97 #[cfg(test)]
99 pub fn len(&self) -> usize {
100 self.elements.len()
101 }
102
103 #[cfg(test)]
105 pub fn is_empty(&self) -> bool {
106 self.len() == 0
107 }
108
109 #[cfg(test)]
114 pub fn peek(&self) -> Option<&PacketRc> {
115 self.elements.front().map(|x| &x.packet)
116 }
117
118 pub fn pop(&mut self, now: EmulatedTime) -> Option<PacketRc> {
126 let maybe_packet = match self.codel_pop(&now) {
127 Some(item) => match item.ok_to_drop {
128 true => match self.mode {
129 CoDelMode::Store => self.drop_from_store_mode(&now, item.packet),
130 CoDelMode::Drop => self.drop_from_drop_mode(&now, item.packet),
131 },
132 false => {
133 self.mode = CoDelMode::Store;
135 Some(item.packet)
136 }
137 },
138 None => {
139 self.mode = CoDelMode::Store;
141 None
142 }
143 };
144
145 maybe_packet.inspect(|p| {
146 p.add_status(PacketStatus::RouterDequeued);
147 })
148 }
149
150 fn drop_from_store_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
151 debug_assert_eq!(self.mode, CoDelMode::Store);
152
153 self.drop_packet(packet);
155 let next_item = self.codel_pop(now);
156 self.mode = CoDelMode::Drop;
157
158 let delta = self
160 .current_drop_count
161 .saturating_sub(self.previous_drop_count);
162 self.current_drop_count = match self.was_dropping_recently(now) && delta > 1 {
163 true => delta,
164 false => 1,
165 };
166 self.drop_next = Some(CoDelQueue::apply_control_law(now, self.current_drop_count));
167 self.previous_drop_count = self.current_drop_count;
168
169 next_item.map(|x| x.packet)
170 }
171
172 fn drop_from_drop_mode(&mut self, now: &EmulatedTime, packet: PacketRc) -> Option<PacketRc> {
173 debug_assert_eq!(self.mode, CoDelMode::Drop);
174
175 let mut item = Some(CoDelPopItem {
176 packet,
177 ok_to_drop: true,
178 });
179
180 while item.is_some() && self.mode == CoDelMode::Drop && self.should_drop(now) {
182 self.drop_packet(item.unwrap().packet);
183 self.current_drop_count += 1;
184
185 item = self.codel_pop(now);
186
187 match item.as_ref().is_some_and(|x| x.ok_to_drop) {
188 true => {
189 self.drop_next = Some(CoDelQueue::apply_control_law(
192 &self.drop_next.unwrap(),
193 self.current_drop_count,
194 ));
195 }
196 false => self.mode = CoDelMode::Store,
197 }
198 }
199
200 item.map(|x| x.packet)
201 }
202
203 fn codel_pop(&mut self, now: &EmulatedTime) -> Option<CoDelPopItem> {
205 match self.elements.pop_front() {
206 Some(element) => {
207 debug_assert!(element.packet.len() <= self.total_bytes_stored);
209 self.total_bytes_stored =
210 self.total_bytes_stored.saturating_sub(element.packet.len());
211
212 debug_assert!(now >= &element.enqueue_ts);
213 let standing_delay = now.saturating_duration_since(&element.enqueue_ts);
214 let ok_to_drop = self.process_standing_delay(now, standing_delay);
215
216 Some(CoDelPopItem {
217 packet: element.packet,
218 ok_to_drop,
219 })
220 }
221 None => {
222 self.interval_end = None;
224 None
225 }
226 }
227 }
228
229 fn process_standing_delay(
232 &mut self,
233 now: &EmulatedTime,
234 standing_delay: SimulationTime,
235 ) -> bool {
236 if standing_delay < TARGET || self.total_bytes_stored <= c::CONFIG_MTU.try_into().unwrap() {
237 self.interval_end = None;
241 false
242 } else {
243 match self.interval_end {
245 Some(end) => {
246 now >= &end
251 }
252 None => {
253 self.interval_end = Some(now.saturating_add(INTERVAL));
258 false
259 }
260 }
261 }
262 }
263
264 fn should_drop(&self, now: &EmulatedTime) -> bool {
266 match self.drop_next {
267 Some(next) => now >= &next,
268 None => false, }
270 }
271
272 fn was_dropping_recently(&self, now: &EmulatedTime) -> bool {
274 match self.drop_next {
275 Some(drop_next) => {
276 now.saturating_duration_since(&drop_next) < INTERVAL.saturating_mul(16)
278 }
279 None => false, }
281 }
282
283 fn apply_control_law(time: &EmulatedTime, count: usize) -> EmulatedTime {
286 let increment = {
287 let interval = INTERVAL.as_nanos_f64();
288 let sqrt_count = match count {
289 0 => 1f64,
290 _ => (count as f64).sqrt(),
291 };
292 let div = interval / sqrt_count;
293 SimulationTime::from_nanos(div.round() as u64)
294 };
295 let original = time.to_abs_simtime();
296 let adjusted = original.saturating_add(increment);
297 EmulatedTime::from_abs_simtime(adjusted)
298 }
299
300 pub fn push(&mut self, packet: PacketRc, now: EmulatedTime) {
304 if self.elements.len() < LIMIT {
305 packet.add_status(PacketStatus::RouterEnqueued);
306 self.total_bytes_stored += packet.len();
307 self.elements.push_back(CoDelElement {
308 packet,
309 enqueue_ts: now,
310 });
311 } else {
312 self.drop_packet(packet);
316 }
317 }
318
319 fn drop_packet(&self, packet: PacketRc) {
320 packet.add_status(PacketStatus::RouterDropped);
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use crate::network::tests::mock_time_millis;
328
329 #[test]
333 fn empty() {
334 let now = mock_time_millis(1000);
335 let mut cdq = CoDelQueue::new();
336 assert_eq!(cdq.len(), 0);
337 assert!(cdq.is_empty());
338 assert!(cdq.peek().is_none());
339 assert!(cdq.pop(now).is_none());
340 }
341
342 #[test]
343 #[cfg_attr(miri, ignore)]
344 fn push_pop_simple() {
345 let now = mock_time_millis(1000);
346 let mut cdq = CoDelQueue::new();
347
348 const N: usize = 10;
349
350 for i in 1..=N {
351 assert_eq!(cdq.len(), i - 1);
352 cdq.push(PacketRc::new_ipv4_udp_mock(), now);
353 assert_eq!(cdq.len(), i);
354 }
355 for i in 1..=N {
356 assert_eq!(cdq.len(), N - i + 1);
357 assert!(cdq.pop(now).is_some());
358 assert_eq!(cdq.len(), N - i);
359 }
360 assert_eq!(cdq.len(), 0);
361 assert!(cdq.is_empty());
362 assert!(cdq.pop(now).is_none());
363 }
364
365 #[test]
366 fn control_law() {
367 let now = mock_time_millis(1000);
368
369 for i in 0..2 {
371 assert_eq!(
372 CoDelQueue::apply_control_law(&now, i).duration_since(&now),
373 INTERVAL
374 );
375 }
376
377 for i in 2..20 {
379 assert_eq!(
380 CoDelQueue::apply_control_law(&now, i).duration_since(&now),
381 SimulationTime::from_nanos(
382 (INTERVAL.as_nanos_f64() / (i as f64).sqrt()).round() as u64
383 )
384 );
385 }
386 }
387
388 #[test]
389 #[cfg_attr(miri, ignore)]
390 fn interval() {
391 let one = SimulationTime::try_from_millis(1).unwrap();
392
393 let start = mock_time_millis(1000);
394
395 let mut cdq = CoDelQueue::new();
396 for _ in 0..5 {
397 cdq.push(PacketRc::new_ipv4_udp_mock(), start);
398 }
399 assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());
400
401 assert!(cdq.interval_end.is_none());
406 let now = start + TARGET - one;
407 assert!(!cdq.process_standing_delay(&now, TARGET - one));
408 assert!(cdq.interval_end.is_none());
409
410 let now = start + TARGET;
412 assert!(!cdq.process_standing_delay(&now, TARGET));
413 assert!(cdq.interval_end.is_some());
414 assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);
415
416 let now = start + TARGET + INTERVAL;
418 assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL));
419 assert!(cdq.interval_end.is_some());
420 assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);
421
422 let now = start + TARGET + INTERVAL * 2u32;
423 assert!(cdq.process_standing_delay(&now, TARGET + INTERVAL * 2u32));
424 assert!(cdq.interval_end.is_some());
425 assert_eq!(cdq.interval_end.unwrap(), start + TARGET + INTERVAL);
426
427 let now = start + TARGET + INTERVAL * 2u32;
429 assert!(!cdq.process_standing_delay(&now, one));
430 assert!(cdq.interval_end.is_none());
431 }
432
433 #[test]
434 #[cfg_attr(miri, ignore)]
435 fn mode() {
436 let one = SimulationTime::try_from_millis(1).unwrap();
437
438 let start = mock_time_millis(1000);
439
440 let mut cdq = CoDelQueue::new();
441 const N: usize = 6;
442 for _ in 0..N {
443 cdq.push(PacketRc::new_ipv4_udp_mock(), start);
444 }
445 assert!(cdq.total_bytes_stored > c::CONFIG_MTU.try_into().unwrap());
446 assert_eq!(cdq.len(), N);
447 assert_eq!(cdq.mode, CoDelMode::Store);
448
449 cdq.pop(start + TARGET - one);
454 assert_eq!(cdq.len(), N - 1);
455 assert_eq!(cdq.mode, CoDelMode::Store);
456
457 cdq.pop(start + TARGET);
459 assert_eq!(cdq.len(), N - 2);
460 assert_eq!(cdq.mode, CoDelMode::Store);
461
462 cdq.pop(start + TARGET + INTERVAL - one);
464 assert_eq!(cdq.len(), N - 3);
465 assert_eq!(cdq.mode, CoDelMode::Store);
466
467 cdq.pop(start + TARGET + INTERVAL);
469 assert_eq!(cdq.len(), N - 5);
470 assert_eq!(cdq.mode, CoDelMode::Drop);
471
472 for _ in 0..3 {
475 cdq.push(
477 PacketRc::new_ipv4_udp_mock(),
478 start + TARGET + INTERVAL * 2u32 - one,
479 );
480 }
481 cdq.pop(start + TARGET + INTERVAL * 2u32);
482 assert_eq!(cdq.mode, CoDelMode::Store);
483 }
484
485 #[test]
486 fn drop_empty() {
487 let start = mock_time_millis(1000);
488 let mut cdq = CoDelQueue::new();
489 cdq.mode = CoDelMode::Drop;
490 cdq.pop(start);
491 assert_eq!(cdq.mode, CoDelMode::Store);
492 }
493
494 #[test]
495 #[cfg_attr(miri, ignore)]
496 fn drop_many() {
497 let start = mock_time_millis(1000);
498 let end = mock_time_millis(1000000);
499
500 let mut cdq = CoDelQueue::new();
501 const N: usize = 20;
502 for _ in 0..N {
503 cdq.push(PacketRc::new_ipv4_udp_mock(), start);
504 }
505 assert_eq!(cdq.len(), N);
506
507 assert_eq!(cdq.mode, CoDelMode::Store);
509
510 cdq.pop(start + TARGET);
512 assert_eq!(cdq.len(), N - 1);
513 assert_eq!(cdq.current_drop_count, 0);
514 assert_eq!(cdq.previous_drop_count, 0);
515 assert!(!cdq.was_dropping_recently(&(start + TARGET)));
516 assert_eq!(cdq.mode, CoDelMode::Store);
517
518 cdq.pop(start + TARGET + INTERVAL);
520 assert_eq!(cdq.len(), N - 3);
521 assert_eq!(cdq.current_drop_count, 1);
522 assert_eq!(cdq.previous_drop_count, 1);
523 assert!(cdq.drop_next.is_some());
524 assert!(cdq.was_dropping_recently(&(start + TARGET + INTERVAL)));
525 assert_eq!(cdq.mode, CoDelMode::Drop);
526
527 assert!(cdq.should_drop(&end));
530 cdq.pop(end);
531 assert_eq!(cdq.len(), 1);
532 assert_eq!(cdq.current_drop_count, N - 4);
533 assert_eq!(cdq.mode, CoDelMode::Store);
534 }
535}