shadow_rs/host/network/
interface.rs1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::collections::hash_map::Entry;
4use std::fs::File;
5use std::io::BufWriter;
6use std::net::{Ipv4Addr, SocketAddrV4};
7use std::path::PathBuf;
8
9use crate::core::configuration::QDiscMode;
10use crate::core::worker::Worker;
11use crate::host::descriptor::socket::inet::InetSocket;
12use crate::host::network::queuing::{NetworkQueue, NetworkQueueKind};
13use crate::network::PacketDevice;
14use crate::network::packet::{IanaProtocol, PacketRc, PacketStatus};
15use crate::utility::ObjectCounter;
16use crate::utility::callback_queue::CallbackQueue;
17use crate::utility::pcap_writer::{PacketDisplay, PcapWriter};
18
/// Unsigned 64-bit value used as a packet priority.
///
/// NOTE(review): this alias is not referenced elsewhere in this file; presumably it is the
/// priority type that sockets hand to the interface's send queue under the FIFO queuing
/// discipline — confirm against its users.
pub type FifoPacketPriority = u64;
21
/// Settings for the optional packet capture of a network interface.
#[derive(Clone, Debug)]
pub struct PcapOptions {
    /// Directory that the capture file is written into; `setup_pcap_writer` joins
    /// `<interface name>.pcap` onto this path.
    pub path: PathBuf,
    /// Capture size handed to the pcap writer when it is constructed.
    ///
    /// NOTE(review): presumably the maximum number of bytes recorded per packet — confirm against
    /// `PcapWriter::new`.
    pub capture_size_bytes: u32,
}
27
28#[derive(Clone, Debug, Eq, Hash, PartialEq)]
29struct AssociatedSocketKey {
30 protocol: IanaProtocol,
31 local: SocketAddrV4,
32 remote: SocketAddrV4,
33}
34
35impl AssociatedSocketKey {
36 fn new(protocol: IanaProtocol, local: SocketAddrV4, remote: SocketAddrV4) -> Self {
37 Self {
38 protocol,
39 local,
40 remote,
41 }
42 }
43}
44
45fn setup_pcap_writer(
46 name: &str,
47 options: &PcapOptions,
48) -> std::io::Result<PcapWriter<BufWriter<File>>> {
49 let file = File::create(options.path.join(format!("{name}.pcap")))?;
50 PcapWriter::new(BufWriter::new(file), options.capture_size_bytes)
51}
52
53pub struct NetworkInterface {
58 addr: Ipv4Addr,
59 send_sockets: RefCell<NetworkQueue<InetSocket>>,
61 recv_sockets: RefCell<HashMap<AssociatedSocketKey, InetSocket>>,
64 pcap: RefCell<Option<PcapWriter<BufWriter<File>>>>,
66 cleanup_in_progress: RefCell<bool>,
69 _counter: ObjectCounter,
71}
72
73impl NetworkInterface {
74 pub fn new(
78 name: &str,
79 addr: Ipv4Addr,
80 pcap_options: Option<PcapOptions>,
81 qdisc: QDiscMode,
82 ) -> Self {
83 let pcap = pcap_options.and_then(|opt| match setup_pcap_writer(name, &opt) {
85 Ok(writer) => Some(writer),
86 Err(e) => {
87 log::warn!("Unable to set up the configured pcap writer for '{name}': {e}");
88 None
89 }
90 });
91
92 log::debug!("Bringing up network interface '{name}' at '{addr}' using {qdisc:?}");
93
94 let queue_kind = match qdisc {
95 QDiscMode::Fifo => NetworkQueueKind::MinPriority,
101 QDiscMode::RoundRobin => NetworkQueueKind::FirstInFirstOut,
106 };
107
108 Self {
109 addr,
110 send_sockets: RefCell::new(NetworkQueue::new(queue_kind)),
111 recv_sockets: RefCell::new(HashMap::new()),
112 pcap: RefCell::new(pcap),
113 cleanup_in_progress: RefCell::new(false),
114 _counter: ObjectCounter::new("NetworkInterface"),
115 }
116 }
117
118 pub fn associate(
119 &self,
120 socket: &InetSocket,
121 protocol: IanaProtocol,
122 port: u16,
123 peer: SocketAddrV4,
124 ) {
125 let local = SocketAddrV4::new(self.addr, port);
126 let key = AssociatedSocketKey::new(protocol, local, peer);
127 log::trace!("Associating socket key {key:?}");
128
129 if let Entry::Vacant(entry) = self.recv_sockets.borrow_mut().entry(key) {
130 entry.insert(socket.clone());
131 } else {
132 debug_panic!("Entry is unexpectedly occupied");
134 }
135 }
136
137 pub fn disassociate(&self, protocol: IanaProtocol, port: u16, peer: SocketAddrV4) {
138 if *self.cleanup_in_progress.borrow() {
139 return;
140 }
141
142 let local = SocketAddrV4::new(self.addr, port);
143 let key = AssociatedSocketKey::new(protocol, local, peer);
144 log::trace!("Disassociating socket key {key:?}");
145
146 if self.recv_sockets.borrow_mut().remove(&key).is_none() {
152 log::trace!("Attempted to disassociate a vacant socket key");
155 }
156 }
157
158 pub fn is_addr_in_use(&self, protocol: IanaProtocol, port: u16, peer: SocketAddrV4) -> bool {
159 let local = SocketAddrV4::new(self.addr, port);
160 let key = AssociatedSocketKey::new(protocol, local, peer);
161 self.recv_sockets.borrow().contains_key(&key)
162 }
163
164 pub fn add_data_source(&self, socket: &InetSocket) {
166 assert!(socket.borrow().has_data_to_send());
167
168 if !self.send_sockets.borrow().contains(socket) {
169 self.send_sockets
170 .borrow_mut()
171 .push(socket.clone(), socket.borrow().peek_next_packet_priority());
172 } else {
173 log::trace!(
174 "We attemped to add a socket as a packet source but it is already in our queue of \
175 sending sockets. Ignoring."
176 );
177 }
178 }
179
180 pub fn remove_all_sockets(&self) {
185 *self.cleanup_in_progress.borrow_mut() = true;
187 self.recv_sockets.borrow_mut().clear();
188 self.send_sockets.borrow_mut().clear();
189 *self.cleanup_in_progress.borrow_mut() = false;
190 }
191
192 fn capture_if_configured(&self, packet: &PacketRc) {
193 let mut pcap_borrowed = self.pcap.borrow_mut();
195
196 if let Some(pcap) = pcap_borrowed.as_mut() {
197 let now = Worker::current_time().unwrap().to_abs_simtime();
198
199 let ts_sec: u32 = now.as_secs().try_into().unwrap_or(u32::MAX);
200 let ts_usec: u32 = now.subsec_micros();
201 let packet_len: u32 = packet.len().try_into().unwrap_or(u32::MAX);
202
203 if let Err(e) = pcap.write_packet_fmt(ts_sec, ts_usec, packet_len, |writer| {
204 packet.display_bytes(writer)
205 }) {
206 log::warn!("Unable to write packet to pcap output: {e}");
208 log::warn!(
209 "Fatal pcap logging error; stopping pcap logging for interface '{}'.",
210 self.addr
211 );
212 pcap_borrowed.take();
213 }
214 }
215 }
216}
217
218impl PacketDevice for NetworkInterface {
219 fn get_address(&self) -> Ipv4Addr {
220 self.addr
221 }
222
223 fn pop(&self) -> Option<PacketRc> {
225 loop {
226 let Some(socket) = self.send_sockets.borrow_mut().pop() else {
228 log::trace!(
229 "Interface {} is now idle with no sockets containing sendable packets.",
230 self.addr
231 );
232 return None;
233 };
234
235 let Some(packet) = CallbackQueue::queue_and_run_with_legacy(|cb_queue| {
237 socket.borrow_mut().pull_out_packet(cb_queue)
238 }) else {
239 continue;
242 };
243
244 if socket.borrow().has_data_to_send() {
248 self.add_data_source(&socket);
249 }
250
251 packet.add_status(PacketStatus::SndInterfaceSent);
252 self.capture_if_configured(&packet);
253
254 return Some(packet);
255 }
256 }
257
258 fn push(&self, packet: PacketRc) {
260 packet.add_status(PacketStatus::RcvInterfaceReceived);
262
263 self.capture_if_configured(&packet);
266
267 let protocol = packet.iana_protocol();
269 let local = SocketAddrV4::new(self.addr, packet.dst_ipv4_address().port());
270 let peer = packet.src_ipv4_address();
271 let key = AssociatedSocketKey::new(protocol, local, peer);
272
273 log::trace!("Looking for socket associated with specific key {key:?}");
275 let maybe_socket = {
276 let associated = self.recv_sockets.borrow();
277 associated
278 .get(&key)
279 .or_else(|| {
280 let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
282 let key = AssociatedSocketKey::new(protocol, local, wildcard);
283 log::trace!("Looking for socket associated with general key {key:?}");
284 associated.get(&key)
285 })
286 .cloned()
290 };
291
292 if let Some(socket) = maybe_socket {
293 let recv_time = Worker::current_time().unwrap();
294 CallbackQueue::queue_and_run_with_legacy(|cb_queue| {
295 socket
296 .borrow_mut()
297 .push_in_packet(packet, cb_queue, recv_time);
298 });
299 } else {
300 packet.add_status(PacketStatus::RcvInterfaceDropped);
301 }
302 }
303}