shadow_rs/host/descriptor/socket/
unix.rs

1use std::collections::{LinkedList, VecDeque};
2use std::io::Read;
3use std::ops::DerefMut;
4use std::sync::{Arc, Weak};
5
6use atomic_refcell::AtomicRefCell;
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use linux_api::socket::Shutdown;
10use nix::sys::socket::MsgFlags;
11use shadow_shim_helper_rs::syscall_types::ForeignPtr;
12
13use crate::cshadow as c;
14use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
15use crate::host::descriptor::shared_buf::{
16    BufferHandle, BufferSignals, BufferState, ReaderHandle, SharedBuf, WriterHandle,
17};
18use crate::host::descriptor::socket::abstract_unix_ns::AbstractUnixNamespace;
19use crate::host::descriptor::socket::{RecvmsgArgs, RecvmsgReturn, SendmsgArgs, Socket};
20use crate::host::descriptor::{
21    File, FileMode, FileSignals, FileState, FileStatus, OpenFile, SyscallResult,
22};
23use crate::host::memory_manager::MemoryManager;
24use crate::host::network::namespace::NetworkNamespace;
25use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
26use crate::host::syscall::types::SyscallError;
27use crate::utility::HostTreePointer;
28use crate::utility::callback_queue::CallbackQueue;
29use crate::utility::sockaddr::{SockaddrStorage, SockaddrUnix};
30
/// Initial value of a new socket's `send_limit` (see `UnixSocket::new`).
// NOTE(review): presumably a byte count — confirm against the code that enforces `send_limit`.
const UNIX_SOCKET_DEFAULT_BUFFER_SIZE: u64 = 212_992;
32
/// A unix socket. The `UnixSocket` is the public-facing API; its methods forward to the inner
/// protocol state object, handing it the state-independent `common` data to operate on.
pub struct UnixSocket {
    /// Data and functionality that is general for all states.
    common: UnixSocketCommon,
    /// State-specific data and functionality. Replaced by a new state when a call such as
    /// `close()`, `listen()` or `connect()` returns a different state.
    protocol_state: ProtocolState,
}
41
42impl UnixSocket {
43    pub fn new(
44        status: FileStatus,
45        socket_type: UnixSocketType,
46        namespace: &Arc<AtomicRefCell<AbstractUnixNamespace>>,
47    ) -> Arc<AtomicRefCell<Self>> {
48        Arc::new_cyclic(|weak| {
49            // each socket tracks its own send limit, and we let the receiver have an unlimited recv
50            // buffer size
51            let recv_buffer = SharedBuf::new(usize::MAX);
52            let recv_buffer = Arc::new(AtomicRefCell::new(recv_buffer));
53
54            let mut common = UnixSocketCommon {
55                recv_buffer,
56                send_limit: UNIX_SOCKET_DEFAULT_BUFFER_SIZE,
57                sent_len: 0,
58                event_source: StateEventSource::new(),
59                state: FileState::ACTIVE,
60                status,
61                socket_type,
62                namespace: Arc::clone(namespace),
63                has_open_file: false,
64            };
65
66            // may generate new events
67            let protocol_state = ProtocolState::new(socket_type, &mut common, weak);
68
69            AtomicRefCell::new(Self {
70                common,
71                protocol_state,
72            })
73        })
74    }
75
76    pub fn status(&self) -> FileStatus {
77        self.common.status
78    }
79
80    pub fn set_status(&mut self, status: FileStatus) {
81        self.common.status = status;
82    }
83
84    pub fn mode(&self) -> FileMode {
85        FileMode::READ | FileMode::WRITE
86    }
87
88    pub fn has_open_file(&self) -> bool {
89        self.common.has_open_file
90    }
91
92    pub fn supports_sa_restart(&self) -> bool {
93        self.common.supports_sa_restart()
94    }
95
96    pub fn set_has_open_file(&mut self, val: bool) {
97        self.common.has_open_file = val;
98    }
99
100    pub fn getsockname(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
101        // return the bound address if set, otherwise return an empty unix sockaddr
102        Ok(Some(
103            self.protocol_state
104                .bound_address()?
105                .unwrap_or_else(SockaddrUnix::new_unnamed),
106        ))
107    }
108
109    pub fn getpeername(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
110        // return the peer address if set, otherwise return an empty unix sockaddr
111        Ok(Some(
112            self.protocol_state
113                .peer_address()?
114                .unwrap_or_else(SockaddrUnix::new_unnamed),
115        ))
116    }
117
118    pub fn address_family(&self) -> linux_api::socket::AddressFamily {
119        linux_api::socket::AddressFamily::AF_UNIX
120    }
121
122    fn recv_buffer(&self) -> &Arc<AtomicRefCell<SharedBuf>> {
123        &self.common.recv_buffer
124    }
125
126    fn inform_bytes_read(&mut self, num: u64, cb_queue: &mut CallbackQueue) {
127        self.protocol_state
128            .inform_bytes_read(&mut self.common, num, cb_queue);
129    }
130
131    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
132        self.protocol_state.close(&mut self.common, cb_queue)
133    }
134
135    fn refresh_file_state(&mut self, signals: FileSignals, cb_queue: &mut CallbackQueue) {
136        self.protocol_state
137            .refresh_file_state(&mut self.common, signals, cb_queue)
138    }
139
140    pub fn bind(
141        socket: &Arc<AtomicRefCell<Self>>,
142        addr: Option<&SockaddrStorage>,
143        _net_ns: &NetworkNamespace,
144        rng: impl rand::Rng,
145    ) -> Result<(), SyscallError> {
146        let socket_ref = &mut *socket.borrow_mut();
147        socket_ref
148            .protocol_state
149            .bind(&mut socket_ref.common, socket, addr, rng)
150    }
151
152    pub fn readv(
153        &mut self,
154        _iovs: &[IoVec],
155        _offset: Option<libc::off_t>,
156        _flags: libc::c_int,
157        _mem: &mut MemoryManager,
158        _cb_queue: &mut CallbackQueue,
159    ) -> Result<libc::ssize_t, SyscallError> {
160        // we could call UnixSocket::recvmsg() here, but for now we expect that there are no code
161        // paths that would call UnixSocket::readv() since the readv() syscall handler should have
162        // called UnixSocket::recvmsg() instead
163        panic!("Called UnixSocket::readv() on a unix socket.");
164    }
165
166    pub fn writev(
167        &mut self,
168        _iovs: &[IoVec],
169        _offset: Option<libc::off_t>,
170        _flags: libc::c_int,
171        _mem: &mut MemoryManager,
172        _cb_queue: &mut CallbackQueue,
173    ) -> Result<libc::ssize_t, SyscallError> {
174        // we could call UnixSocket::sendmsg() here, but for now we expect that there are no code
175        // paths that would call UnixSocket::writev() since the writev() syscall handler should have
176        // called UnixSocket::sendmsg() instead
177        panic!("Called UnixSocket::writev() on a unix socket");
178    }
179
180    pub fn sendmsg(
181        socket: &Arc<AtomicRefCell<Self>>,
182        args: SendmsgArgs,
183        mem: &mut MemoryManager,
184        _net_ns: &NetworkNamespace,
185        _rng: impl rand::Rng,
186        cb_queue: &mut CallbackQueue,
187    ) -> Result<libc::ssize_t, SyscallError> {
188        let socket_ref = &mut *socket.borrow_mut();
189        socket_ref
190            .protocol_state
191            .sendmsg(&mut socket_ref.common, socket, args, mem, cb_queue)
192    }
193
194    pub fn recvmsg(
195        socket: &Arc<AtomicRefCell<Self>>,
196        args: RecvmsgArgs,
197        mem: &mut MemoryManager,
198        cb_queue: &mut CallbackQueue,
199    ) -> Result<RecvmsgReturn, SyscallError> {
200        let socket_ref = &mut *socket.borrow_mut();
201        socket_ref
202            .protocol_state
203            .recvmsg(&mut socket_ref.common, socket, args, mem, cb_queue)
204    }
205
206    pub fn ioctl(
207        &mut self,
208        request: IoctlRequest,
209        arg_ptr: ForeignPtr<()>,
210        memory_manager: &mut MemoryManager,
211    ) -> SyscallResult {
212        self.protocol_state
213            .ioctl(&mut self.common, request, arg_ptr, memory_manager)
214    }
215
216    pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
217        warn_once_then_debug!("We do not yet handle stat calls on unix sockets");
218        Err(Errno::EINVAL.into())
219    }
220
221    pub fn listen(
222        socket: &Arc<AtomicRefCell<Self>>,
223        backlog: i32,
224        _net_ns: &NetworkNamespace,
225        _rng: impl rand::Rng,
226        cb_queue: &mut CallbackQueue,
227    ) -> Result<(), Errno> {
228        let mut socket_ref = socket.borrow_mut();
229        let socket_ref = socket_ref.deref_mut();
230        socket_ref
231            .protocol_state
232            .listen(&mut socket_ref.common, backlog, cb_queue)
233    }
234
235    pub fn connect(
236        socket: &Arc<AtomicRefCell<Self>>,
237        addr: &SockaddrStorage,
238        _net_ns: &NetworkNamespace,
239        _rng: impl rand::Rng,
240        cb_queue: &mut CallbackQueue,
241    ) -> Result<(), SyscallError> {
242        let socket_ref = &mut *socket.borrow_mut();
243        socket_ref
244            .protocol_state
245            .connect(&mut socket_ref.common, socket, addr, cb_queue)
246    }
247
248    pub fn accept(
249        &mut self,
250        _net_ns: &NetworkNamespace,
251        _rng: impl rand::Rng,
252        cb_queue: &mut CallbackQueue,
253    ) -> Result<OpenFile, SyscallError> {
254        self.protocol_state.accept(&mut self.common, cb_queue)
255    }
256
257    pub fn shutdown(
258        &mut self,
259        _how: Shutdown,
260        _cb_queue: &mut CallbackQueue,
261    ) -> Result<(), SyscallError> {
262        log::warn!("shutdown() syscall not yet supported for unix sockets; Returning ENOSYS");
263        Err(Errno::ENOSYS.into())
264    }
265
266    pub fn getsockopt(
267        &mut self,
268        _level: libc::c_int,
269        _optname: libc::c_int,
270        _optval_ptr: ForeignPtr<()>,
271        _optlen: libc::socklen_t,
272        _memory_manager: &mut MemoryManager,
273        _cb_queue: &mut CallbackQueue,
274    ) -> Result<libc::socklen_t, SyscallError> {
275        log::warn!("getsockopt() syscall not yet supported for unix sockets; Returning ENOSYS");
276        Err(Errno::ENOSYS.into())
277    }
278
279    pub fn setsockopt(
280        &mut self,
281        _level: libc::c_int,
282        _optname: libc::c_int,
283        _optval_ptr: ForeignPtr<()>,
284        _optlen: libc::socklen_t,
285        _memory_manager: &MemoryManager,
286    ) -> Result<(), SyscallError> {
287        log::warn!("setsockopt() syscall not yet supported for unix sockets; Returning ENOSYS");
288        Err(Errno::ENOSYS.into())
289    }
290
291    pub fn pair(
292        status: FileStatus,
293        socket_type: UnixSocketType,
294        namespace: &Arc<AtomicRefCell<AbstractUnixNamespace>>,
295        cb_queue: &mut CallbackQueue,
296    ) -> (Arc<AtomicRefCell<Self>>, Arc<AtomicRefCell<Self>>) {
297        let socket_1 = UnixSocket::new(status, socket_type, namespace);
298        let socket_2 = UnixSocket::new(status, socket_type, namespace);
299
300        {
301            let socket_1_ref = &mut *socket_1.borrow_mut();
302            socket_1_ref
303                .protocol_state
304                .connect_unnamed(
305                    &mut socket_1_ref.common,
306                    &socket_1,
307                    Arc::clone(&socket_2),
308                    cb_queue,
309                )
310                .unwrap();
311        }
312
313        {
314            let socket_2_ref = &mut *socket_2.borrow_mut();
315            socket_2_ref
316                .protocol_state
317                .connect_unnamed(
318                    &mut socket_2_ref.common,
319                    &socket_2,
320                    Arc::clone(&socket_1),
321                    cb_queue,
322                )
323                .unwrap();
324        }
325
326        (socket_1, socket_2)
327    }
328
329    pub fn add_listener(
330        &mut self,
331        monitoring_state: FileState,
332        monitoring_signals: FileSignals,
333        filter: StateListenerFilter,
334        notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
335        + Send
336        + Sync
337        + 'static,
338    ) -> StateListenHandle {
339        self.common.event_source.add_listener(
340            monitoring_state,
341            monitoring_signals,
342            filter,
343            notify_fn,
344        )
345    }
346
347    pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<c::StatusListener>) {
348        self.common.event_source.add_legacy_listener(ptr);
349    }
350
351    pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener) {
352        self.common.event_source.remove_legacy_listener(ptr);
353    }
354
355    pub fn state(&self) -> FileState {
356        self.common.state
357    }
358}
359
/// State data for a connection-oriented socket in its initial state. `ProtocolState::new` creates
/// this state (with no bound address) for `Stream` and `SeqPacket` sockets.
struct ConnOrientedInitial {
    /// The address the socket has been bound to, if any.
    bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
}
/// State data for a connection-oriented socket in the listening state.
struct ConnOrientedListening {
    /// The address the socket is bound to (not optional in this state).
    bound_addr: SockaddrUnix<libc::sockaddr_un>,
    /// Queued sockets.
    // NOTE(review): presumably the sockets for connections that have not been accepted yet —
    // confirm against the `accept`/`queue_incoming_conn` implementations.
    queue: VecDeque<Arc<AtomicRefCell<UnixSocket>>>,
    /// The queue counts as full once `queue.len()` reaches this value (see `queue_is_full`).
    queue_limit: u32,
}
/// State data for a connection-oriented socket in the connected state.
struct ConnOrientedConnected {
    /// The address this socket is bound to, if any.
    bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The address of the peer, if any.
    peer_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The socket at the other end of the connection.
    peer: Arc<AtomicRefCell<UnixSocket>>,
    // NOTE(review): presumably the reader registration on this socket's recv buffer and the
    // writer registration on the peer's buffer — confirm where this state is constructed.
    reader_handle: ReaderHandle,
    writer_handle: WriterHandle,
    // these handles are never accessed, but we store them because of their drop impls
    _recv_buffer_handle: BufferHandle,
    _send_buffer_handle: BufferHandle,
}
/// State data for a connection-oriented socket in the closed state; it carries no data.
struct ConnOrientedClosed {}
379
/// State data for a connectionless socket in its initial state. `ProtocolState::new` creates
/// this state for `Dgram` sockets, registering a reader and a listener on the receive buffer
/// right away since dgram sockets are immediately able to receive data.
struct ConnLessInitial {
    /// Weak reference back to the socket that owns this state.
    this_socket: Weak<AtomicRefCell<UnixSocket>>,
    /// The address this socket is bound to, if any.
    bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The address of the peer, if any.
    peer_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The peer socket, if any.
    peer: Option<Arc<AtomicRefCell<UnixSocket>>>,
    // NOTE(review): presumably per-message bookkeeping for data waiting in the recv buffer —
    // confirm against the `ByteData` definition and the recvmsg implementation.
    recv_data: LinkedList<ByteData>,
    /// Handle returned by `add_reader` on this socket's receive buffer.
    reader_handle: ReaderHandle,
    // this handle is never accessed, but we store it because of its drop impl
    _recv_buffer_handle: BufferHandle,
}
/// State data for a connectionless socket in the closed state; it carries no data.
struct ConnLessClosed {}
391
392impl ConnOrientedListening {
393    fn queue_is_full(&self) -> bool {
394        self.queue.len() >= self.queue_limit.try_into().unwrap()
395    }
396}
397
/// The current protocol state of the unix socket. An `Option` is required for each variant so that
/// the inner state object can be removed, transformed into a new state, and then re-added as a
/// different variant.
///
/// The `Option` is expected to be `Some` whenever a method is dispatched; the dispatch methods on
/// this type unwrap it unconditionally.
enum ProtocolState {
    // connection-oriented states; `Stream` and `SeqPacket` sockets start in `ConnOrientedInitial`
    ConnOrientedInitial(Option<ConnOrientedInitial>),
    ConnOrientedListening(Option<ConnOrientedListening>),
    ConnOrientedConnected(Option<ConnOrientedConnected>),
    ConnOrientedClosed(Option<ConnOrientedClosed>),
    // connectionless states; `Dgram` sockets start in `ConnLessInitial`
    ConnLessInitial(Option<ConnLessInitial>),
    ConnLessClosed(Option<ConnLessClosed>),
}
409
/// Upcast from a type to an enum variant.
///
/// `state_upcast!(State, Enum::Variant)` generates `impl From<State> for Enum`, which wraps the
/// given value as `Enum::Variant(Some(value))`.
macro_rules! state_upcast {
    ($state:ty, $owner:ident :: $variant:ident) => {
        impl ::std::convert::From<$state> for $owner {
            fn from(state: $state) -> $owner {
                $owner::$variant(::std::option::Option::Some(state))
            }
        }
    };
}
420
// implement upcasting for all state types, so that each state object can be converted into the
// `ProtocolState` variant of the same name using `From`/`Into`
state_upcast!(ConnOrientedInitial, ProtocolState::ConnOrientedInitial);
state_upcast!(ConnOrientedListening, ProtocolState::ConnOrientedListening);
state_upcast!(ConnOrientedConnected, ProtocolState::ConnOrientedConnected);
state_upcast!(ConnOrientedClosed, ProtocolState::ConnOrientedClosed);
state_upcast!(ConnLessInitial, ProtocolState::ConnLessInitial);
state_upcast!(ConnLessClosed, ProtocolState::ConnLessClosed);
428
429impl ProtocolState {
430    fn new(
431        socket_type: UnixSocketType,
432        common: &mut UnixSocketCommon,
433        socket: &Weak<AtomicRefCell<UnixSocket>>,
434    ) -> Self {
435        match socket_type {
436            UnixSocketType::Stream | UnixSocketType::SeqPacket => {
437                Self::ConnOrientedInitial(Some(ConnOrientedInitial { bound_addr: None }))
438            }
439            UnixSocketType::Dgram => {
440                // this is a new socket and there are no listeners, so safe to use a temporary event queue
441                let mut cb_queue = CallbackQueue::new();
442
443                // dgram unix sockets are immediately able to receive data, so initialize the
444                // receive buffer
445
446                // increment the buffer's reader count
447                let reader_handle = common.recv_buffer.borrow_mut().add_reader(&mut cb_queue);
448
449                let weak = Weak::clone(socket);
450                let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
451                    BufferState::READABLE,
452                    BufferSignals::BUFFER_GREW,
453                    move |_, signals, cb_queue| {
454                        if let Some(socket) = weak.upgrade() {
455                            let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
456                                FileSignals::READ_BUFFER_GREW
457                            } else {
458                                FileSignals::empty()
459                            };
460
461                            socket.borrow_mut().refresh_file_state(signals, cb_queue);
462                        }
463                    },
464                );
465
466                // make sure no events were generated since if there were events to run, they would
467                // probably not run correctly if the socket's Arc is not fully created yet (as in
468                // the case of `Arc::new_cyclic`)
469                assert!(cb_queue.is_empty());
470
471                Self::ConnLessInitial(Some(ConnLessInitial {
472                    this_socket: Weak::clone(socket),
473                    bound_addr: None,
474                    peer_addr: None,
475                    peer: None,
476                    recv_data: LinkedList::new(),
477                    reader_handle,
478                    _recv_buffer_handle: recv_buffer_handle,
479                }))
480            }
481        }
482    }
483
484    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
485        match self {
486            Self::ConnOrientedInitial(x) => x.as_ref().unwrap().peer_address(),
487            Self::ConnOrientedListening(x) => x.as_ref().unwrap().peer_address(),
488            Self::ConnOrientedConnected(x) => x.as_ref().unwrap().peer_address(),
489            Self::ConnOrientedClosed(x) => x.as_ref().unwrap().peer_address(),
490            Self::ConnLessInitial(x) => x.as_ref().unwrap().peer_address(),
491            Self::ConnLessClosed(x) => x.as_ref().unwrap().peer_address(),
492        }
493    }
494
495    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
496        match self {
497            Self::ConnOrientedInitial(x) => x.as_ref().unwrap().bound_address(),
498            Self::ConnOrientedListening(x) => x.as_ref().unwrap().bound_address(),
499            Self::ConnOrientedConnected(x) => x.as_ref().unwrap().bound_address(),
500            Self::ConnOrientedClosed(x) => x.as_ref().unwrap().bound_address(),
501            Self::ConnLessInitial(x) => x.as_ref().unwrap().bound_address(),
502            Self::ConnLessClosed(x) => x.as_ref().unwrap().bound_address(),
503        }
504    }
505
506    fn refresh_file_state(
507        &self,
508        common: &mut UnixSocketCommon,
509        signals: FileSignals,
510        cb_queue: &mut CallbackQueue,
511    ) {
512        match self {
513            Self::ConnOrientedInitial(x) => x
514                .as_ref()
515                .unwrap()
516                .refresh_file_state(common, signals, cb_queue),
517            Self::ConnOrientedListening(x) => x
518                .as_ref()
519                .unwrap()
520                .refresh_file_state(common, signals, cb_queue),
521            Self::ConnOrientedConnected(x) => x
522                .as_ref()
523                .unwrap()
524                .refresh_file_state(common, signals, cb_queue),
525            Self::ConnOrientedClosed(x) => x
526                .as_ref()
527                .unwrap()
528                .refresh_file_state(common, signals, cb_queue),
529            Self::ConnLessInitial(x) => x
530                .as_ref()
531                .unwrap()
532                .refresh_file_state(common, signals, cb_queue),
533            Self::ConnLessClosed(x) => x
534                .as_ref()
535                .unwrap()
536                .refresh_file_state(common, signals, cb_queue),
537        }
538    }
539
540    fn close(
541        &mut self,
542        common: &mut UnixSocketCommon,
543        cb_queue: &mut CallbackQueue,
544    ) -> Result<(), SyscallError> {
545        let (new_state, rv) = match self {
546            Self::ConnOrientedInitial(x) => x.take().unwrap().close(common, cb_queue),
547            Self::ConnOrientedListening(x) => x.take().unwrap().close(common, cb_queue),
548            Self::ConnOrientedConnected(x) => x.take().unwrap().close(common, cb_queue),
549            Self::ConnOrientedClosed(x) => x.take().unwrap().close(common, cb_queue),
550            Self::ConnLessInitial(x) => x.take().unwrap().close(common, cb_queue),
551            Self::ConnLessClosed(x) => x.take().unwrap().close(common, cb_queue),
552        };
553
554        *self = new_state;
555        rv
556    }
557
558    fn bind(
559        &mut self,
560        common: &mut UnixSocketCommon,
561        socket: &Arc<AtomicRefCell<UnixSocket>>,
562        addr: Option<&SockaddrStorage>,
563        rng: impl rand::Rng,
564    ) -> Result<(), SyscallError> {
565        match self {
566            Self::ConnOrientedInitial(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
567            Self::ConnOrientedListening(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
568            Self::ConnOrientedConnected(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
569            Self::ConnOrientedClosed(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
570            Self::ConnLessInitial(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
571            Self::ConnLessClosed(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
572        }
573    }
574
575    fn sendmsg(
576        &mut self,
577        common: &mut UnixSocketCommon,
578        socket: &Arc<AtomicRefCell<UnixSocket>>,
579        args: SendmsgArgs,
580        mem: &mut MemoryManager,
581        cb_queue: &mut CallbackQueue,
582    ) -> Result<libc::ssize_t, SyscallError> {
583        match self {
584            Self::ConnOrientedInitial(x) => x
585                .as_mut()
586                .unwrap()
587                .sendmsg(common, socket, args, mem, cb_queue),
588            Self::ConnOrientedListening(x) => x
589                .as_mut()
590                .unwrap()
591                .sendmsg(common, socket, args, mem, cb_queue),
592            Self::ConnOrientedConnected(x) => x
593                .as_mut()
594                .unwrap()
595                .sendmsg(common, socket, args, mem, cb_queue),
596            Self::ConnOrientedClosed(x) => x
597                .as_mut()
598                .unwrap()
599                .sendmsg(common, socket, args, mem, cb_queue),
600            Self::ConnLessInitial(x) => x
601                .as_mut()
602                .unwrap()
603                .sendmsg(common, socket, args, mem, cb_queue),
604            Self::ConnLessClosed(x) => x
605                .as_mut()
606                .unwrap()
607                .sendmsg(common, socket, args, mem, cb_queue),
608        }
609    }
610
611    fn recvmsg(
612        &mut self,
613        common: &mut UnixSocketCommon,
614        socket: &Arc<AtomicRefCell<UnixSocket>>,
615        args: RecvmsgArgs,
616        mem: &mut MemoryManager,
617        cb_queue: &mut CallbackQueue,
618    ) -> Result<RecvmsgReturn, SyscallError> {
619        match self {
620            Self::ConnOrientedInitial(x) => x
621                .as_mut()
622                .unwrap()
623                .recvmsg(common, socket, args, mem, cb_queue),
624            Self::ConnOrientedListening(x) => x
625                .as_mut()
626                .unwrap()
627                .recvmsg(common, socket, args, mem, cb_queue),
628            Self::ConnOrientedConnected(x) => x
629                .as_mut()
630                .unwrap()
631                .recvmsg(common, socket, args, mem, cb_queue),
632            Self::ConnOrientedClosed(x) => x
633                .as_mut()
634                .unwrap()
635                .recvmsg(common, socket, args, mem, cb_queue),
636            Self::ConnLessInitial(x) => x
637                .as_mut()
638                .unwrap()
639                .recvmsg(common, socket, args, mem, cb_queue),
640            Self::ConnLessClosed(x) => x
641                .as_mut()
642                .unwrap()
643                .recvmsg(common, socket, args, mem, cb_queue),
644        }
645    }
646
647    fn inform_bytes_read(
648        &mut self,
649        common: &mut UnixSocketCommon,
650        num: u64,
651        cb_queue: &mut CallbackQueue,
652    ) {
653        match self {
654            Self::ConnOrientedInitial(x) => {
655                x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
656            }
657            Self::ConnOrientedListening(x) => {
658                x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
659            }
660            Self::ConnOrientedConnected(x) => {
661                x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
662            }
663            Self::ConnOrientedClosed(x) => {
664                x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
665            }
666            Self::ConnLessInitial(x) => {
667                x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
668            }
669            Self::ConnLessClosed(x) => x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue),
670        }
671    }
672
673    fn ioctl(
674        &mut self,
675        common: &mut UnixSocketCommon,
676        request: IoctlRequest,
677        arg_ptr: ForeignPtr<()>,
678        memory_manager: &mut MemoryManager,
679    ) -> SyscallResult {
680        match self {
681            Self::ConnOrientedInitial(x) => {
682                x.as_mut()
683                    .unwrap()
684                    .ioctl(common, request, arg_ptr, memory_manager)
685            }
686            Self::ConnOrientedListening(x) => {
687                x.as_mut()
688                    .unwrap()
689                    .ioctl(common, request, arg_ptr, memory_manager)
690            }
691            Self::ConnOrientedConnected(x) => {
692                x.as_mut()
693                    .unwrap()
694                    .ioctl(common, request, arg_ptr, memory_manager)
695            }
696            Self::ConnOrientedClosed(x) => {
697                x.as_mut()
698                    .unwrap()
699                    .ioctl(common, request, arg_ptr, memory_manager)
700            }
701            Self::ConnLessInitial(x) => {
702                x.as_mut()
703                    .unwrap()
704                    .ioctl(common, request, arg_ptr, memory_manager)
705            }
706            Self::ConnLessClosed(x) => {
707                x.as_mut()
708                    .unwrap()
709                    .ioctl(common, request, arg_ptr, memory_manager)
710            }
711        }
712    }
713
714    fn listen(
715        &mut self,
716        common: &mut UnixSocketCommon,
717        backlog: i32,
718        cb_queue: &mut CallbackQueue,
719    ) -> Result<(), Errno> {
720        let (new_state, rv) = match self {
721            Self::ConnOrientedInitial(x) => x.take().unwrap().listen(common, backlog, cb_queue),
722            Self::ConnOrientedListening(x) => x.take().unwrap().listen(common, backlog, cb_queue),
723            Self::ConnOrientedConnected(x) => x.take().unwrap().listen(common, backlog, cb_queue),
724            Self::ConnOrientedClosed(x) => x.take().unwrap().listen(common, backlog, cb_queue),
725            Self::ConnLessInitial(x) => x.take().unwrap().listen(common, backlog, cb_queue),
726            Self::ConnLessClosed(x) => x.take().unwrap().listen(common, backlog, cb_queue),
727        };
728
729        *self = new_state;
730        rv
731    }
732
733    fn connect(
734        &mut self,
735        common: &mut UnixSocketCommon,
736        socket: &Arc<AtomicRefCell<UnixSocket>>,
737        addr: &SockaddrStorage,
738        cb_queue: &mut CallbackQueue,
739    ) -> Result<(), SyscallError> {
740        let (new_state, rv) = match self {
741            Self::ConnOrientedInitial(x) => {
742                x.take().unwrap().connect(common, socket, addr, cb_queue)
743            }
744            Self::ConnOrientedListening(x) => {
745                x.take().unwrap().connect(common, socket, addr, cb_queue)
746            }
747            Self::ConnOrientedConnected(x) => {
748                x.take().unwrap().connect(common, socket, addr, cb_queue)
749            }
750            Self::ConnOrientedClosed(x) => {
751                x.take().unwrap().connect(common, socket, addr, cb_queue)
752            }
753            Self::ConnLessInitial(x) => x.take().unwrap().connect(common, socket, addr, cb_queue),
754            Self::ConnLessClosed(x) => x.take().unwrap().connect(common, socket, addr, cb_queue),
755        };
756
757        *self = new_state;
758        rv
759    }
760
761    fn connect_unnamed(
762        &mut self,
763        common: &mut UnixSocketCommon,
764        socket: &Arc<AtomicRefCell<UnixSocket>>,
765        peer: Arc<AtomicRefCell<UnixSocket>>,
766        cb_queue: &mut CallbackQueue,
767    ) -> Result<(), SyscallError> {
768        let (new_state, rv) = match self {
769            Self::ConnOrientedInitial(x) => x
770                .take()
771                .unwrap()
772                .connect_unnamed(common, socket, peer, cb_queue),
773            Self::ConnOrientedListening(x) => x
774                .take()
775                .unwrap()
776                .connect_unnamed(common, socket, peer, cb_queue),
777            Self::ConnOrientedConnected(x) => x
778                .take()
779                .unwrap()
780                .connect_unnamed(common, socket, peer, cb_queue),
781            Self::ConnOrientedClosed(x) => x
782                .take()
783                .unwrap()
784                .connect_unnamed(common, socket, peer, cb_queue),
785            Self::ConnLessInitial(x) => x
786                .take()
787                .unwrap()
788                .connect_unnamed(common, socket, peer, cb_queue),
789            Self::ConnLessClosed(x) => x
790                .take()
791                .unwrap()
792                .connect_unnamed(common, socket, peer, cb_queue),
793        };
794
795        *self = new_state;
796        rv
797    }
798
799    fn accept(
800        &mut self,
801        common: &mut UnixSocketCommon,
802        cb_queue: &mut CallbackQueue,
803    ) -> Result<OpenFile, SyscallError> {
804        match self {
805            Self::ConnOrientedInitial(x) => x.as_mut().unwrap().accept(common, cb_queue),
806            Self::ConnOrientedListening(x) => x.as_mut().unwrap().accept(common, cb_queue),
807            Self::ConnOrientedConnected(x) => x.as_mut().unwrap().accept(common, cb_queue),
808            Self::ConnOrientedClosed(x) => x.as_mut().unwrap().accept(common, cb_queue),
809            Self::ConnLessInitial(x) => x.as_mut().unwrap().accept(common, cb_queue),
810            Self::ConnLessClosed(x) => x.as_mut().unwrap().accept(common, cb_queue),
811        }
812    }
813
814    /// Called on the listening socket when there is an incoming connection.
815    fn queue_incoming_conn(
816        &mut self,
817        common: &mut UnixSocketCommon,
818        from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
819        peer: &Arc<AtomicRefCell<UnixSocket>>,
820        child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
821        cb_queue: &mut CallbackQueue,
822    ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
823        match self {
824            Self::ConnOrientedInitial(x) => x.as_mut().unwrap().queue_incoming_conn(
825                common,
826                from_address,
827                peer,
828                child_send_buffer,
829                cb_queue,
830            ),
831            Self::ConnOrientedListening(x) => x.as_mut().unwrap().queue_incoming_conn(
832                common,
833                from_address,
834                peer,
835                child_send_buffer,
836                cb_queue,
837            ),
838            Self::ConnOrientedConnected(x) => x.as_mut().unwrap().queue_incoming_conn(
839                common,
840                from_address,
841                peer,
842                child_send_buffer,
843                cb_queue,
844            ),
845            Self::ConnOrientedClosed(x) => x.as_mut().unwrap().queue_incoming_conn(
846                common,
847                from_address,
848                peer,
849                child_send_buffer,
850                cb_queue,
851            ),
852            Self::ConnLessInitial(x) => x.as_mut().unwrap().queue_incoming_conn(
853                common,
854                from_address,
855                peer,
856                child_send_buffer,
857                cb_queue,
858            ),
859            Self::ConnLessClosed(x) => x.as_mut().unwrap().queue_incoming_conn(
860                common,
861                from_address,
862                peer,
863                child_send_buffer,
864                cb_queue,
865            ),
866        }
867    }
868}
869
870/// Methods that a protocol state may wish to handle. Default implementations which return an error
871/// status are provided for many methods. Each type that implements this trait can override any of
872/// these default implementations.
873trait Protocol
874where
875    Self: Sized + Into<ProtocolState>,
876{
877    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno>;
878    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno>;
879    fn refresh_file_state(
880        &self,
881        common: &mut UnixSocketCommon,
882        signals: FileSignals,
883        cb_queue: &mut CallbackQueue,
884    );
885
886    fn close(
887        self,
888        _common: &mut UnixSocketCommon,
889        _cb_queue: &mut CallbackQueue,
890    ) -> (ProtocolState, Result<(), SyscallError>) {
891        log::warn!("close() while in state {}", std::any::type_name::<Self>());
892        (self.into(), Err(Errno::EOPNOTSUPP.into()))
893    }
894
895    fn bind(
896        &mut self,
897        _common: &mut UnixSocketCommon,
898        _socket: &Arc<AtomicRefCell<UnixSocket>>,
899        _addr: Option<&SockaddrStorage>,
900        _rng: impl rand::Rng,
901    ) -> Result<(), SyscallError> {
902        log::warn!("bind() while in state {}", std::any::type_name::<Self>());
903        Err(Errno::EOPNOTSUPP.into())
904    }
905
906    fn sendmsg(
907        &mut self,
908        _common: &mut UnixSocketCommon,
909        _socket: &Arc<AtomicRefCell<UnixSocket>>,
910        _args: SendmsgArgs,
911        _mem: &mut MemoryManager,
912        _cb_queue: &mut CallbackQueue,
913    ) -> Result<libc::ssize_t, SyscallError> {
914        log::warn!("sendmsg() while in state {}", std::any::type_name::<Self>());
915        Err(Errno::EOPNOTSUPP.into())
916    }
917
918    fn recvmsg(
919        &mut self,
920        _common: &mut UnixSocketCommon,
921        _socket: &Arc<AtomicRefCell<UnixSocket>>,
922        _args: RecvmsgArgs,
923        _mem: &mut MemoryManager,
924        _cb_queue: &mut CallbackQueue,
925    ) -> Result<RecvmsgReturn, SyscallError> {
926        log::warn!("recvmsg() while in state {}", std::any::type_name::<Self>());
927        Err(Errno::EOPNOTSUPP.into())
928    }
929
930    fn inform_bytes_read(
931        &mut self,
932        _common: &mut UnixSocketCommon,
933        _num: u64,
934        _cb_queue: &mut CallbackQueue,
935    ) {
936        panic!(
937            "inform_bytes_read() while in state {}",
938            std::any::type_name::<Self>()
939        );
940    }
941
942    fn ioctl(
943        &mut self,
944        _common: &mut UnixSocketCommon,
945        _request: IoctlRequest,
946        _arg_ptr: ForeignPtr<()>,
947        _memory_manager: &mut MemoryManager,
948    ) -> SyscallResult {
949        log::warn!("ioctl() while in state {}", std::any::type_name::<Self>());
950        Err(Errno::EOPNOTSUPP.into())
951    }
952
953    fn listen(
954        self,
955        _common: &mut UnixSocketCommon,
956        _backlog: i32,
957        _cb_queue: &mut CallbackQueue,
958    ) -> (ProtocolState, Result<(), Errno>) {
959        log::warn!("listen() while in state {}", std::any::type_name::<Self>());
960        (self.into(), Err(Errno::EOPNOTSUPP))
961    }
962
963    fn connect(
964        self,
965        _common: &mut UnixSocketCommon,
966        _socket: &Arc<AtomicRefCell<UnixSocket>>,
967        _addr: &SockaddrStorage,
968        _cb_queue: &mut CallbackQueue,
969    ) -> (ProtocolState, Result<(), SyscallError>) {
970        log::warn!("connect() while in state {}", std::any::type_name::<Self>());
971        (self.into(), Err(Errno::EOPNOTSUPP.into()))
972    }
973
974    fn connect_unnamed(
975        self,
976        _common: &mut UnixSocketCommon,
977        _socket: &Arc<AtomicRefCell<UnixSocket>>,
978        _peer: Arc<AtomicRefCell<UnixSocket>>,
979        _cb_queue: &mut CallbackQueue,
980    ) -> (ProtocolState, Result<(), SyscallError>) {
981        log::warn!(
982            "connect_unnamed() while in state {}",
983            std::any::type_name::<Self>()
984        );
985        (self.into(), Err(Errno::EOPNOTSUPP.into()))
986    }
987
988    fn accept(
989        &mut self,
990        _common: &mut UnixSocketCommon,
991        _cb_queue: &mut CallbackQueue,
992    ) -> Result<OpenFile, SyscallError> {
993        log::warn!("accept() while in state {}", std::any::type_name::<Self>());
994        Err(Errno::EOPNOTSUPP.into())
995    }
996
997    fn queue_incoming_conn(
998        &mut self,
999        _common: &mut UnixSocketCommon,
1000        _from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
1001        _peer: &Arc<AtomicRefCell<UnixSocket>>,
1002        _child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
1003        _cb_queue: &mut CallbackQueue,
1004    ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
1005        log::warn!(
1006            "queue_incoming_conn() while in state {}",
1007            std::any::type_name::<Self>()
1008        );
1009        Err(IncomingConnError::NotSupported)
1010    }
1011}
1012
1013impl Protocol for ConnOrientedInitial {
1014    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1015        Err(Errno::ENOTCONN)
1016    }
1017
1018    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1019        Ok(self.bound_addr)
1020    }
1021
1022    fn refresh_file_state(
1023        &self,
1024        common: &mut UnixSocketCommon,
1025        signals: FileSignals,
1026        cb_queue: &mut CallbackQueue,
1027    ) {
1028        assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
1029        common.update_state(
1030            /* mask= */ FileState::all(),
1031            FileState::ACTIVE,
1032            signals,
1033            cb_queue,
1034        );
1035    }
1036
1037    fn close(
1038        self,
1039        common: &mut UnixSocketCommon,
1040        cb_queue: &mut CallbackQueue,
1041    ) -> (ProtocolState, Result<(), SyscallError>) {
1042        let new_state = ConnOrientedClosed {};
1043        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1044        (new_state.into(), common.close(cb_queue))
1045    }
1046
1047    fn bind(
1048        &mut self,
1049        common: &mut UnixSocketCommon,
1050        socket: &Arc<AtomicRefCell<UnixSocket>>,
1051        addr: Option<&SockaddrStorage>,
1052        rng: impl rand::Rng,
1053    ) -> Result<(), SyscallError> {
1054        // if already bound
1055        if self.bound_addr.is_some() {
1056            return Err(Errno::EINVAL.into());
1057        }
1058
1059        self.bound_addr = Some(common.bind(socket, addr, rng)?);
1060        Ok(())
1061    }
1062
1063    fn sendmsg(
1064        &mut self,
1065        common: &mut UnixSocketCommon,
1066        _socket: &Arc<AtomicRefCell<UnixSocket>>,
1067        args: SendmsgArgs,
1068        _mem: &mut MemoryManager,
1069        _cb_queue: &mut CallbackQueue,
1070    ) -> Result<libc::ssize_t, SyscallError> {
1071        match (common.socket_type, args.addr) {
1072            (UnixSocketType::Stream, Some(_)) => Err(Errno::EOPNOTSUPP.into()),
1073            (UnixSocketType::Stream, None) => Err(Errno::ENOTCONN.into()),
1074            (UnixSocketType::SeqPacket, _) => Err(Errno::ENOTCONN.into()),
1075            (UnixSocketType::Dgram, _) => panic!(
1076                "A dgram unix socket is in the connection-oriented {:?} state",
1077                std::any::type_name::<Self>()
1078            ),
1079        }
1080    }
1081
1082    fn recvmsg(
1083        &mut self,
1084        common: &mut UnixSocketCommon,
1085        _socket: &Arc<AtomicRefCell<UnixSocket>>,
1086        _args: RecvmsgArgs,
1087        _mem: &mut MemoryManager,
1088        _cb_queue: &mut CallbackQueue,
1089    ) -> Result<RecvmsgReturn, SyscallError> {
1090        match common.socket_type {
1091            UnixSocketType::Stream => Err(Errno::EINVAL.into()),
1092            UnixSocketType::SeqPacket => Err(Errno::ENOTCONN.into()),
1093            UnixSocketType::Dgram => panic!(
1094                "A dgram unix socket is in the connection-oriented {:?} state",
1095                std::any::type_name::<Self>()
1096            ),
1097        }
1098    }
1099
1100    fn ioctl(
1101        &mut self,
1102        common: &mut UnixSocketCommon,
1103        request: IoctlRequest,
1104        arg_ptr: ForeignPtr<()>,
1105        memory_manager: &mut MemoryManager,
1106    ) -> SyscallResult {
1107        common.ioctl(request, arg_ptr, memory_manager)
1108    }
1109
1110    fn listen(
1111        self,
1112        common: &mut UnixSocketCommon,
1113        backlog: i32,
1114        cb_queue: &mut CallbackQueue,
1115    ) -> (ProtocolState, Result<(), Errno>) {
1116        // it must have already been bound
1117        let bound_addr = match self.bound_addr {
1118            Some(x) => x,
1119            None => return (self.into(), Err(Errno::EINVAL)),
1120        };
1121
1122        let new_state = ConnOrientedListening {
1123            bound_addr,
1124            queue: VecDeque::new(),
1125            queue_limit: backlog_to_queue_size(backlog),
1126        };
1127
1128        // refresh the socket's file state
1129        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1130
1131        (new_state.into(), Ok(()))
1132    }
1133
1134    fn connect(
1135        self,
1136        common: &mut UnixSocketCommon,
1137        socket: &Arc<AtomicRefCell<UnixSocket>>,
1138        addr: &SockaddrStorage,
1139        cb_queue: &mut CallbackQueue,
1140    ) -> (ProtocolState, Result<(), SyscallError>) {
1141        let Some(addr) = addr.as_unix() else {
1142            return (self.into(), Err(Errno::EINVAL.into()));
1143        };
1144
1145        // look up the server socket
1146        let server = match lookup_address(
1147            &common.namespace.borrow(),
1148            common.socket_type,
1149            &addr.as_ref(),
1150        ) {
1151            Ok(x) => x,
1152            Err(e) => return (self.into(), Err(e.into())),
1153        };
1154
1155        // need to tell the server to queue a new child socket, and then link the current socket
1156        // with the new child socket
1157
1158        // inform the server socket of the incoming connection and get the server socket's new child
1159        // socket
1160        let server_mut = &mut *server.borrow_mut();
1161        let peer = match server_mut.protocol_state.queue_incoming_conn(
1162            &mut server_mut.common,
1163            self.bound_addr,
1164            socket,
1165            &common.recv_buffer,
1166            cb_queue,
1167        ) {
1168            Ok(peer) => peer,
1169            Err(IncomingConnError::NotSupported) => {
1170                return (self.into(), Err(Errno::ECONNREFUSED.into()));
1171            }
1172            Err(IncomingConnError::QueueFull) => {
1173                if common.status.contains(FileStatus::NONBLOCK) {
1174                    return (self.into(), Err(Errno::EWOULDBLOCK.into()));
1175                }
1176
1177                // block until the server has room for new connections, or is closed
1178                let err = SyscallError::new_blocked_on_file(
1179                    File::Socket(Socket::Unix(Arc::clone(&server))),
1180                    FileState::SOCKET_ALLOWING_CONNECT | FileState::CLOSED,
1181                    server_mut.supports_sa_restart(),
1182                );
1183
1184                return (self.into(), Err(err));
1185            }
1186        };
1187
1188        // our send buffer will be the peer's receive buffer
1189        let send_buffer = Arc::clone(peer.borrow().recv_buffer());
1190
1191        let weak = Arc::downgrade(socket);
1192        let send_buffer_handle = send_buffer.borrow_mut().add_listener(
1193            BufferState::WRITABLE | BufferState::NO_READERS,
1194            BufferSignals::empty(),
1195            move |_, _, cb_queue| {
1196                if let Some(socket) = weak.upgrade() {
1197                    socket
1198                        .borrow_mut()
1199                        .refresh_file_state(FileSignals::empty(), cb_queue);
1200                }
1201            },
1202        );
1203
1204        // increment the buffer's writer count
1205        let writer_handle = send_buffer.borrow_mut().add_writer(cb_queue);
1206
1207        let weak = Arc::downgrade(socket);
1208        let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
1209            BufferState::READABLE | BufferState::NO_WRITERS,
1210            BufferSignals::BUFFER_GREW,
1211            move |_, signals, cb_queue| {
1212                if let Some(socket) = weak.upgrade() {
1213                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
1214                        FileSignals::READ_BUFFER_GREW
1215                    } else {
1216                        FileSignals::empty()
1217                    };
1218                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
1219                }
1220            },
1221        );
1222
1223        // increment the buffer's reader count
1224        let reader_handle = common.recv_buffer.borrow_mut().add_reader(cb_queue);
1225
1226        let new_state = ConnOrientedConnected {
1227            bound_addr: self.bound_addr,
1228            peer_addr: Some(addr.into_owned()),
1229            peer: Arc::clone(peer),
1230            reader_handle,
1231            writer_handle,
1232            _recv_buffer_handle: recv_buffer_handle,
1233            _send_buffer_handle: send_buffer_handle,
1234        };
1235
1236        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1237
1238        (new_state.into(), Ok(()))
1239    }
1240
1241    fn connect_unnamed(
1242        self,
1243        common: &mut UnixSocketCommon,
1244        socket: &Arc<AtomicRefCell<UnixSocket>>,
1245        peer: Arc<AtomicRefCell<UnixSocket>>,
1246        cb_queue: &mut CallbackQueue,
1247    ) -> (ProtocolState, Result<(), SyscallError>) {
1248        assert!(self.bound_addr.is_none());
1249
1250        let send_buffer_handle;
1251        let writer_handle;
1252
1253        {
1254            let peer_ref = peer.borrow();
1255            let send_buffer = peer_ref.recv_buffer();
1256
1257            let weak = Arc::downgrade(socket);
1258            send_buffer_handle = send_buffer.borrow_mut().add_listener(
1259                BufferState::WRITABLE | BufferState::NO_READERS,
1260                BufferSignals::empty(),
1261                move |_, _, cb_queue| {
1262                    if let Some(socket) = weak.upgrade() {
1263                        socket
1264                            .borrow_mut()
1265                            .refresh_file_state(FileSignals::empty(), cb_queue);
1266                    }
1267                },
1268            );
1269
1270            // increment the buffer's writer count
1271            writer_handle = send_buffer.borrow_mut().add_writer(cb_queue);
1272        }
1273
1274        let weak = Arc::downgrade(socket);
1275        let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
1276            BufferState::READABLE | BufferState::NO_WRITERS,
1277            BufferSignals::BUFFER_GREW,
1278            move |_, signals, cb_queue| {
1279                if let Some(socket) = weak.upgrade() {
1280                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
1281                        FileSignals::READ_BUFFER_GREW
1282                    } else {
1283                        FileSignals::empty()
1284                    };
1285                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
1286                }
1287            },
1288        );
1289
1290        // increment the buffer's reader count
1291        let reader_handle = common.recv_buffer.borrow_mut().add_reader(cb_queue);
1292
1293        let new_state = ConnOrientedConnected {
1294            bound_addr: None,
1295            peer_addr: None,
1296            peer,
1297            reader_handle,
1298            writer_handle,
1299            _recv_buffer_handle: recv_buffer_handle,
1300            _send_buffer_handle: send_buffer_handle,
1301        };
1302
1303        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1304
1305        (new_state.into(), Ok(()))
1306    }
1307
1308    fn accept(
1309        &mut self,
1310        _common: &mut UnixSocketCommon,
1311        _cb_queue: &mut CallbackQueue,
1312    ) -> Result<OpenFile, SyscallError> {
1313        log::warn!("accept() while in state {}", std::any::type_name::<Self>());
1314        Err(Errno::EINVAL.into())
1315    }
1316}
1317
1318impl Protocol for ConnOrientedListening {
1319    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1320        Err(Errno::ENOTCONN)
1321    }
1322
1323    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1324        Ok(Some(self.bound_addr))
1325    }
1326
1327    fn refresh_file_state(
1328        &self,
1329        common: &mut UnixSocketCommon,
1330        signals: FileSignals,
1331        cb_queue: &mut CallbackQueue,
1332    ) {
1333        let mut new_state = FileState::ACTIVE;
1334
1335        // socket is readable if the queue is not empty
1336        new_state.set(FileState::READABLE, !self.queue.is_empty());
1337
1338        // socket allows connections if the queue is not full
1339        new_state.set(FileState::SOCKET_ALLOWING_CONNECT, !self.queue_is_full());
1340
1341        // Note: This can cause a thundering-herd condition where multiple blocked connect() calls
1342        // are all notified at the same time, even if there isn't enough space to allow all of them.
1343        // In practice this should be uncommon so we don't worry about it, and avoids requiring that
1344        // the server keep a list of all connecting clients.
1345
1346        common.update_state(
1347            /* mask= */ FileState::all(),
1348            new_state,
1349            signals,
1350            cb_queue,
1351        );
1352    }
1353
1354    fn close(
1355        self,
1356        common: &mut UnixSocketCommon,
1357        cb_queue: &mut CallbackQueue,
1358    ) -> (ProtocolState, Result<(), SyscallError>) {
1359        for sock in self.queue {
1360            // close all queued sockets
1361            if let Err(e) = sock.borrow_mut().close(cb_queue) {
1362                log::warn!("Unexpected error while closing queued unix socket: {:?}", e);
1363            }
1364        }
1365
1366        let new_state = ConnOrientedClosed {};
1367        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1368        (new_state.into(), common.close(cb_queue))
1369    }
1370
1371    fn listen(
1372        mut self,
1373        common: &mut UnixSocketCommon,
1374        backlog: i32,
1375        cb_queue: &mut CallbackQueue,
1376    ) -> (ProtocolState, Result<(), Errno>) {
1377        self.queue_limit = backlog_to_queue_size(backlog);
1378
1379        // refresh the socket's file state
1380        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1381
1382        (self.into(), Ok(()))
1383    }
1384
1385    fn connect(
1386        self,
1387        _common: &mut UnixSocketCommon,
1388        _socket: &Arc<AtomicRefCell<UnixSocket>>,
1389        _addr: &SockaddrStorage,
1390        _cb_queue: &mut CallbackQueue,
1391    ) -> (ProtocolState, Result<(), SyscallError>) {
1392        (self.into(), Err(Errno::EINVAL.into()))
1393    }
1394
1395    fn accept(
1396        &mut self,
1397        common: &mut UnixSocketCommon,
1398        cb_queue: &mut CallbackQueue,
1399    ) -> Result<OpenFile, SyscallError> {
1400        let child_socket = match self.queue.pop_front() {
1401            Some(x) => x,
1402            None => return Err(Errno::EWOULDBLOCK.into()),
1403        };
1404
1405        // refresh the socket's file state
1406        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1407
1408        Ok(OpenFile::new(File::Socket(Socket::Unix(child_socket))))
1409    }
1410
1411    fn queue_incoming_conn(
1412        &mut self,
1413        common: &mut UnixSocketCommon,
1414        from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
1415        peer: &Arc<AtomicRefCell<UnixSocket>>,
1416        child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
1417        cb_queue: &mut CallbackQueue,
1418    ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
1419        if self.queue.len() >= self.queue_limit.try_into().unwrap() {
1420            assert!(!common.state.contains(FileState::SOCKET_ALLOWING_CONNECT));
1421            return Err(IncomingConnError::QueueFull);
1422        }
1423
1424        assert!(common.state.contains(FileState::SOCKET_ALLOWING_CONNECT));
1425
1426        let child_socket = UnixSocket::new(
1427            // copy the parent's status
1428            common.status,
1429            common.socket_type,
1430            &common.namespace,
1431        );
1432
1433        let child_recv_buffer = Arc::clone(&child_socket.borrow_mut().common.recv_buffer);
1434
1435        let weak = Arc::downgrade(&child_socket);
1436        let send_buffer_handle = child_send_buffer.borrow_mut().add_listener(
1437            BufferState::WRITABLE | BufferState::NO_READERS,
1438            BufferSignals::empty(),
1439            move |_, _, cb_queue| {
1440                if let Some(socket) = weak.upgrade() {
1441                    socket
1442                        .borrow_mut()
1443                        .refresh_file_state(FileSignals::empty(), cb_queue);
1444                }
1445            },
1446        );
1447
1448        // increment the buffer's writer count
1449        let writer_handle = child_send_buffer.borrow_mut().add_writer(cb_queue);
1450
1451        let weak = Arc::downgrade(&child_socket);
1452        let recv_buffer_handle = child_recv_buffer.borrow_mut().add_listener(
1453            BufferState::READABLE | BufferState::NO_WRITERS,
1454            BufferSignals::BUFFER_GREW,
1455            move |_, signals, cb_queue| {
1456                if let Some(socket) = weak.upgrade() {
1457                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
1458                        FileSignals::READ_BUFFER_GREW
1459                    } else {
1460                        FileSignals::empty()
1461                    };
1462                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
1463                }
1464            },
1465        );
1466
1467        // increment the buffer's reader count
1468        let reader_handle = child_recv_buffer.borrow_mut().add_reader(cb_queue);
1469
1470        let new_child_state = ConnOrientedConnected {
1471            // use the parent's bind address
1472            bound_addr: Some(self.bound_addr),
1473            peer_addr: from_address,
1474            peer: Arc::clone(peer),
1475            reader_handle,
1476            writer_handle,
1477            _recv_buffer_handle: recv_buffer_handle,
1478            _send_buffer_handle: send_buffer_handle,
1479        };
1480
1481        // update the child socket's state
1482        child_socket.borrow_mut().protocol_state = new_child_state.into();
1483
1484        // defer refreshing the child socket's file-state until later
1485        let weak = Arc::downgrade(&child_socket);
1486        cb_queue.add(move |cb_queue| {
1487            if let Some(child_socket) = weak.upgrade() {
1488                child_socket
1489                    .borrow_mut()
1490                    .refresh_file_state(FileSignals::empty(), cb_queue);
1491            }
1492        });
1493
1494        // add the child socket to the accept queue
1495        self.queue.push_back(child_socket);
1496
1497        // refresh the server socket's file state
1498        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1499
1500        // return a reference to the enqueued child socket
1501        Ok(self.queue.back().unwrap())
1502    }
1503}
1504
1505impl Protocol for ConnOrientedConnected {
1506    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1507        Ok(self.peer_addr)
1508    }
1509
1510    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1511        Ok(self.bound_addr)
1512    }
1513
1514    fn refresh_file_state(
1515        &self,
1516        common: &mut UnixSocketCommon,
1517        signals: FileSignals,
1518        cb_queue: &mut CallbackQueue,
1519    ) {
1520        let mut new_state = FileState::ACTIVE;
1521
1522        {
1523            let recv_buffer = common.recv_buffer.borrow();
1524            let peer = self.peer.borrow();
1525            let send_buffer = peer.recv_buffer().borrow();
1526
1527            new_state.set(
1528                FileState::READABLE,
1529                recv_buffer.has_data() || recv_buffer.num_writers() == 0,
1530            );
1531            new_state.set(
1532                FileState::WRITABLE,
1533                common.sent_len < common.send_limit || send_buffer.num_readers() == 0,
1534            );
1535        }
1536
1537        common.update_state(
1538            /* mask= */ FileState::all(),
1539            new_state,
1540            signals,
1541            cb_queue,
1542        );
1543    }
1544
1545    fn close(
1546        self,
1547        common: &mut UnixSocketCommon,
1548        cb_queue: &mut CallbackQueue,
1549    ) -> (ProtocolState, Result<(), SyscallError>) {
1550        // inform the buffer that there is one fewer readers
1551        common
1552            .recv_buffer
1553            .borrow_mut()
1554            .remove_reader(self.reader_handle, cb_queue);
1555
1556        // inform the buffer that there is one fewer writers
1557        self.peer
1558            .borrow()
1559            .recv_buffer()
1560            .borrow_mut()
1561            .remove_writer(self.writer_handle, cb_queue);
1562
1563        let new_state = ConnOrientedClosed {};
1564        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1565        (new_state.into(), common.close(cb_queue))
1566    }
1567
1568    fn sendmsg(
1569        &mut self,
1570        common: &mut UnixSocketCommon,
1571        socket: &Arc<AtomicRefCell<UnixSocket>>,
1572        args: SendmsgArgs,
1573        mem: &mut MemoryManager,
1574        cb_queue: &mut CallbackQueue,
1575    ) -> Result<libc::ssize_t, SyscallError> {
1576        if !args.control_ptr.ptr().is_null() {
1577            log::debug!("Unix sockets don't yet support control data for sendmsg()");
1578            return Err(Errno::EINVAL.into());
1579        }
1580
1581        let recv_socket = common.resolve_destination(Some(&self.peer), args.addr)?;
1582        let rv = common.sendmsg(socket, args.iovs, args.flags, &recv_socket, mem, cb_queue)?;
1583
1584        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1585
1586        Ok(rv.try_into().unwrap())
1587    }
1588
1589    fn recvmsg(
1590        &mut self,
1591        common: &mut UnixSocketCommon,
1592        socket: &Arc<AtomicRefCell<UnixSocket>>,
1593        args: RecvmsgArgs,
1594        mem: &mut MemoryManager,
1595        cb_queue: &mut CallbackQueue,
1596    ) -> Result<RecvmsgReturn, SyscallError> {
1597        if !args.control_ptr.ptr().is_null() {
1598            log::debug!("Unix sockets don't yet support control data for recvmsg()");
1599            return Err(Errno::EINVAL.into());
1600        }
1601
1602        let (rv, num_removed_from_buf, msg_flags) =
1603            common.recvmsg(socket, args.iovs, args.flags, mem, cb_queue)?;
1604        let num_removed_from_buf = u64::try_from(num_removed_from_buf).unwrap();
1605
1606        if num_removed_from_buf > 0 {
1607            // defer informing the peer until we're done processing the current socket
1608            let peer = Arc::clone(&self.peer);
1609            cb_queue.add(move |cb_queue| {
1610                peer.borrow_mut()
1611                    .inform_bytes_read(num_removed_from_buf, cb_queue);
1612            });
1613        }
1614
1615        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1616
1617        Ok(RecvmsgReturn {
1618            return_val: rv.try_into().unwrap(),
1619            addr: self.peer_addr.map(Into::into),
1620            msg_flags,
1621            control_len: 0,
1622        })
1623    }
1624
1625    fn inform_bytes_read(
1626        &mut self,
1627        common: &mut UnixSocketCommon,
1628        num: u64,
1629        cb_queue: &mut CallbackQueue,
1630    ) {
1631        common.sent_len = common.sent_len.checked_sub(num).unwrap();
1632        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1633    }
1634
1635    fn ioctl(
1636        &mut self,
1637        common: &mut UnixSocketCommon,
1638        request: IoctlRequest,
1639        arg_ptr: ForeignPtr<()>,
1640        memory_manager: &mut MemoryManager,
1641    ) -> SyscallResult {
1642        common.ioctl(request, arg_ptr, memory_manager)
1643    }
1644
1645    fn accept(
1646        &mut self,
1647        _common: &mut UnixSocketCommon,
1648        _cb_queue: &mut CallbackQueue,
1649    ) -> Result<OpenFile, SyscallError> {
1650        log::warn!("accept() while in state {}", std::any::type_name::<Self>());
1651        Err(Errno::EINVAL.into())
1652    }
1653}
1654
1655impl Protocol for ConnOrientedClosed {
1656    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1657        Err(Errno::ENOTCONN)
1658    }
1659
1660    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1661        Err(Errno::EBADFD)
1662    }
1663
1664    fn refresh_file_state(
1665        &self,
1666        common: &mut UnixSocketCommon,
1667        signals: FileSignals,
1668        cb_queue: &mut CallbackQueue,
1669    ) {
1670        assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
1671        common.update_state(
1672            /* mask= */ FileState::all(),
1673            FileState::CLOSED,
1674            signals,
1675            cb_queue,
1676        );
1677    }
1678
1679    fn close(
1680        self,
1681        _common: &mut UnixSocketCommon,
1682        _cb_queue: &mut CallbackQueue,
1683    ) -> (ProtocolState, Result<(), SyscallError>) {
1684        // why are we trying to close an already closed file? we probably want a bt here...
1685        panic!("Trying to close an already closed socket");
1686    }
1687
1688    fn inform_bytes_read(
1689        &mut self,
1690        _common: &mut UnixSocketCommon,
1691        _num: u64,
1692        _cb_queue: &mut CallbackQueue,
1693    ) {
1694        // do nothing since we're already closed
1695    }
1696}
1697
1698impl Protocol for ConnLessInitial {
1699    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1700        match self.peer {
1701            Some(_) => Ok(self.peer_addr),
1702            None => Err(Errno::ENOTCONN),
1703        }
1704    }
1705
1706    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1707        Ok(self.bound_addr)
1708    }
1709
1710    fn refresh_file_state(
1711        &self,
1712        common: &mut UnixSocketCommon,
1713        signals: FileSignals,
1714        cb_queue: &mut CallbackQueue,
1715    ) {
1716        let mut new_state = FileState::ACTIVE;
1717
1718        {
1719            let recv_buffer = common.recv_buffer.borrow();
1720
1721            new_state.set(FileState::READABLE, recv_buffer.has_data());
1722            new_state.set(FileState::WRITABLE, common.sent_len < common.send_limit);
1723        }
1724
1725        common.update_state(
1726            /* mask= */ FileState::all(),
1727            new_state,
1728            signals,
1729            cb_queue,
1730        );
1731    }
1732
1733    fn close(
1734        self,
1735        common: &mut UnixSocketCommon,
1736        cb_queue: &mut CallbackQueue,
1737    ) -> (ProtocolState, Result<(), SyscallError>) {
1738        // inform the buffer that there is one fewer readers
1739        common
1740            .recv_buffer
1741            .borrow_mut()
1742            .remove_reader(self.reader_handle, cb_queue);
1743
1744        for byte_data in self.recv_data.into_iter() {
1745            // defer informing the senders until we're done processing the current socket
1746            cb_queue.add(move |cb_queue| {
1747                byte_data
1748                    .from_socket
1749                    .borrow_mut()
1750                    .inform_bytes_read(byte_data.num_bytes, cb_queue);
1751            });
1752        }
1753
1754        let new_state = ConnLessClosed {};
1755        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1756        (new_state.into(), common.close(cb_queue))
1757    }
1758
1759    fn bind(
1760        &mut self,
1761        common: &mut UnixSocketCommon,
1762        socket: &Arc<AtomicRefCell<UnixSocket>>,
1763        addr: Option<&SockaddrStorage>,
1764        rng: impl rand::Rng,
1765    ) -> Result<(), SyscallError> {
1766        // if already bound
1767        if self.bound_addr.is_some() {
1768            return Err(Errno::EINVAL.into());
1769        }
1770
1771        self.bound_addr = Some(common.bind(socket, addr, rng)?);
1772        Ok(())
1773    }
1774
1775    fn sendmsg(
1776        &mut self,
1777        common: &mut UnixSocketCommon,
1778        socket: &Arc<AtomicRefCell<UnixSocket>>,
1779        args: SendmsgArgs,
1780        mem: &mut MemoryManager,
1781        cb_queue: &mut CallbackQueue,
1782    ) -> Result<libc::ssize_t, SyscallError> {
1783        if !args.control_ptr.ptr().is_null() {
1784            log::debug!("Unix sockets don't yet support control data for sendmsg()");
1785            return Err(Errno::EINVAL.into());
1786        }
1787
1788        let recv_socket = common.resolve_destination(self.peer.as_ref(), args.addr)?;
1789        let rv = common.sendmsg(socket, args.iovs, args.flags, &recv_socket, mem, cb_queue)?;
1790
1791        let byte_data = ByteData {
1792            from_socket: self.this_socket.upgrade().unwrap(),
1793            from_addr: self.bound_addr,
1794            num_bytes: rv.try_into().unwrap(),
1795        };
1796
1797        match &mut recv_socket.borrow_mut().protocol_state {
1798            ProtocolState::ConnLessInitial(state) => {
1799                state.as_mut().unwrap().recv_data.push_back(byte_data);
1800            }
1801            _ => panic!(
1802                "Sending bytes to a socket in state {}",
1803                std::any::type_name::<Self>()
1804            ),
1805        }
1806
1807        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1808
1809        Ok(rv.try_into().unwrap())
1810    }
1811
1812    fn recvmsg(
1813        &mut self,
1814        common: &mut UnixSocketCommon,
1815        socket: &Arc<AtomicRefCell<UnixSocket>>,
1816        args: RecvmsgArgs,
1817        mem: &mut MemoryManager,
1818        cb_queue: &mut CallbackQueue,
1819    ) -> Result<RecvmsgReturn, SyscallError> {
1820        if !args.control_ptr.ptr().is_null() {
1821            log::debug!("Unix sockets don't yet support control data for recvmsg()");
1822            return Err(Errno::EINVAL.into());
1823        }
1824
1825        let (rv, num_removed_from_buf, msg_flags) =
1826            common.recvmsg(socket, args.iovs, args.flags, mem, cb_queue)?;
1827        let num_removed_from_buf = u64::try_from(num_removed_from_buf).unwrap();
1828
1829        let byte_data = self.recv_data.pop_front().unwrap();
1830        assert!(num_removed_from_buf == byte_data.num_bytes);
1831
1832        // defer informing the sender until we're done processing the current socket
1833        cb_queue.add(move |cb_queue| {
1834            byte_data
1835                .from_socket
1836                .borrow_mut()
1837                .inform_bytes_read(byte_data.num_bytes, cb_queue);
1838        });
1839
1840        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1841
1842        Ok(RecvmsgReturn {
1843            return_val: rv.try_into().unwrap(),
1844            addr: byte_data.from_addr.map(Into::into),
1845            msg_flags,
1846            control_len: 0,
1847        })
1848    }
1849
1850    fn inform_bytes_read(
1851        &mut self,
1852        common: &mut UnixSocketCommon,
1853        num: u64,
1854        cb_queue: &mut CallbackQueue,
1855    ) {
1856        common.sent_len = common.sent_len.checked_sub(num).unwrap();
1857        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1858    }
1859
1860    fn ioctl(
1861        &mut self,
1862        common: &mut UnixSocketCommon,
1863        request: IoctlRequest,
1864        arg_ptr: ForeignPtr<()>,
1865        memory_manager: &mut MemoryManager,
1866    ) -> SyscallResult {
1867        common.ioctl(request, arg_ptr, memory_manager)
1868    }
1869
1870    fn connect(
1871        self,
1872        common: &mut UnixSocketCommon,
1873        _socket: &Arc<AtomicRefCell<UnixSocket>>,
1874        addr: &SockaddrStorage,
1875        cb_queue: &mut CallbackQueue,
1876    ) -> (ProtocolState, Result<(), SyscallError>) {
1877        // TODO: support AF_UNSPEC to disassociate
1878        let Some(addr) = addr.as_unix() else {
1879            return (self.into(), Err(Errno::EINVAL.into()));
1880        };
1881
1882        // find the socket bound at the address
1883        let peer = match lookup_address(&common.namespace.borrow(), common.socket_type, &addr) {
1884            Ok(x) => x,
1885            Err(e) => return (self.into(), Err(e.into())),
1886        };
1887
1888        let new_state = Self {
1889            peer_addr: Some(addr.into_owned()),
1890            peer: Some(peer),
1891            ..self
1892        };
1893
1894        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1895
1896        (new_state.into(), Ok(()))
1897    }
1898
1899    fn connect_unnamed(
1900        self,
1901        common: &mut UnixSocketCommon,
1902        _socket: &Arc<AtomicRefCell<UnixSocket>>,
1903        peer: Arc<AtomicRefCell<UnixSocket>>,
1904        cb_queue: &mut CallbackQueue,
1905    ) -> (ProtocolState, Result<(), SyscallError>) {
1906        assert!(self.peer_addr.is_none());
1907        assert!(self.bound_addr.is_none());
1908
1909        let new_state = Self {
1910            bound_addr: None,
1911            peer_addr: None,
1912            peer: Some(peer),
1913            ..self
1914        };
1915
1916        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1917
1918        (new_state.into(), Ok(()))
1919    }
1920}
1921
1922impl Protocol for ConnLessClosed {
1923    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1924        Ok(None)
1925    }
1926
1927    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1928        Ok(None)
1929    }
1930
1931    fn refresh_file_state(
1932        &self,
1933        common: &mut UnixSocketCommon,
1934        signals: FileSignals,
1935        cb_queue: &mut CallbackQueue,
1936    ) {
1937        assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
1938        common.update_state(
1939            /* mask= */ FileState::all(),
1940            FileState::CLOSED,
1941            signals,
1942            cb_queue,
1943        );
1944    }
1945
1946    fn close(
1947        self,
1948        _common: &mut UnixSocketCommon,
1949        _cb_queue: &mut CallbackQueue,
1950    ) -> (ProtocolState, Result<(), SyscallError>) {
1951        // why are we trying to close an already closed file? we probably want a bt here...
1952        panic!("Trying to close an already closed socket");
1953    }
1954
1955    fn inform_bytes_read(
1956        &mut self,
1957        _common: &mut UnixSocketCommon,
1958        _num: u64,
1959        _cb_queue: &mut CallbackQueue,
1960    ) {
1961        // do nothing since we're already closed
1962    }
1963}
1964
/// Common data and functionality that is useful for all states.
struct UnixSocketCommon {
    /// The buffer this socket receives from (see `recvmsg`). A sender writes into the destination
    /// socket's receive buffer rather than into its own.
    recv_buffer: Arc<AtomicRefCell<SharedBuf>>,
    /// The max number of "in flight" bytes (sent but not yet read from the receiving socket).
    send_limit: u64,
    /// The number of "in flight" bytes.
    sent_len: u64,
    /// The listeners that are notified when the file state changes or signals are emitted.
    event_source: StateEventSource,
    /// The current file state flags; only modified through `update_state`.
    state: FileState,
    /// The file status flags. `NONBLOCK` makes `sendmsg`/`recvmsg` behave as if `MSG_DONTWAIT`
    /// had been passed.
    status: FileStatus,
    /// Whether this is a stream, datagram, or seqpacket socket.
    socket_type: UnixSocketType,
    /// The abstract unix namespace used to bind this socket and to look up destination sockets.
    namespace: Arc<AtomicRefCell<AbstractUnixNamespace>>,
    // should only be used by `OpenFile` to make sure there is only ever one `OpenFile` instance for
    // this file
    has_open_file: bool,
}
1981
1982impl UnixSocketCommon {
    /// Reports whether blocking syscalls on this socket support `SA_RESTART`; always `true`.
    ///
    /// `sendmsg` and `recvmsg` pass this value to `SyscallError::new_blocked_on_file` when they
    /// need to block.
    pub fn supports_sa_restart(&self) -> bool {
        true
    }
1986
1987    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
1988        // check that the CLOSED flag was set by the protocol state
1989        if !self.state.contains(FileState::CLOSED) {
1990            // set the flag here since we missed doing it before
1991            // do this before the below panic, otherwise rust gives us warnings
1992            self.update_state(
1993                /* mask= */ FileState::all(),
1994                FileState::CLOSED,
1995                FileSignals::empty(),
1996                cb_queue,
1997            );
1998
1999            // panic in debug builds since the backtrace will be helpful for debugging
2000            debug_panic!("When closing a unix socket, the CLOSED flag was not set");
2001        }
2002
2003        Ok(())
2004    }
2005
2006    pub fn bind(
2007        &mut self,
2008        socket: &Arc<AtomicRefCell<UnixSocket>>,
2009        addr: Option<&SockaddrStorage>,
2010        rng: impl rand::Rng,
2011    ) -> Result<SockaddrUnix<libc::sockaddr_un>, SyscallError> {
2012        // get the unix address
2013        let Some(addr) = addr.and_then(|x| x.as_unix()) else {
2014            log::warn!(
2015                "Attempted to bind unix socket to non-unix address {:?}",
2016                addr
2017            );
2018            return Err(Errno::EINVAL.into());
2019        };
2020
2021        // bind the socket
2022        let bound_addr = if let Some(name) = addr.as_abstract() {
2023            // if given an abstract socket address
2024            let namespace = Arc::clone(&self.namespace);
2025            match AbstractUnixNamespace::bind(
2026                &namespace,
2027                self.socket_type,
2028                name.to_vec(),
2029                socket,
2030                &mut self.event_source,
2031            ) {
2032                Ok(()) => addr.into_owned(),
2033                // address is in use
2034                Err(_) => return Err(Errno::EADDRINUSE.into()),
2035            }
2036        } else if addr.is_unnamed() {
2037            // if given an "unnamed" address
2038            let namespace = Arc::clone(&self.namespace);
2039            match AbstractUnixNamespace::autobind(
2040                &namespace,
2041                self.socket_type,
2042                socket,
2043                &mut self.event_source,
2044                rng,
2045            ) {
2046                Ok(ref name) => SockaddrUnix::new_abstract(name).unwrap(),
2047                Err(_) => return Err(Errno::EADDRINUSE.into()),
2048            }
2049        } else {
2050            log::warn!("Only abstract names are currently supported for unix sockets");
2051            return Err(Errno::ENOTSUP.into());
2052        };
2053
2054        Ok(bound_addr)
2055    }
2056
2057    pub fn resolve_destination(
2058        &self,
2059        peer: Option<&Arc<AtomicRefCell<UnixSocket>>>,
2060        addr: Option<SockaddrStorage>,
2061    ) -> Result<Arc<AtomicRefCell<UnixSocket>>, SyscallError> {
2062        let addr = match addr {
2063            Some(ref addr) => Some(addr.as_unix().ok_or(Errno::EINVAL)?),
2064            None => None,
2065        };
2066
2067        // returns either the send buffer, or None if we should look up the send buffer from the
2068        // socket address
2069        let peer = match (peer, addr) {
2070            // already connected but a destination address was given
2071            (Some(peer), Some(_addr)) => match self.socket_type {
2072                UnixSocketType::Stream => return Err(Errno::EISCONN.into()),
2073                // linux seems to ignore the destination address for connected seq packet sockets
2074                UnixSocketType::SeqPacket => Some(peer),
2075                UnixSocketType::Dgram => None,
2076            },
2077            // already connected and no destination address was given
2078            (Some(peer), None) => Some(peer),
2079            // not connected but a destination address was given
2080            (None, Some(_addr)) => match self.socket_type {
2081                UnixSocketType::Stream => return Err(Errno::EOPNOTSUPP.into()),
2082                UnixSocketType::SeqPacket => return Err(Errno::ENOTCONN.into()),
2083                UnixSocketType::Dgram => None,
2084            },
2085            // not connected and no destination address given
2086            (None, None) => return Err(Errno::ENOTCONN.into()),
2087        };
2088
2089        // either use the existing send buffer, or look up the send buffer from the address
2090        let peer = match peer {
2091            Some(x) => Arc::clone(x),
2092            None => {
2093                // look up the socket from the address name
2094                let recv_socket =
2095                    lookup_address(&self.namespace.borrow(), self.socket_type, &addr.unwrap())?;
2096                // store an Arc of the recv buffer
2097                Arc::clone(&recv_socket)
2098            }
2099        };
2100
2101        Ok(peer)
2102    }
2103
2104    pub fn sendmsg(
2105        &mut self,
2106        socket: &Arc<AtomicRefCell<UnixSocket>>,
2107        iovs: &[IoVec],
2108        flags: libc::c_int,
2109        peer: &Arc<AtomicRefCell<UnixSocket>>,
2110        mem: &mut MemoryManager,
2111        cb_queue: &mut CallbackQueue,
2112    ) -> Result<usize, SyscallError> {
2113        // MSG_NOSIGNAL is currently a no-op, since we haven't implemented the behavior
2114        // it's meant to disable.
2115        // TODO: Once we've implemented generating a SIGPIPE when the peer on a
2116        // stream-oriented socket has closed the connection, MSG_NOSIGNAL should
2117        // disable it.
2118        // Ignore the MSG_TRUNC flag since it doesn't do anything when sending.
2119        let supported_flags = MsgFlags::MSG_DONTWAIT | MsgFlags::MSG_NOSIGNAL | MsgFlags::MSG_TRUNC;
2120
2121        // if there's a flag we don't support, it's probably best to raise an error rather than do
2122        // the wrong thing
2123        let Some(mut flags) = MsgFlags::from_bits(flags) else {
2124            log::warn!("Unrecognized send flags: {:#b}", flags);
2125            return Err(Errno::EINVAL.into());
2126        };
2127        if flags.intersects(!supported_flags) {
2128            log::warn!("Unsupported send flags: {:?}", flags);
2129            return Err(Errno::EINVAL.into());
2130        }
2131
2132        if self.status.contains(FileStatus::NONBLOCK) {
2133            flags.insert(MsgFlags::MSG_DONTWAIT);
2134        }
2135
2136        // run in a closure so that an early return doesn't return from the syscall handler
2137        let result = (|| {
2138            let peer_ref = peer.borrow();
2139            let mut send_buffer = peer_ref.recv_buffer().borrow_mut();
2140
2141            // if the buffer has no readers, the destination socket is closed
2142            if send_buffer.num_readers() == 0 {
2143                return Err(match self.socket_type {
2144                    // connection-oriented socket
2145                    UnixSocketType::Stream | UnixSocketType::SeqPacket => Errno::EPIPE,
2146                    // connectionless socket
2147                    UnixSocketType::Dgram => Errno::ECONNREFUSED,
2148                });
2149            }
2150
2151            let len = iovs.iter().map(|x| x.len).sum::<libc::size_t>();
2152
2153            // we keep track of the send buffer size manually, since the unix socket buffers all have
2154            // usize::MAX length
2155            let space_available = self
2156                .send_limit
2157                .saturating_sub(self.sent_len)
2158                .try_into()
2159                .unwrap();
2160
2161            if space_available == 0 {
2162                return Err(Errno::EAGAIN);
2163            }
2164
2165            let len = match self.socket_type {
2166                UnixSocketType::Stream => std::cmp::min(len, space_available),
2167                UnixSocketType::Dgram | UnixSocketType::SeqPacket => {
2168                    if len <= space_available {
2169                        len
2170                    } else if len <= self.send_limit.try_into().unwrap() {
2171                        // we can send this when the buffer has more space available
2172                        return Err(Errno::EAGAIN);
2173                    } else {
2174                        // we could never send this message
2175                        return Err(Errno::EMSGSIZE);
2176                    }
2177                }
2178            };
2179
2180            let reader = IoVecReader::new(iovs, mem);
2181            let reader = reader.take(len.try_into().unwrap());
2182
2183            let num_copied = match self.socket_type {
2184                UnixSocketType::Stream => {
2185                    if len == 0 {
2186                        0
2187                    } else {
2188                        send_buffer
2189                            .write_stream(reader, len, cb_queue)
2190                            .map_err(|e| Errno::try_from(e).unwrap())?
2191                    }
2192                }
2193                UnixSocketType::Dgram | UnixSocketType::SeqPacket => {
2194                    send_buffer
2195                        .write_packet(reader, len, cb_queue)
2196                        .map_err(|e| Errno::try_from(e).unwrap())?;
2197                    len
2198                }
2199            };
2200
2201            // if we successfully sent bytes, update the sent count
2202            self.sent_len += u64::try_from(num_copied).unwrap();
2203
2204            Ok(num_copied)
2205        })();
2206
2207        // if the syscall would block and we don't have the MSG_DONTWAIT flag
2208        if result.as_ref().err() == Some(&Errno::EWOULDBLOCK)
2209            && !flags.contains(MsgFlags::MSG_DONTWAIT)
2210        {
2211            return Err(SyscallError::new_blocked_on_file(
2212                File::Socket(Socket::Unix(socket.clone())),
2213                FileState::WRITABLE,
2214                self.supports_sa_restart(),
2215            ));
2216        }
2217
2218        Ok(result?)
2219    }
2220
2221    pub fn recvmsg(
2222        &mut self,
2223        socket: &Arc<AtomicRefCell<UnixSocket>>,
2224        iovs: &[IoVec],
2225        flags: libc::c_int,
2226        mem: &mut MemoryManager,
2227        cb_queue: &mut CallbackQueue,
2228    ) -> Result<(usize, usize, libc::c_int), SyscallError> {
2229        let supported_flags = MsgFlags::MSG_DONTWAIT | MsgFlags::MSG_TRUNC;
2230
2231        // if there's a flag we don't support, it's probably best to raise an error rather than do
2232        // the wrong thing
2233        let Some(mut flags) = MsgFlags::from_bits(flags) else {
2234            log::warn!("Unrecognized recv flags: {:#b}", flags);
2235            return Err(Errno::EINVAL.into());
2236        };
2237        if flags.intersects(!supported_flags) {
2238            log::warn!("Unsupported recv flags: {:?}", flags);
2239            return Err(Errno::EINVAL.into());
2240        }
2241
2242        if self.status.contains(FileStatus::NONBLOCK) {
2243            flags.insert(MsgFlags::MSG_DONTWAIT);
2244        }
2245
2246        // run in a closure so that an early return doesn't return from the syscall handler
2247        let result = (|| {
2248            let mut recv_buffer = self.recv_buffer.borrow_mut();
2249
2250            // the read would block if all:
2251            //  1. the recv buffer has no data
2252            //  2. it's a connectionless socket OR the connection-oriented destination socket is not
2253            //     closed
2254            if !recv_buffer.has_data()
2255                && (self.socket_type == UnixSocketType::Dgram || recv_buffer.num_writers() > 0)
2256            {
2257                // return EWOULDBLOCK even if 'bytes' has length 0
2258                return Err(Errno::EWOULDBLOCK);
2259            }
2260
2261            let writer = IoVecWriter::new(iovs, mem);
2262
2263            let (num_copied, num_removed_from_buf) = recv_buffer
2264                .read(writer, cb_queue)
2265                .map_err(|e| Errno::try_from(e).unwrap())?;
2266
2267            let mut msg_flags = 0;
2268
2269            if flags.contains(MsgFlags::MSG_TRUNC)
2270                && [UnixSocketType::Dgram, UnixSocketType::SeqPacket].contains(&self.socket_type)
2271            {
2272                if num_copied < num_removed_from_buf {
2273                    msg_flags |= libc::MSG_TRUNC;
2274                }
2275
2276                // we're a message-based socket and MSG_TRUNC is set, so return the total size of
2277                // the message, not the number of bytes we read
2278                Ok((num_removed_from_buf, num_removed_from_buf, msg_flags))
2279            } else {
2280                // We're a stream-based socket. Unlike TCP sockets, unix stream sockets ignore the
2281                // MSG_TRUNC flag.
2282                Ok((num_copied, num_removed_from_buf, msg_flags))
2283            }
2284        })();
2285
2286        // if the syscall would block and we don't have the MSG_DONTWAIT flag
2287        if result.as_ref().err() == Some(&Errno::EWOULDBLOCK)
2288            && !flags.contains(MsgFlags::MSG_DONTWAIT)
2289        {
2290            return Err(SyscallError::new_blocked_on_file(
2291                File::Socket(Socket::Unix(socket.clone())),
2292                FileState::READABLE,
2293                self.supports_sa_restart(),
2294            ));
2295        }
2296
2297        Ok(result?)
2298    }
2299
    /// State-independent `ioctl()` handling.
    ///
    /// No request is currently handled: a warning naming the request is logged and `EINVAL` is
    /// returned. The argument pointer and memory manager are unused.
    pub fn ioctl(
        &mut self,
        request: IoctlRequest,
        _arg_ptr: ForeignPtr<()>,
        _memory_manager: &mut MemoryManager,
    ) -> SyscallResult {
        log::warn!("We do not yet handle ioctl request {request:?} on unix sockets");
        Err(Errno::EINVAL.into())
    }
