// shadow_rs/network/relay/mod.rs
1use std::net::Ipv4Addr;
2use std::sync::Arc;
3use std::sync::Weak;
4
5use atomic_refcell::AtomicRefCell;
6use shadow_shim_helper_rs::simulation_time::SimulationTime;
7
8use crate::core::work::task::TaskRef;
9use crate::core::worker::Worker;
10use crate::cshadow as c;
11use crate::host::host::Host;
12use crate::network::PacketRc;
13use crate::network::packet::PacketStatus;
14use crate::network::relay::token_bucket::TokenBucket;
15use crate::utility::ObjectCounter;
16
17mod token_bucket;
18
19/// A `Relay` forwards `PacketRc`s between `PacketDevice`s, optionally enforcing a
20/// bandwidth limit on the rate at which we forward `PacketRc`s between devices.
21///
22/// The `Relay` is considered the "active" part of the `PacketRc` forwarding
23/// process: it initiates `PacketRc` forwarding and internally schedules tasks to
24/// ensure that `PacketRc`s are continually forwarded over time without exceeding
25/// the configured `RateLimit`.
26///
27/// An `Ipv4Addr` associated with a source `PacketDevice` object is supplied
28/// when creating a `Relay`. This `Ipv4Addr` is only meaningful to the extent
29/// that the `Host` understands how to map this `Ipv4Addr` to the intended
30/// `PacketDevice` when `Host::get_packet_device(Ipv4Addr)` is called. This
31/// source `PacketDevice` supplies the `Relay` with a stream of `PacketRc`s
32/// (through its implementation of `PacketDevice::pop()`) that the `Relay` will
33/// forward to a destination.
34///
35/// `Relay::notify()` must be called whenever the source `PacketDevice` changes
36/// state from empty to non-empty, to trigger an idle `Relay` to start
37/// forwarding `PacketRc`s again.
38///
39/// For each `PacketRc` that needs to be forwarded, the `Relay` uses the
40/// `PacketRc`'s destination `Ipv4Addr` to obtain the destination `PacketDevice`
41/// from the `Host` by calling its `Host::get_packet_device(Ipv4Addr)` function.
42/// The `PacketRc` is forwarded to the destination through the destination
43/// `PacketDevice`'s implementation of `PacketDevice::push()`.
44///
45/// This design allows the `Host` to use `Host::get_packet_device` to define its
46/// own routing table.
47///
48/// Note that `PacketRc`s forwarded between identical source and destination
49/// `PacketDevices` are considered "local" to that device and exempt from any
50/// configured `RateLimit`.
pub struct Relay {
    /// Allow for internal mutability. It is assumed that this will never be
    /// mutably borrowed outside of `Relay::forward_until_blocked()`.
    internal: AtomicRefCell<RelayInternal>,
}
56
struct RelayInternal {
    /// Tracks the number of live `Relay` objects (registered under the name
    /// "Relay" in `Relay::new()`).
    _counter: ObjectCounter,
    /// Enforces the configured `RateLimit`; `None` means unlimited.
    rate_limiter: Option<TokenBucket>,
    /// Address used to look up the source `PacketDevice` via
    /// `Host::get_packet_device()`.
    src_dev_address: Ipv4Addr,
    /// Where we are in the Idle -> Pending -> Forwarding cycle.
    state: RelayState,
    /// A packet already popped from the source but not yet forwarded because
    /// we ran out of tokens; it is retried first when forwarding resumes.
    next_packet: Option<PacketRc>,
}
64
/// Tracks the `Relay`'s state, which typically moves from Idle to Pending to
/// Forwarding, and then back to either Idle or Pending.
#[derive(PartialEq, Copy, Clone, Debug)]
enum RelayState {
    /// Relay is idle (is not currently forwarding packets) and has not
    /// scheduled a forwarding event.
    Idle,
    /// A forwarding event has been scheduled, and we are waiting for it to be
    /// executed before we start forwarding packets.
    Pending,
    /// We are currently running our packet forwarding loop.
    Forwarding,
}
78
/// Specifies a throughput limit the relay should enforce when forwarding packets.
pub enum RateLimit {
    /// Forward at most this many bytes per second (enforced via a token bucket).
    BytesPerSecond(u64),
    /// No rate limiting: forward packets as fast as they become available.
    Unlimited,
}
84
impl Relay {
    /// Creates a new `Relay` that will forward `PacketRc`s following the given
    /// `RateLimit` from the `PacketDevice` returned by the `Host` when passing
    /// the given `src_dev_address` to `Host::get_packet_device()`. The `Relay`
    /// internally schedules tasks as needed to ensure packets continue to be
    /// forwarded over time without exceeding the configured `RateLimit`.
    pub fn new(rate: RateLimit, src_dev_address: Ipv4Addr) -> Self {
        // An unlimited relay needs no token bucket at all.
        let rate_limiter = match rate {
            RateLimit::BytesPerSecond(bytes) => Some(create_token_bucket(bytes)),
            RateLimit::Unlimited => None,
        };

        Self {
            internal: AtomicRefCell::new(RelayInternal {
                _counter: ObjectCounter::new("Relay"),
                rate_limiter,
                src_dev_address,
                // Start idle; the first call to `notify()` kicks off forwarding.
                state: RelayState::Idle,
                next_packet: None,
            }),
        }
    }

