1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::sync::Weak;

use atomic_refcell::AtomicRefCell;
use shadow_shim_helper_rs::simulation_time::SimulationTime;

use crate::core::work::task::TaskRef;
use crate::core::worker::Worker;
use crate::cshadow as c;
use crate::host::host::Host;
use crate::network::packet::PacketStatus;
use crate::network::relay::token_bucket::TokenBucket;
use crate::network::PacketRc;
use crate::utility::ObjectCounter;

mod token_bucket;

/// A `Relay` forwards `PacketRc`s between `PacketDevice`s, optionally enforcing a
/// bandwidth limit on the rate at which we forward `PacketRc`s between devices.
///
/// The `Relay` is considered the "active" part of the `PacketRc` forwarding
/// process: it initiates `PacketRc` forwarding and internally schedules tasks to
/// ensure that `PacketRc`s are continually forwarded over time without exceeding
/// the configured `RateLimit`.
///
/// An `Ipv4Addr` associated with a source `PacketDevice` object is supplied
/// when creating a `Relay`. This `Ipv4Addr` is only meaningful to the extent
/// that the `Host` understands how to map this `Ipv4Addr` to the intended
/// `PacketDevice` when `Host::get_packet_device(Ipv4Addr)` is called. This
/// source `PacketDevice` supplies the `Relay` with a stream of `PacketRc`s
/// (through its implementation of `PacketDevice::pop()`) that the `Relay` will
/// forward to a destination.
///
/// `Relay::notify()` must be called whenever the source `PacketDevice` changes
/// state from empty to non-empty, to trigger an idle `Relay` to start
/// forwarding `PacketRc`s again.
///
/// For each `PacketRc` that needs to be forwarded, the `Relay` uses the
/// `PacketRc`'s destination `Ipv4Addr` to obtain the destination `PacketDevice`
/// from the `Host` by calling its `Host::get_packet_device(Ipv4Addr)` function.
/// The `PacketRc` is forwarded to the destination through the destination
/// `PacketDevice`'s implementation of `PacketDevice::push()`.
///
/// This design allows the `Host` to use `Host::get_packet_device` to define its
/// own routing table.
///
/// Note that `PacketRc`s forwarded between identical source and destination
/// `PacketDevices` are considered "local" to that device and exempt from any
/// configured `RateLimit`.
pub struct Relay {
    /// Allow for internal mutability. It is assumed that this will never be
    /// mutably borrowed outside of `Relay::forward_until_blocked()`.
    internal: AtomicRefCell<RelayInternal>,
}

struct RelayInternal {
    // Object counter registered under the name "Relay" (see `Relay::new()`).
    _counter: ObjectCounter,
    // Enforces the configured rate limit; `None` means `RateLimit::Unlimited`.
    rate_limiter: Option<TokenBucket>,
    // Address used to look up the source `PacketDevice` via
    // `Host::get_packet_device()` when forwarding.
    src_dev_address: Ipv4Addr,
    // Where we are in the forwarding lifecycle (Idle/Pending/Forwarding).
    state: RelayState,
    // A packet already popped from the source but not yet forwarded because
    // we ran out of tokens; it is forwarded first on the next round.
    next_packet: Option<PacketRc>,
}

/// Tracks the `Relay`'s state, which typically moves from Idle to Pending to
/// Forwarding, and then back to either Idle or Pending.
#[derive(PartialEq, Copy, Clone, Debug)]
enum RelayState {
    /// Relay is idle (is not currently forwarding packets) and has not
    /// scheduled a forwarding event.
    Idle,
    /// A forwarding event has been scheduled, and we are waiting for it to be
    /// executed before we start forwarding packets.
    Pending,
    /// We are currently running our packet forwarding loop.
    Forwarding,
}

/// Specifies a throughput limit the relay should enforce when forwarding packets.
pub enum RateLimit {
    /// Limit forwarding throughput to the given number of bytes per second.
    BytesPerSecond(u64),
    /// Do not enforce any throughput limit.
    Unlimited,
}

impl Relay {
    /// Creates a new `Relay` that will forward `PacketRc`s following the given
    /// `RateLimit` from the `PacketDevice` returned by the `Host` when passing
    /// the given `src_dev_address` to `Host::get_packet_device()`. The `Relay`
    /// internally schedules tasks as needed to ensure packets continue to be
    /// forwarded over time without exceeding the configured `RateLimit`.
    pub fn new(rate: RateLimit, src_dev_address: Ipv4Addr) -> Self {
        // `Unlimited` is represented internally as the absence of a bucket.
        let rate_limiter = match rate {
            RateLimit::BytesPerSecond(bytes) => Some(create_token_bucket(bytes)),
            RateLimit::Unlimited => None,
        };

        Self {
            internal: AtomicRefCell::new(RelayInternal {
                _counter: ObjectCounter::new("Relay"),
                rate_limiter,
                src_dev_address,
                state: RelayState::Idle,
                next_packet: None,
            }),
        }
    }