2309
2310    fn update_state(
2311        &mut self,
2312        mask: FileState,
2313        state: FileState,
2314        signals: FileSignals,
2315        cb_queue: &mut CallbackQueue,
2316    ) {
2317        let old_state = self.state;
2318
2319        // remove the masked flags, then copy the masked flags
2320        self.state.remove(mask);
2321        self.state.insert(state & mask);
2322
2323        self.handle_state_change(old_state, signals, cb_queue);
2324    }
2325
2326    fn handle_state_change(
2327        &mut self,
2328        old_state: FileState,
2329        signals: FileSignals,
2330        cb_queue: &mut CallbackQueue,
2331    ) {
2332        let states_changed = self.state ^ old_state;
2333
2334        // if nothing changed
2335        if states_changed.is_empty() && signals.is_empty() {
2336            return;
2337        }
2338
2339        self.event_source
2340            .notify_listeners(self.state, states_changed, signals, cb_queue);
2341    }
2342}
2343
/// Finds the socket of type `socket_type` that is bound to `addr` in `namespace`.
///
/// Only abstract addresses can be resolved. Fails with `ECONNREFUSED` if no socket is bound to
/// the abstract name, and with `ENOENT` for any non-abstract address.
fn lookup_address(
    namespace: &AbstractUnixNamespace,
    socket_type: UnixSocketType,
    addr: &SockaddrUnix<&libc::sockaddr_un>,
) -> Result<Arc<AtomicRefCell<UnixSocket>>, linux_api::errno::Errno> {
    match addr.as_abstract() {
        // an abstract address: look up the socket from the address name
        Some(name) => namespace
            .lookup(socket_type, name)
            .ok_or(Errno::ECONNREFUSED),
        None => {
            warn_once_then_debug!("Unix sockets with pathname addresses are not yet supported");
            Err(Errno::ENOENT)
        }
    }
}
2360
2361fn backlog_to_queue_size(backlog: i32) -> u32 {
2362    // linux also makes this cast, so negative backlogs wrap around to large positive backlogs
2363    // https://elixir.free-electrons.com/linux/v5.11.22/source/net/unix/af_unix.c#L628
2364    let backlog = backlog as u32;
2365
2366    // the linux '__sys_listen()' applies the somaxconn max to all protocols, including unix sockets
2367    let queue_limit = std::cmp::min(backlog, c::SHADOW_SOMAXCONN);
2368
2369    // linux uses a limit of one greater than the provided backlog (ex: a backlog value of 0 allows
2370    // for one incoming connection at a time)
2371    queue_limit.saturating_add(1)
2372}
2373
// WARNING: don't add new enum variants without updating 'AbstractUnixNamespace::new()'
/// The kind of unix socket, corresponding to the socket type it was created with.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum UnixSocketType {
    /// A `SOCK_STREAM` socket.
    Stream,
    /// A `SOCK_DGRAM` socket.
    Dgram,
    /// A `SOCK_SEQPACKET` socket.
    SeqPacket,
}
2381
2382impl TryFrom<libc::c_int> for UnixSocketType {
2383    type Error = UnixSocketTypeConversionError;
2384    fn try_from(val: libc::c_int) -> Result<Self, Self::Error> {
2385        match val {
2386            libc::SOCK_STREAM => Ok(Self::Stream),
2387            libc::SOCK_DGRAM => Ok(Self::Dgram),
2388            libc::SOCK_SEQPACKET => Ok(Self::SeqPacket),
2389            x => Err(UnixSocketTypeConversionError(x)),
2390        }
2391    }
2392}
2393
/// The error returned when a raw socket type value cannot be converted to a `UnixSocketType`.
/// Holds the rejected value.
#[derive(Copy, Clone, Debug)]
pub struct UnixSocketTypeConversionError(libc::c_int);