    /// Notify the relay that its packet source now has packets available for
    /// relaying to the packet sink. This must be called when the source changes
    /// state from empty to non-empty to signal the relay to resume forwarding.
    pub fn notify(self: &Arc<Self>, host: &Host) {
        // The only time we hold a mutable borrow of our internals while
        // executing outside of this module is when we're running our forwarding
        // loop, and forwarding packets can certainly cause a call to
        // Relay::notify(). Thus, it's safe to assume that we are in the
        // Forwarding state if the borrow fails.
        let state = match self.internal.try_borrow() {
            Ok(internal) => internal.state,
            Err(_) => RelayState::Forwarding,
        };

        // NOTE(review): `dead_code` on a match expression looks vestigial —
        // confirm this attribute is still needed.
        #[allow(dead_code)]
        match state {
            RelayState::Idle => {
                // Allow packets to accumulate and unwind the stack to forward
                // them.
                self.forward_later(SimulationTime::ZERO, host);
            }
            RelayState::Pending => {
                log::trace!("Relay forward task already scheduled; skipping forward request.");
            }
            RelayState::Forwarding => {
                log::trace!("Relay forward task currently running; skipping forward request.");
            }
        }
    }

    /// Schedule an event to trigger us to run the forwarding loop later, and
    /// changes our state to `RelayState::Pending`. This allows us to run the
    /// forwarding loop after unwinding the current stack, and allows socket
    /// data to accumulate so we can forward multiple packets at once.
    ///
    /// Must not be called if our state is already `RelayState::Pending`, to
    /// avoid scheduling multiple forwarding events simultaneously.
    fn forward_later(self: &Arc<Self>, delay: SimulationTime, host: &Host) {
        // We should not already be waiting for a scheduled forwarding task.
        // The mutable borrow is scoped so it is released before we log below.
        {
            let mut internal = self.internal.borrow_mut();
            assert_ne!(internal.state, RelayState::Pending);
            internal.state = RelayState::Pending;
        }

        // Schedule a forwarding task using a weak reference to allow the relay
        // to be dropped before the forwarding task is executed.
        let weak_self = Arc::downgrade(self);
        let task = TaskRef::new(move |host| Self::run_forward_task(&weak_self, host));
        host.schedule_task_with_delay(task, delay);
        log::trace!(
            "Relay src={} scheduled event to start forwarding packets after {:?}",
            self.internal.borrow().src_dev_address,
            delay
        );
    }

    /// The initial entry point for the forwarding event executed by the scheduler.
    fn run_forward_task(weak_self: &Weak<Self>, host: &Host) {
        // Ignore the task if the relay was dropped while the task was pending.
        let Some(strong_self) = Weak::upgrade(weak_self) else {
            log::trace!("Relay no longer exists; skipping forward task.");
            return;
        };

        // Relay still exists, and task is no longer pending.
        strong_self.internal.borrow_mut().state = RelayState::Idle;

        // Run the main packet forwarding loop.
        strong_self.forward_now(host);
    }

    /// Runs the forward loop, and then schedules a task to run it again if needed.
    fn forward_now(self: &Arc<Self>, host: &Host) {
        if let Some(blocking_dur) = self.forward_until_blocked(host) {
            // Block until we have enough tokens to forward the next packet.
            // Our state will be changed to `RelayState::Pending`.
            self.forward_later(blocking_dur, host);
        }
    }

