shadow_rs/host/network/
interface.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::collections::hash_map::Entry;
4use std::fs::File;
5use std::io::BufWriter;
6use std::net::{Ipv4Addr, SocketAddrV4};
7use std::path::PathBuf;
8
9use crate::core::configuration::QDiscMode;
10use crate::core::worker::Worker;
11use crate::host::descriptor::socket::inet::InetSocket;
12use crate::host::network::queuing::{NetworkQueue, NetworkQueueKind};
13use crate::network::PacketDevice;
14use crate::network::packet::{IanaProtocol, PacketRc, PacketStatus};
15use crate::utility::ObjectCounter;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::pcap_writer::{PacketDisplay, PcapWriter};
18
/// Priority value the fifo qdisc compares to decide which socket sends its next packet.
pub type FifoPacketPriority = u64;
21
/// Settings for capturing an interface's packets to a pcap file.
#[derive(Clone, Debug)]
pub struct PcapOptions {
    /// Directory in which the `<name>.pcap` capture file is created.
    pub path: PathBuf,
    /// Capture size in bytes, handed to the pcap writer when it is constructed.
    pub capture_size_bytes: u32,
}
27
28#[derive(Clone, Debug, Eq, Hash, PartialEq)]
29struct AssociatedSocketKey {
30    protocol: IanaProtocol,
31    local: SocketAddrV4,
32    remote: SocketAddrV4,
33}
34
35impl AssociatedSocketKey {
36    fn new(protocol: IanaProtocol, local: SocketAddrV4, remote: SocketAddrV4) -> Self {
37        Self {
38            protocol,
39            local,
40            remote,
41        }
42    }
43}
44
45fn setup_pcap_writer(
46    name: &str,
47    options: &PcapOptions,
48) -> std::io::Result<PcapWriter<BufWriter<File>>> {
49    let file = File::create(options.path.join(format!("{name}.pcap")))?;
50    PcapWriter::new(BufWriter::new(file), options.capture_size_bytes)
51}
52
53/// Represents a network device that can send and receive packets.
54// TODO: remove the ref cells below since the `NetworkNamespace` already stores this interface
55// in a `RefCell`. We should remove the `RefCell`s to simplify the code and fix any circular
56// code paths that exist.
57pub struct NetworkInterface {
58    addr: Ipv4Addr,
59    /// The sockets from which we will pull out packets so that we can send them over the network.
60    send_sockets: RefCell<NetworkQueue<InetSocket>>,
61    /// The sockets to which we will push incoming packets so they can be received by the network
62    /// stack and their payloads read by the managed process.
63    recv_sockets: RefCell<HashMap<AssociatedSocketKey, InetSocket>>,
64    /// If configured, assists us in writing out pcap files of our packet flows.
65    pcap: RefCell<Option<PcapWriter<BufWriter<File>>>>,
66    /// Used to prevent recursion during cleanup.
67    // TODO: remove when the legacy stack is removed.
68    cleanup_in_progress: RefCell<bool>,
69    // Declared last so we only count deallocation as successful after the above are dropped.
70    _counter: ObjectCounter,
71}
72
73impl NetworkInterface {
74    /// Create a new network interface for the assigned `addr`. The configured `name` will be used
75    /// to construct a filesystem path for the pcap file (if enabled), so take care in choosing a
76    /// filesystem-appropriate static string.
77    pub fn new(
78        name: &str,
79        addr: Ipv4Addr,
80        pcap_options: Option<PcapOptions>,
81        qdisc: QDiscMode,
82    ) -> Self {
83        // Try to set up the pcap writer if configured.
84        let pcap = pcap_options.and_then(|opt| match setup_pcap_writer(name, &opt) {
85            Ok(writer) => Some(writer),
86            Err(e) => {
87                log::warn!("Unable to set up the configured pcap writer for '{name}': {e}");
88                None
89            }
90        });
91
92        log::debug!(
93            "Bringing up network interface '{name}' at '{addr}' using {:?}",
94            qdisc
95        );
96
97        let queue_kind = match qdisc {
98            // A packet fifo is realized using a min-heap over monitonically increasing priority
99            // values, which encodes the sequence in which the packets became ready to be sent.
100            // A socket's priority is that of its next sendable packet. This is equivalent to a
101            // pfifo, and close to the default Linux qdisc.
102            // https://tldp.org/HOWTO/Traffic-Control-HOWTO/classless-qdiscs.html
103            QDiscMode::Fifo => NetworkQueueKind::MinPriority,
104            // We use a round-robin policy to select the next socket, send a packet from that
105            // socket, and repeat. We realize this using a fifo queue of sockets that we repeatedly
106            // push() and pop(). A better name for this qdisc is probably 'StochasticFairQueuing':
107            // https://tldp.org/HOWTO/Traffic-Control-HOWTO/classless-qdiscs.html
108            QDiscMode::RoundRobin => NetworkQueueKind::FirstInFirstOut,
109        };
110
111        Self {
112            addr,
113            send_sockets: RefCell::new(NetworkQueue::new(queue_kind)),
114            recv_sockets: RefCell::new(HashMap::new()),
115            pcap: RefCell::new(pcap),
116            cleanup_in_progress: RefCell::new(false),
117            _counter: ObjectCounter::new("NetworkInterface"),
118        }
119    }
120
121    pub fn associate(
122        &self,
123        socket: &InetSocket,
124        protocol: IanaProtocol,
125        port: u16,
126        peer: SocketAddrV4,
127    ) {
128        let local = SocketAddrV4::new(self.addr, port);
129        let key = AssociatedSocketKey::new(protocol, local, peer);
130        log::trace!("Associating socket key {key:?}");
131
132        if let Entry::Vacant(entry) = self.recv_sockets.borrow_mut().entry(key) {
133            entry.insert(socket.clone());
134        } else {
135            // TODO: Return an error if the association fails.
136            debug_panic!("Entry is unexpectedly occupied");
137        }
138    }
139
140    pub fn disassociate(&self, protocol: IanaProtocol, port: u16, peer: SocketAddrV4) {
141        if *self.cleanup_in_progress.borrow() {
142            return;
143        }
144
145        let local = SocketAddrV4::new(self.addr, port);
146        let key = AssociatedSocketKey::new(protocol, local, peer);
147        log::trace!("Disassociating socket key {key:?}");
148
149        // TODO: Return an error if the disassociation fails. Generally the calling code should only
150        // try to disassociate a socket if it thinks that the socket is actually associated with
151        // this interface, and if it's not, then it's probably an error. But TCP sockets will
152        // disassociate all sockets (including ones that have never been associated) and will try to
153        // disassociate the same socket multiple times, so we can't just add an assert here.
154        if self.recv_sockets.borrow_mut().remove(&key).is_none() {
155            // Since this always occurs with our legacy TCP stack and is not really a bug, we log at
156            // trace instead of warn level for now until the legacy TCP stack is removed.
157            log::trace!("Attempted to disassociate a vacant socket key");
158        }
159    }
160
161    pub fn is_addr_in_use(&self, protocol: IanaProtocol, port: u16, peer: SocketAddrV4) -> bool {
162        let local = SocketAddrV4::new(self.addr, port);
163        let key = AssociatedSocketKey::new(protocol, local, peer);
164        self.recv_sockets.borrow().contains_key(&key)
165    }
166
167    // Add the socket to the list of sockets that have data ready for us to send out to the network.
168    pub fn add_data_source(&self, socket: &InetSocket) {
169        assert!(socket.borrow().has_data_to_send());
170
171        if !self.send_sockets.borrow().contains(socket) {
172            self.send_sockets
173                .borrow_mut()
174                .push(socket.clone(), socket.borrow().peek_next_packet_priority());
175        } else {
176            log::trace!(
177                "We attemped to add a socket as a packet source but it is already in our queue of \
178                sending sockets. Ignoring."
179            );
180        }
181    }
182
183    /// Disassociate all bound sockets and remove sockets from the sending queue. This should be
184    /// called as part of the host's cleanup procedure. We don't think we need this function for
185    /// Rust sockets, but we think we need it for the legacy TCP stack which will not otherwise drop
186    /// due to circular references.
187    pub fn remove_all_sockets(&self) {
188        // The legacy TCP stack also calls disassociate on drop, so we need to prevent recursion.
189        *self.cleanup_in_progress.borrow_mut() = true;
190        self.recv_sockets.borrow_mut().clear();
191        self.send_sockets.borrow_mut().clear();
192        *self.cleanup_in_progress.borrow_mut() = false;
193    }
194
195    fn capture_if_configured(&self, packet: &PacketRc) {
196        // Avoid double mutable borrow of pcap.
197        let mut pcap_borrowed = self.pcap.borrow_mut();
198
199        if let Some(pcap) = pcap_borrowed.as_mut() {
200            let now = Worker::current_time().unwrap().to_abs_simtime();
201
202            let ts_sec: u32 = now.as_secs().try_into().unwrap_or(u32::MAX);
203            let ts_usec: u32 = now.subsec_micros();
204            let packet_len: u32 = packet.len().try_into().unwrap_or(u32::MAX);
205
206            if let Err(e) = pcap.write_packet_fmt(ts_sec, ts_usec, packet_len, |writer| {
207                packet.display_bytes(writer)
208            }) {
209                // There was a non-recoverable error.
210                log::warn!("Unable to write packet to pcap output: {}", e);
211                log::warn!(
212                    "Fatal pcap logging error; stopping pcap logging for interface '{}'.",
213                    self.addr
214                );
215                pcap_borrowed.take();
216            }
217        }
218    }
219}
220
221impl PacketDevice for NetworkInterface {
222    fn get_address(&self) -> Ipv4Addr {
223        self.addr
224    }
225
226    // Pops a packet from the interface to send over the simulated network.
227    fn pop(&self) -> Option<PacketRc> {
228        loop {
229            // Choose the next socket that will send a packet.
230            let Some(socket) = self.send_sockets.borrow_mut().pop() else {
231                log::trace!(
232                    "Interface {} is now idle with no sockets containing sendable packets.",
233                    self.addr
234                );
235                return None;
236            };
237
238            // The socket was in our sendable queue, so it _should_ have a packet.
239            let Some(packet) = CallbackQueue::queue_and_run_with_legacy(|cb_queue| {
240                socket.borrow_mut().pull_out_packet(cb_queue)
241            }) else {
242                // It is possible that the socket changed state since it was added to our queue, so
243                // we tolerate the case that it no longer has a sendable packet.
244                continue;
245            };
246
247            // If socket has more packets, keep tracking it for future sends. Note that it is
248            // possible that the socket was already re-added to the send queue above during the call
249            // to `pull_out_packet()`.
250            if socket.borrow().has_data_to_send() {
251                self.add_data_source(&socket);
252            }
253
254            packet.add_status(PacketStatus::SndInterfaceSent);
255            self.capture_if_configured(&packet);
256
257            return Some(packet);
258        }
259    }
260
261    // Pushes a packet from the simulated network into the interface.
262    fn push(&self, packet: PacketRc) {
263        // The packet is successfully received by this interface.
264        packet.add_status(PacketStatus::RcvInterfaceReceived);
265
266        // Record the packet before we process it, otherwise we may send more packets before we
267        // record this one and the order will be incorrect.
268        self.capture_if_configured(&packet);
269
270        // Find the socket that should process the packet.
271        let protocol = packet.iana_protocol();
272        let local = SocketAddrV4::new(self.addr, packet.dst_ipv4_address().port());
273        let peer = packet.src_ipv4_address();
274        let key = AssociatedSocketKey::new(protocol, local, peer);
275
276        // First check for a socket with the specific association.
277        log::trace!("Looking for socket associated with specific key {key:?}");
278        let maybe_socket = {
279            let associated = self.recv_sockets.borrow();
280            associated
281                .get(&key)
282                .or_else(|| {
283                    // Then fall back to checking for the wildcard association.
284                    let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
285                    let key = AssociatedSocketKey::new(protocol, local, wildcard);
286                    log::trace!("Looking for socket associated with general key {key:?}");
287                    associated.get(&key)
288                })
289                // Pushing a packet to the socket may cause the socket to be disassociated, so we
290                // can't hold on to the borrow of `recv_sockets` when we call `push_in_packet`. We
291                // need to clone the socket instead so that we can drop the `recv_sockets` borrow.
292                .cloned()
293        };
294
295        if let Some(socket) = maybe_socket {
296            let recv_time = Worker::current_time().unwrap();
297            CallbackQueue::queue_and_run_with_legacy(|cb_queue| {
298                socket
299                    .borrow_mut()
300                    .push_in_packet(packet, cb_queue, recv_time);
301            });
302        } else {
303            packet.add_status(PacketStatus::RcvInterfaceDropped);
304        }
305    }
306}