    /// Notify the relay that its packet source now has packets available for
    /// relaying to the packet sink. This must be called when the source changes
    /// state from empty to non-empty to signal the relay to resume forwarding.
    pub fn notify(self: &Arc<Self>, host: &Host) {
        // The only time we hold a mutable borrow of our internals while
        // executing outside of this module is when we're running our forwarding
        // loop, and forwarding packets can certainly cause a call to
        // Relay::notify(). Thus, it's safe to assume that we are in the
        // Forwarding state if the borrow fails.
        let state = match self.internal.try_borrow() {
            Ok(internal) => internal.state,
            Err(_) => RelayState::Forwarding,
        };

        // NOTE(review): the purpose of this `allow(dead_code)` on a match
        // statement is unclear; confirm it is still needed.
        #[allow(dead_code)]
        match state {
            RelayState::Idle => {
                // Allow packets to accumulate and unwind the stack to forward
                // them.
                self.forward_later(SimulationTime::ZERO, host);
            }
            RelayState::Pending => {
                log::trace!("Relay forward task already scheduled; skipping forward request.");
            }
            RelayState::Forwarding => {
                log::trace!("Relay forward task currently running; skipping forward request.");
            }
        }
    }

    /// Schedule an event to trigger us to run the forwarding loop later, and
    /// changes our state to `RelayState::Pending`. This allows us to run the
    /// forwarding loop after unwinding the current stack, and allows socket
    /// data to accumulate so we can forward multiple packets at once.
    ///
    /// Must not be called if our state is already `RelayState::Pending`, to
    /// avoid scheduling multiple forwarding events simultaneously.
    fn forward_later(self: &Arc<Self>, delay: SimulationTime, host: &Host) {
        // We should not already be waiting for a scheduled forwarding task.
        {
            let mut internal = self.internal.borrow_mut();
            assert_ne!(internal.state, RelayState::Pending);
            internal.state = RelayState::Pending;
        }

        // Schedule a forwarding task using a weak reference to allow the relay
        // to be dropped before the forwarding task is executed.
        let weak_self = Arc::downgrade(self);
        let task = TaskRef::new(move |host| Self::run_forward_task(&weak_self, host));
        host.schedule_task_with_delay(task, delay);
        log::trace!(
            "Relay src={} scheduled event to start forwarding packets after {:?}",
            self.internal.borrow().src_dev_address,
            delay
        );
    }

    /// The initial entry point for the forwarding event executed by the scheduler.
    fn run_forward_task(weak_self: &Weak<Self>, host: &Host) {
        // Ignore the task if the relay was dropped while the task was pending.
        let Some(strong_self) = Weak::upgrade(weak_self) else {
            log::trace!("Relay no longer exists; skipping forward task.");
            return;
        };

        // Relay still exists, and task is no longer pending.
        strong_self.internal.borrow_mut().state = RelayState::Idle;

        // Run the main packet forwarding loop.
        strong_self.forward_now(host);
    }

    /// Runs the forward loop, and then schedules a task to run it again if needed.
    fn forward_now(self: &Arc<Self>, host: &Host) {
        // `forward_until_blocked()` returns `Some(dur)` only when it ran out
        // of tokens; `dur` is how long until enough tokens are available.
        if let Some(blocking_dur) = self.forward_until_blocked(host) {
            // Block until we have enough tokens to forward the next packet.
            // Our state will be changed to `RelayState::Pending`.
            self.forward_later(blocking_dur, host);
        }
    }