    /// Run our main packet forwarding loop that continues forwarding packets
    /// from the source device to the destination device until we run out of
    /// either tokens or packets.
    ///
    /// Causes our state to change to `RelayState::Forwarding` during execution
    /// of the loop, and then either `RelayState::Idle` if we run out of
    /// packets, or `RelayState::Pending` if we run out of tokens before all
    /// available packets are forwarded and we scheduled an event to resume
    /// forwarding later.
    ///
    /// The duration until we have enough tokens to forward the next packet is
    /// returned in case we run out of tokens in the forwarding loop.
    fn forward_until_blocked(self: &Arc<Self>, host: &Host) -> Option<SimulationTime> {
        // We don't enforce rate limits during bootstrapping.
        let is_bootstrapping = Worker::is_bootstrapping();

        // Get a mutable reference to internals, which we'll continuously hold
        // for the rest of this function (for the entire time that we remain in
        // the Forwarding state). Any re-entrant `notify()` during this time
        // fails its `try_borrow()` and correctly infers the Forwarding state.
        let mut internal = self.internal.borrow_mut();
        internal.state = RelayState::Forwarding;

        // The source device supplies us with the stream of packets to forward.
        let src = host.get_packet_device(internal.src_dev_address);

        // Continue forwarding until we run out of either packets or tokens.
        loop {
            // Get next packet from our local cache, or from the source device.
            let Some(packet) = internal.next_packet.take().or_else(|| src.pop()) else {
                // Ran out of packets to forward.
                internal.state = RelayState::Idle;
                return None;
            };

            // The packet is local if the src and dst refer to the same device.
            // This can happen for the loopback device, and for the inet device
            // if both sockets use the public ip to communicate over localhost.
            let is_local = src.get_address() == *packet.dst_ipv4_address().ip();

            // Check if we have enough tokens to forward the packet. Rate
            // limits do not apply during bootstrapping, or if the source and
            // destination are the same device.
            if !is_bootstrapping && !is_local {
                // Rate limit applies only if we have a token bucket.
                if let Some(tb) = internal.rate_limiter.as_mut() {
                    // Try to remove tokens for this packet.
                    if let Err(blocking_dur) = tb.comforming_remove(packet.len() as u64) {
                        // Too few tokens, need to block.
                        // NOTE(review): `comforming_remove(0)` (sic) is assumed
                        // to succeed and report the current token balance, so
                        // the subtraction yields the token shortfall — confirm
                        // against the `TokenBucket` API.
                        log::trace!(
                            "Relay src={} dst={} exceeded rate limit, need {} more tokens \
                            for packet of size {}, blocking for {:?}",
                            src.get_address(),
                            packet.dst_ipv4_address().ip(),
                            packet
                                .len()
                                .saturating_sub(tb.comforming_remove(0).unwrap() as usize),
                            packet.len(),
                            blocking_dur
                        );

                        // Cache the packet until we can forward it later.
                        packet.add_status(PacketStatus::RelayCached);
                        assert!(internal.next_packet.is_none());
                        internal.next_packet = Some(packet);
                        internal.state = RelayState::Idle;

                        // Call Relay::forward_later() after dropping the mutable borrow.
                        return Some(blocking_dur);
                    }
                }
            }

            // Forward the packet to the destination device now.
            packet.add_status(PacketStatus::RelayForwarded);
            if is_local {
                // The source and destination are the same. Avoid a double
                // mutable borrow of the packet device.
                src.push(packet);
            } else {
                // The source and destination are different.
                let dst = host.get_packet_device(*packet.dst_ipv4_address().ip());
                dst.push(packet);
            }
        }
    }
}
275
276/// Configures a token bucket according the the given bytes_per_second rate
277/// limit. We always refill at least 1 byte per millisecond.
278fn create_token_bucket(bytes_per_second: u64) -> TokenBucket {
279 let refill_interval = SimulationTime::from_millis(1);
280 let refill_size = std::cmp::max(1, bytes_per_second / 1000);
281
282 // Only the `capacity` of the bucket is increased by the burst allowance,
283 // not the `refill_size`. Therefore, the long term rate limit enforced by
284 // the token bucket (configured by `refill_size`) is not affected much.
285 let capacity = refill_size + get_burst_allowance();
286
287 TokenBucket::new(capacity, refill_size, refill_interval).unwrap()
288}
289
290/// Returns the "burst allowance" we use in our token buckets.
291///
292/// What the burst allowance ensures is that we don't lose tokens that are
293/// unused because we don't fragment packets. If we set the capacity of the
294/// bucket to exactly the refill size (i.e., without the `CONFIG_MTU` burst
295/// allowance) and there are only 1499 tokens left in this sending round, a full
296/// packet would not fit. The next time the bucket refills, it adds
297/// `refill_size` tokens but in doing so 1499 tokens would fall over the top of
298/// the bucket; these tokens would represent wasted bandwidth, and could
299/// potentially accumulate in every refill interval leading to a significantly
300/// lower achievable bandwidth.
301///
302/// A downside of the `CONFIG_MTU` burst allowance is that the sending rate
303/// could possibly become "bursty" with a behavior such as:
304/// - interval 1: send `refill_size` + `CONFIG_MTU` bytes, sending over the
305/// allowance by 1500 bytes
306/// - refill: `refill_size` token gets added to the bucket
307/// - interval 2: send `refill_size` - `CONFIG_MTU` bytes, sending under the
308/// allowance by 1500 bytes
309/// - refill: `refill_size` token gets added to the bucket
310/// - interval 3: send `refill_size` + `CONFIG_MTU` bytes, sending over the
311/// allowance by 1500 bytes
312/// - repeat
313///
314/// So it could become less smooth and more "bursty" even though the long term
315/// average is maintained. But I don't think this would happen much in practice,
316/// and we are batching sends for performance reasons.
317fn get_burst_allowance() -> u64 {
318 c::CONFIG_MTU.into()
319}