shadow_rs/network/relay/mod.rs

use std::net::Ipv4Addr;
use std::sync::Arc;
use std::sync::Weak;

use atomic_refcell::AtomicRefCell;
use shadow_shim_helper_rs::simulation_time::SimulationTime;

use crate::core::work::task::TaskRef;
use crate::core::worker::Worker;
use crate::cshadow as c;
use crate::host::host::Host;
use crate::network::PacketRc;
use crate::network::packet::PacketStatus;
use crate::network::relay::token_bucket::TokenBucket;
use crate::utility::ObjectCounter;

mod token_bucket;

/// A `Relay` forwards `PacketRc`s between `PacketDevice`s, optionally enforcing a
/// bandwidth limit on the rate at which we forward `PacketRc`s between devices.
///
/// The `Relay` is considered the "active" part of the `PacketRc` forwarding
/// process: it initiates `PacketRc` forwarding and internally schedules tasks to
/// ensure that `PacketRc`s are continually forwarded over time without exceeding
/// the configured `RateLimit`.
///
/// An `Ipv4Addr` associated with a source `PacketDevice` object is supplied
/// when creating a `Relay`. This `Ipv4Addr` is only meaningful to the extent
/// that the `Host` understands how to map this `Ipv4Addr` to the intended
/// `PacketDevice` when `Host::get_packet_device(Ipv4Addr)` is called. This
/// source `PacketDevice` supplies the `Relay` with a stream of `PacketRc`s
/// (through its implementation of `PacketDevice::pop()`) that the `Relay` will
/// forward to a destination.
///
/// `Relay::notify()` must be called whenever the source `PacketDevice` changes
/// state from empty to non-empty, to trigger an idle `Relay` to start
/// forwarding `PacketRc`s again.
///
/// For each `PacketRc` that needs to be forwarded, the `Relay` uses the
/// `PacketRc`'s destination `Ipv4Addr` to obtain the destination `PacketDevice`
/// from the `Host` by calling its `Host::get_packet_device(Ipv4Addr)` function.
/// The `PacketRc` is forwarded to the destination through the destination
/// `PacketDevice`'s implementation of `PacketDevice::push()`.
///
/// This design allows the `Host` to use `Host::get_packet_device` to define its
/// own routing table.
///
/// Note that `PacketRc`s forwarded between identical source and destination
/// `PacketDevice`s are considered "local" to that device and exempt from any
/// configured `RateLimit`.
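///
/// # Example
///
/// A minimal usage sketch (not a compiled doctest): it assumes that a `host:
/// Host` is available from the surrounding simulation and that a source
/// `PacketDevice` is registered at the illustrative address `10.0.0.1`.
///
/// ```ignore
/// // Create a relay that forwards packets popped from the device at
/// // 10.0.0.1, limited to roughly 1 MB per second.
/// let relay = Arc::new(Relay::new(
///     RateLimit::BytesPerSecond(1_000_000),
///     Ipv4Addr::new(10, 0, 0, 1),
/// ));
///
/// // Whenever the source device transitions from empty to non-empty,
/// // notify the relay so it resumes forwarding.
/// relay.notify(&host);
/// ```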
pub struct Relay {
    /// Allow for internal mutability. It is assumed that this will never be
    /// mutably borrowed outside of `Relay::forward_until_blocked()`.
    internal: AtomicRefCell<RelayInternal>,
}

struct RelayInternal {
    _counter: ObjectCounter,
    rate_limiter: Option<TokenBucket>,
    src_dev_address: Ipv4Addr,
    state: RelayState,
    next_packet: Option<PacketRc>,
}

/// Tracks the `Relay`'s state, which typically moves from Idle to Pending to
/// Forwarding, and then back to either Idle or Pending.
#[derive(PartialEq, Copy, Clone, Debug)]
enum RelayState {
    /// Relay is idle (is not currently forwarding packets) and has not
    /// scheduled a forwarding event.
    Idle,
    /// A forwarding event has been scheduled, and we are waiting for it to be
    /// executed before we start forwarding packets.
    Pending,
    /// We are currently running our packet forwarding loop.
    Forwarding,
}

/// Specifies a throughput limit the relay should enforce when forwarding packets.
pub enum RateLimit {
    BytesPerSecond(u64),
    Unlimited,
}

impl Relay {
    /// Creates a new `Relay` that forwards `PacketRc`s from the `PacketDevice`
    /// returned by the `Host` when the given `src_dev_address` is passed to
    /// `Host::get_packet_device()`, subject to the given `RateLimit`. The
    /// `Relay` internally schedules tasks as needed to ensure packets continue
    /// to be forwarded over time without exceeding the configured `RateLimit`.
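    ///
    /// A small construction sketch (illustrative only; the address below is a
    /// placeholder, not a value required by this API):
    ///
    /// ```ignore
    /// // Rate-limited: forward at most 125_000 bytes per second.
    /// let limited = Relay::new(
    ///     RateLimit::BytesPerSecond(125_000),
    ///     Ipv4Addr::new(10, 0, 0, 1),
    /// );
    ///
    /// // Unlimited: forward packets as fast as they become available.
    /// let unlimited = Relay::new(RateLimit::Unlimited, Ipv4Addr::LOCALHOST);
    /// ```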
    pub fn new(rate: RateLimit, src_dev_address: Ipv4Addr) -> Self {
        let rate_limiter = match rate {
            RateLimit::BytesPerSecond(bytes) => Some(create_token_bucket(bytes)),
            RateLimit::Unlimited => None,
        };

        Self {
            internal: AtomicRefCell::new(RelayInternal {
                _counter: ObjectCounter::new("Relay"),
                rate_limiter,
                src_dev_address,
                state: RelayState::Idle,
                next_packet: None,
            }),
        }
    }

    /// Notify the relay that its packet source now has packets available for
    /// relaying to the packet sink. This must be called when the source changes
    /// state from empty to non-empty to signal the relay to resume forwarding.
    pub fn notify(self: &Arc<Self>, host: &Host) {
        // The only time we hold a mutable borrow of our internals while
        // executing outside of this module is when we're running our forwarding
        // loop, and forwarding packets can certainly cause a call to
        // Relay::notify(). Thus, it's safe to assume that we are in the
        // Forwarding state if the borrow fails.
        let state = match self.internal.try_borrow() {
            Ok(internal) => internal.state,
            Err(_) => RelayState::Forwarding,
        };

        #[allow(dead_code)]
        match state {
            RelayState::Idle => {
                // Allow packets to accumulate and unwind the stack to forward
                // them.
                self.forward_later(SimulationTime::ZERO, host);
            }
            RelayState::Pending => {
                log::trace!("Relay forward task already scheduled; skipping forward request.");
            }
            RelayState::Forwarding => {
                log::trace!("Relay forward task currently running; skipping forward request.");
            }
        }
    }

    /// Schedules an event to trigger us to run the forwarding loop later, and
    /// changes our state to `RelayState::Pending`. This allows us to run the
    /// forwarding loop after unwinding the current stack, and allows socket
    /// data to accumulate so we can forward multiple packets at once.
    ///
    /// Must not be called if our state is already `RelayState::Pending`, to
    /// avoid scheduling multiple forwarding events simultaneously.
    fn forward_later(self: &Arc<Self>, delay: SimulationTime, host: &Host) {
        // We should not already be waiting for a scheduled forwarding task.
        {
            let mut internal = self.internal.borrow_mut();
            assert_ne!(internal.state, RelayState::Pending);
            internal.state = RelayState::Pending;
        }

        // Schedule a forwarding task using a weak reference to allow the relay
        // to be dropped before the forwarding task is executed.
        let weak_self = Arc::downgrade(self);
        let task = TaskRef::new(move |host| Self::run_forward_task(&weak_self, host));
        host.schedule_task_with_delay(task, delay);
        log::trace!(
            "Relay src={} scheduled event to start forwarding packets after {:?}",
            self.internal.borrow().src_dev_address,
            delay
        );
    }

    /// The initial entry point for the forwarding event executed by the scheduler.
    fn run_forward_task(weak_self: &Weak<Self>, host: &Host) {
        // Ignore the task if the relay was dropped while the task was pending.
        let Some(strong_self) = Weak::upgrade(weak_self) else {
            log::trace!("Relay no longer exists; skipping forward task.");
            return;
        };

        // Relay still exists, and task is no longer pending.
        strong_self.internal.borrow_mut().state = RelayState::Idle;

        // Run the main packet forwarding loop.
        strong_self.forward_now(host);
    }

    /// Runs the forward loop, and then schedules a task to run it again if needed.
    fn forward_now(self: &Arc<Self>, host: &Host) {
        if let Some(blocking_dur) = self.forward_until_blocked(host) {
            // Block until we have enough tokens to forward the next packet.
            // Our state will be changed to `RelayState::Pending`.
            self.forward_later(blocking_dur, host);
        }
    }

    /// Run our main packet forwarding loop that continues forwarding packets
    /// from the source device to the destination device until we run out of
    /// either tokens or packets.
    ///
    /// Causes our state to change to `RelayState::Forwarding` during execution
    /// of the loop, and then to either `RelayState::Idle` if we run out of
    /// packets, or `RelayState::Pending` if we run out of tokens before all
    /// available packets are forwarded, in which case an event is scheduled to
    /// resume forwarding later.
    ///
    /// If we run out of tokens in the forwarding loop, returns the duration
    /// until we will have enough tokens to forward the next packet.
    fn forward_until_blocked(self: &Arc<Self>, host: &Host) -> Option<SimulationTime> {
        // We don't enforce rate limits during bootstrapping.
        let is_bootstrapping = Worker::is_bootstrapping();

        // Get a mutable reference to internals, which we'll continuously hold
        // for the rest of this function (for the entire time that we remain in
        // the Forwarding state).
        let mut internal = self.internal.borrow_mut();
        internal.state = RelayState::Forwarding;

        // The source device supplies us with the stream of packets to forward.
        let src = host.get_packet_device(internal.src_dev_address);

        // Continue forwarding until we run out of either packets or tokens.
        loop {
            // Get next packet from our local cache, or from the source device.
            let Some(packet) = internal.next_packet.take().or_else(|| src.pop()) else {
                // Ran out of packets to forward.
                internal.state = RelayState::Idle;
                return None;
            };

            // The packet is local if the src and dst refer to the same device.
            // This can happen for the loopback device, and for the inet device
            // if both sockets use the public ip to communicate over localhost.
            let is_local = src.get_address() == *packet.dst_ipv4_address().ip();

            // Check if we have enough tokens to forward the packet. Rate
            // limits do not apply during bootstrapping, or if the source and
            // destination are the same device.
            if !is_bootstrapping && !is_local {
                // Rate limit applies only if we have a token bucket.
                if let Some(tb) = internal.rate_limiter.as_mut() {
                    // Try to remove tokens for this packet.
                    if let Err(blocking_dur) = tb.comforming_remove(packet.len() as u64) {
                        // Too few tokens, need to block.
                        log::trace!(
                            "Relay src={} dst={} exceeded rate limit, need {} more tokens \
                            for packet of size {}, blocking for {:?}",
                            src.get_address(),
                            packet.dst_ipv4_address().ip(),
                            packet
                                .len()
                                .saturating_sub(tb.comforming_remove(0).unwrap() as usize),
                            packet.len(),
                            blocking_dur
                        );

                        // Cache the packet until we can forward it later.
                        packet.add_status(PacketStatus::RelayCached);
                        assert!(internal.next_packet.is_none());
                        internal.next_packet = Some(packet);
                        internal.state = RelayState::Idle;

                        // Call Relay::forward_later() after dropping the mutable borrow.
                        return Some(blocking_dur);
                    }
                }
            }

            // Forward the packet to the destination device now.
            packet.add_status(PacketStatus::RelayForwarded);
            if is_local {
                // The source and destination are the same. Avoid a double
                // mutable borrow of the packet device.
                src.push(packet);
            } else {
                // The source and destination are different.
                let dst = host.get_packet_device(*packet.dst_ipv4_address().ip());
                dst.push(packet);
            }
        }
    }
}

/// Configures a token bucket according to the given bytes_per_second rate
/// limit. We always refill at least 1 byte per millisecond.
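///
/// For example (a worked instance of the formulas below, not additional
/// behavior): a limit of 1_000_000 bytes per second yields a `refill_size` of
/// 1_000_000 / 1000 = 1000 tokens added every millisecond, and a bucket
/// `capacity` of 1000 + `CONFIG_MTU` tokens.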
fn create_token_bucket(bytes_per_second: u64) -> TokenBucket {
    let refill_interval = SimulationTime::from_millis(1);
    let refill_size = std::cmp::max(1, bytes_per_second / 1000);

    // Only the `capacity` of the bucket is increased by the burst allowance,
    // not the `refill_size`. Therefore, the long term rate limit enforced by
    // the token bucket (configured by `refill_size`) is not affected much.
    let capacity = refill_size + get_burst_allowance();

    TokenBucket::new(capacity, refill_size, refill_interval).unwrap()
}

/// Returns the "burst allowance" we use in our token buckets.
///
/// The burst allowance ensures that we don't lose tokens that go unused
/// because we don't fragment packets. If we set the capacity of the bucket to
/// exactly the refill size (i.e., without the `CONFIG_MTU` burst allowance)
/// and there are only 1499 tokens left in this sending round, a full packet
/// would not fit. The next time the bucket refills, it adds `refill_size`
/// tokens, but in doing so 1499 tokens would fall over the top of the bucket;
/// these tokens would represent wasted bandwidth, and the waste could
/// potentially accumulate over refill intervals, leading to a significantly
/// lower achievable bandwidth.
///
/// A downside of the `CONFIG_MTU` burst allowance is that the sending rate
/// could possibly become "bursty" with a behavior such as:
/// - interval 1: send `refill_size` + `CONFIG_MTU` bytes, sending over the
///   allowance by 1500 bytes
/// - refill: `refill_size` tokens get added to the bucket
/// - interval 2: send `refill_size` - `CONFIG_MTU` bytes, sending under the
///   allowance by 1500 bytes
/// - refill: `refill_size` tokens get added to the bucket
/// - interval 3: send `refill_size` + `CONFIG_MTU` bytes, sending over the
///   allowance by 1500 bytes
/// - repeat
///
/// So the rate could become less smooth and more "bursty" even though the long
/// term average is maintained. But I don't think this would happen much in
/// practice, and we are batching sends for performance reasons.
fn get_burst_allowance() -> u64 {
    c::CONFIG_MTU.into()
}