// no methods are overridden; the message comes from the `Display` implementation
impl std::error::Error for UnixSocketTypeConversionError {}
2398
2399impl std::fmt::Display for UnixSocketTypeConversionError {
2400    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2401        write!(
2402            f,
2403            "Invalid socket type {}; unix sockets only support SOCK_STREAM, SOCK_DGRAM, and SOCK_SEQPACKET",
2404            self.0
2405        )
2406    }
2407}
2408
/// Reasons for rejecting an incoming connection.
// NOTE(review): the code that produces and consumes these variants is outside this part of the
// file; the descriptions below are inferred from the names and should be confirmed.
#[derive(Copy, Clone, Debug)]
enum IncomingConnError {
    /// Presumably: the queue of pending connections has no room left.
    QueueFull,
    /// Presumably: the socket cannot accept incoming connections in its current state.
    NotSupported,
}
2414
/// Bookkeeping for one message that was sent to a connectionless socket and is waiting to be
/// received. The receiver keeps one record per queued message.
struct ByteData {
    /// The socket that sent the message; it is informed once the bytes have been read.
    from_socket: Arc<AtomicRefCell<UnixSocket>>,
    /// The sender's bound address at the time of sending (if any), reported as the message's
    /// source address.
    from_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The number of bytes that were sent.
    num_bytes: u64,
}