    /// Run our main packet forwarding loop that continues forwarding packets
    /// from the source device to the destination device until we run out of
    /// either tokens or packets.
    ///
    /// Causes our state to change to `RelayState::Forwarding` during execution
    /// of the loop, and then either `RelayState::Idle` if we run out of
    /// packets, or `RelayState::Pending` if we run out of tokens before all
    /// available packets are forwarded and we scheduled an event to resume
    /// forwarding later.
    ///
    /// The duration until we have enough tokens to forward the next packet is
    /// returned in case we run out of tokens in the forwarding loop.
    fn forward_until_blocked(self: &Arc<Self>, host: &Host) -> Option<SimulationTime> {
        // We don't enforce rate limits during bootstrapping.
        let is_bootstrapping = Worker::is_bootstrapping();

        // Get a mutable reference to internals, which we'll continuously hold
        // for the rest of this function (for the entire time that we remain in
        // the Forwarding state). Re-entrant calls to `Relay::notify()` rely on
        // this borrow failing to detect the Forwarding state.
        let mut internal = self.internal.borrow_mut();
        internal.state = RelayState::Forwarding;

        // The source device supplies us with the stream of packets to forward.
        let src = host.get_packet_device(internal.src_dev_address);

        // Continue forwarding until we run out of either packets or tokens.
        loop {
            // Get next packet from our local cache, or from the source device.
            let Some(mut packet) = internal.next_packet.take().or_else(|| src.pop()) else {
                // Ran out of packets to forward.
                internal.state = RelayState::Idle;
                return None;
            };

            // The packet is local if the src and dst refer to the same device.
            // This can happen for the loopback device, and for the inet device
            // if both sockets use the public ip to communicate over localhost.
            let is_local = src.get_address() == *packet.dst_address().ip();

            // Check if we have enough tokens to forward the packet. Rate
            // limits do not apply during bootstrapping, or if the source and
            // destination are the same device.
            if !is_bootstrapping && !is_local {
                // Rate limit applies only if we have a token bucket.
                if let Some(tb) = internal.rate_limiter.as_mut() {
                    // Try to remove tokens for this packet.
                    // NOTE(review): "comforming_remove" is presumably a typo
                    // of "conforming_remove" in the TokenBucket API; renaming
                    // it would need a matching change in the token_bucket
                    // module.
                    if let Err(blocking_dur) = tb.comforming_remove(packet.total_size() as u64) {
                        // Too few tokens, need to block.
                        // NOTE(review): the token-deficit computed below
                        // assumes a zero-token remove always succeeds and
                        // returns the current token balance — confirm against
                        // the TokenBucket implementation.
                        log::trace!(
                            "Relay src={} dst={} exceeded rate limit, need {} more tokens \
                            for packet of size {}, blocking for {:?}",
                            src.get_address(),
                            packet.dst_address().ip(),
                            packet
                                .total_size()
                                .saturating_sub(tb.comforming_remove(0).unwrap() as usize),
                            packet.total_size(),
                            blocking_dur
                        );

                        // Cache the packet until we can forward it later.
                        packet.add_status(PacketStatus::RelayCached);
                        assert!(internal.next_packet.is_none());
                        internal.next_packet = Some(packet);
                        internal.state = RelayState::Idle;

                        // Call Relay::forward_later() after dropping the mutable borrow.
                        return Some(blocking_dur);
                    }
                }
            }

            // Forward the packet to the destination device now.
            packet.add_status(PacketStatus::RelayForwarded);
            if is_local {
                // The source and destination are the same. Avoid a double
                // mutable borrow of the packet device.
                src.push(packet);
            } else {
                // The source and destination are different.
                let dst = host.get_packet_device(*packet.dst_address().ip());
                dst.push(packet);
            }
        }
    }
}

/// Builds a token bucket that enforces the given bytes-per-second rate limit.
/// The refill rate is clamped so that we always add at least 1 byte (token)
/// per millisecond.
fn create_token_bucket(bytes_per_second: u64) -> TokenBucket {
    // Refill once per millisecond with a proportional share of the rate.
    let refill_interval = SimulationTime::from_millis(1);
    let refill_size = (bytes_per_second / 1000).max(1);

    // The burst allowance only widens the bucket's `capacity`; it does not
    // change `refill_size`, so the long-term rate enforced by the bucket
    // (configured by `refill_size`) is essentially unaffected.
    let capacity = refill_size + get_burst_allowance();

    TokenBucket::new(capacity, refill_size, refill_interval).unwrap()
}

/// Returns the "burst allowance" we use in our token buckets.
///
/// The burst allowance ensures that we don't waste tokens simply because we
/// don't fragment packets. If the bucket's capacity were exactly the refill
/// size (i.e., without the `CONFIG_MTU` allowance) and only 1499 tokens were
/// left in a sending round, a full packet would not fit. On the next refill,
/// `refill_size` tokens get added and those 1499 tokens would spill over the
/// top of the bucket; the spilled tokens represent wasted bandwidth, and the
/// loss could repeat every refill interval, leading to a significantly lower
/// achievable bandwidth.
///
/// A downside of the `CONFIG_MTU` allowance is that sending could become
/// "bursty", e.g.:
/// - interval 1: send `refill_size` + `CONFIG_MTU` bytes, sending over the
///      allowance by 1500 bytes
/// - refill: `refill_size` tokens get added to the bucket
/// - interval 2: send `refill_size` - `CONFIG_MTU` bytes, sending under the
///       allowance by 1500 bytes
/// - refill: `refill_size` tokens get added to the bucket
/// - interval 3: send `refill_size` + `CONFIG_MTU` bytes, sending over the
///      allowance by 1500 bytes
/// - repeat
///
/// So sending could become less smooth even though the long-term average is
/// maintained. But this likely doesn't happen much in practice, and we are
/// batching sends for performance reasons anyway.
fn get_burst_allowance() -> u64 {
    u64::from(c::CONFIG_MTU)
}