1use std::collections::{LinkedList, VecDeque};
2use std::io::Read;
3use std::ops::DerefMut;
4use std::sync::{Arc, Weak};
5
6use atomic_refcell::AtomicRefCell;
7use linux_api::errno::Errno;
8use linux_api::ioctls::IoctlRequest;
9use linux_api::socket::Shutdown;
10use nix::sys::socket::MsgFlags;
11use shadow_shim_helper_rs::syscall_types::ForeignPtr;
12
13use crate::cshadow as c;
14use crate::host::descriptor::listener::{StateEventSource, StateListenHandle, StateListenerFilter};
15use crate::host::descriptor::shared_buf::{
16 BufferHandle, BufferSignals, BufferState, ReaderHandle, SharedBuf, WriterHandle,
17};
18use crate::host::descriptor::socket::abstract_unix_ns::AbstractUnixNamespace;
19use crate::host::descriptor::socket::{RecvmsgArgs, RecvmsgReturn, SendmsgArgs, Socket};
20use crate::host::descriptor::{
21 File, FileMode, FileSignals, FileState, FileStatus, OpenFile, SyscallResult,
22};
23use crate::host::memory_manager::MemoryManager;
24use crate::host::network::namespace::NetworkNamespace;
25use crate::host::syscall::io::{IoVec, IoVecReader, IoVecWriter};
26use crate::host::syscall::types::SyscallError;
27use crate::utility::HostTreePointer;
28use crate::utility::callback_queue::CallbackQueue;
29use crate::utility::sockaddr::{SockaddrStorage, SockaddrUnix};
30
/// Default send limit, in bytes, assigned to every newly created unix socket (208 KiB).
const UNIX_SOCKET_DEFAULT_BUFFER_SIZE: u64 = 208 * 1024;
32
33pub struct UnixSocket {
36 common: UnixSocketCommon,
38 protocol_state: ProtocolState,
40}
41
42impl UnixSocket {
43 pub fn new(
44 status: FileStatus,
45 socket_type: UnixSocketType,
46 namespace: &Arc<AtomicRefCell<AbstractUnixNamespace>>,
47 ) -> Arc<AtomicRefCell<Self>> {
48 Arc::new_cyclic(|weak| {
49 let recv_buffer = SharedBuf::new(usize::MAX);
52 let recv_buffer = Arc::new(AtomicRefCell::new(recv_buffer));
53
54 let mut common = UnixSocketCommon {
55 recv_buffer,
56 send_limit: UNIX_SOCKET_DEFAULT_BUFFER_SIZE,
57 sent_len: 0,
58 event_source: StateEventSource::new(),
59 state: FileState::ACTIVE,
60 status,
61 socket_type,
62 namespace: Arc::clone(namespace),
63 has_open_file: false,
64 };
65
66 let protocol_state = ProtocolState::new(socket_type, &mut common, weak);
68
69 AtomicRefCell::new(Self {
70 common,
71 protocol_state,
72 })
73 })
74 }
75
76 pub fn status(&self) -> FileStatus {
77 self.common.status
78 }
79
80 pub fn set_status(&mut self, status: FileStatus) {
81 self.common.status = status;
82 }
83
84 pub fn mode(&self) -> FileMode {
85 FileMode::READ | FileMode::WRITE
86 }
87
88 pub fn has_open_file(&self) -> bool {
89 self.common.has_open_file
90 }
91
92 pub fn supports_sa_restart(&self) -> bool {
93 self.common.supports_sa_restart()
94 }
95
96 pub fn set_has_open_file(&mut self, val: bool) {
97 self.common.has_open_file = val;
98 }
99
100 pub fn getsockname(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
101 Ok(Some(
103 self.protocol_state
104 .bound_address()?
105 .unwrap_or_else(SockaddrUnix::new_unnamed),
106 ))
107 }
108
109 pub fn getpeername(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
110 Ok(Some(
112 self.protocol_state
113 .peer_address()?
114 .unwrap_or_else(SockaddrUnix::new_unnamed),
115 ))
116 }
117
118 pub fn address_family(&self) -> linux_api::socket::AddressFamily {
119 linux_api::socket::AddressFamily::AF_UNIX
120 }
121
122 fn recv_buffer(&self) -> &Arc<AtomicRefCell<SharedBuf>> {
123 &self.common.recv_buffer
124 }
125
126 fn inform_bytes_read(&mut self, num: u64, cb_queue: &mut CallbackQueue) {
127 self.protocol_state
128 .inform_bytes_read(&mut self.common, num, cb_queue);
129 }
130
131 pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
132 self.protocol_state.close(&mut self.common, cb_queue)
133 }
134
135 fn refresh_file_state(&mut self, signals: FileSignals, cb_queue: &mut CallbackQueue) {
136 self.protocol_state
137 .refresh_file_state(&mut self.common, signals, cb_queue)
138 }
139
140 pub fn bind(
141 socket: &Arc<AtomicRefCell<Self>>,
142 addr: Option<&SockaddrStorage>,
143 _net_ns: &NetworkNamespace,
144 rng: impl rand::Rng,
145 ) -> Result<(), SyscallError> {
146 let socket_ref = &mut *socket.borrow_mut();
147 socket_ref
148 .protocol_state
149 .bind(&mut socket_ref.common, socket, addr, rng)
150 }
151
152 pub fn readv(
153 &mut self,
154 _iovs: &[IoVec],
155 _offset: Option<libc::off_t>,
156 _flags: libc::c_int,
157 _mem: &mut MemoryManager,
158 _cb_queue: &mut CallbackQueue,
159 ) -> Result<libc::ssize_t, SyscallError> {
160 panic!("Called UnixSocket::readv() on a unix socket.");
164 }
165
166 pub fn writev(
167 &mut self,
168 _iovs: &[IoVec],
169 _offset: Option<libc::off_t>,
170 _flags: libc::c_int,
171 _mem: &mut MemoryManager,
172 _cb_queue: &mut CallbackQueue,
173 ) -> Result<libc::ssize_t, SyscallError> {
174 panic!("Called UnixSocket::writev() on a unix socket");
178 }
179
180 pub fn sendmsg(
181 socket: &Arc<AtomicRefCell<Self>>,
182 args: SendmsgArgs,
183 mem: &mut MemoryManager,
184 _net_ns: &NetworkNamespace,
185 _rng: impl rand::Rng,
186 cb_queue: &mut CallbackQueue,
187 ) -> Result<libc::ssize_t, SyscallError> {
188 let socket_ref = &mut *socket.borrow_mut();
189 socket_ref
190 .protocol_state
191 .sendmsg(&mut socket_ref.common, socket, args, mem, cb_queue)
192 }
193
194 pub fn recvmsg(
195 socket: &Arc<AtomicRefCell<Self>>,
196 args: RecvmsgArgs,
197 mem: &mut MemoryManager,
198 cb_queue: &mut CallbackQueue,
199 ) -> Result<RecvmsgReturn, SyscallError> {
200 let socket_ref = &mut *socket.borrow_mut();
201 socket_ref
202 .protocol_state
203 .recvmsg(&mut socket_ref.common, socket, args, mem, cb_queue)
204 }
205
206 pub fn ioctl(
207 &mut self,
208 request: IoctlRequest,
209 arg_ptr: ForeignPtr<()>,
210 memory_manager: &mut MemoryManager,
211 ) -> SyscallResult {
212 self.protocol_state
213 .ioctl(&mut self.common, request, arg_ptr, memory_manager)
214 }
215
216 pub fn stat(&self) -> Result<linux_api::stat::stat, SyscallError> {
217 warn_once_then_debug!("We do not yet handle stat calls on unix sockets");
218 Err(Errno::EINVAL.into())
219 }
220
221 pub fn listen(
222 socket: &Arc<AtomicRefCell<Self>>,
223 backlog: i32,
224 _net_ns: &NetworkNamespace,
225 _rng: impl rand::Rng,
226 cb_queue: &mut CallbackQueue,
227 ) -> Result<(), Errno> {
228 let mut socket_ref = socket.borrow_mut();
229 let socket_ref = socket_ref.deref_mut();
230 socket_ref
231 .protocol_state
232 .listen(&mut socket_ref.common, backlog, cb_queue)
233 }
234
235 pub fn connect(
236 socket: &Arc<AtomicRefCell<Self>>,
237 addr: &SockaddrStorage,
238 _net_ns: &NetworkNamespace,
239 _rng: impl rand::Rng,
240 cb_queue: &mut CallbackQueue,
241 ) -> Result<(), SyscallError> {
242 let socket_ref = &mut *socket.borrow_mut();
243 socket_ref
244 .protocol_state
245 .connect(&mut socket_ref.common, socket, addr, cb_queue)
246 }
247
248 pub fn accept(
249 &mut self,
250 _net_ns: &NetworkNamespace,
251 _rng: impl rand::Rng,
252 cb_queue: &mut CallbackQueue,
253 ) -> Result<OpenFile, SyscallError> {
254 self.protocol_state.accept(&mut self.common, cb_queue)
255 }
256
257 pub fn shutdown(
258 &mut self,
259 _how: Shutdown,
260 _cb_queue: &mut CallbackQueue,
261 ) -> Result<(), SyscallError> {
262 log::warn!("shutdown() syscall not yet supported for unix sockets; Returning ENOSYS");
263 Err(Errno::ENOSYS.into())
264 }
265
266 pub fn getsockopt(
267 &mut self,
268 _level: libc::c_int,
269 _optname: libc::c_int,
270 _optval_ptr: ForeignPtr<()>,
271 _optlen: libc::socklen_t,
272 _memory_manager: &mut MemoryManager,
273 _cb_queue: &mut CallbackQueue,
274 ) -> Result<libc::socklen_t, SyscallError> {
275 log::warn!("getsockopt() syscall not yet supported for unix sockets; Returning ENOSYS");
276 Err(Errno::ENOSYS.into())
277 }
278
279 pub fn setsockopt(
280 &mut self,
281 _level: libc::c_int,
282 _optname: libc::c_int,
283 _optval_ptr: ForeignPtr<()>,
284 _optlen: libc::socklen_t,
285 _memory_manager: &MemoryManager,
286 ) -> Result<(), SyscallError> {
287 log::warn!("setsockopt() syscall not yet supported for unix sockets; Returning ENOSYS");
288 Err(Errno::ENOSYS.into())
289 }
290
291 pub fn pair(
292 status: FileStatus,
293 socket_type: UnixSocketType,
294 namespace: &Arc<AtomicRefCell<AbstractUnixNamespace>>,
295 cb_queue: &mut CallbackQueue,
296 ) -> (Arc<AtomicRefCell<Self>>, Arc<AtomicRefCell<Self>>) {
297 let socket_1 = UnixSocket::new(status, socket_type, namespace);
298 let socket_2 = UnixSocket::new(status, socket_type, namespace);
299
300 {
301 let socket_1_ref = &mut *socket_1.borrow_mut();
302 socket_1_ref
303 .protocol_state
304 .connect_unnamed(
305 &mut socket_1_ref.common,
306 &socket_1,
307 Arc::clone(&socket_2),
308 cb_queue,
309 )
310 .unwrap();
311 }
312
313 {
314 let socket_2_ref = &mut *socket_2.borrow_mut();
315 socket_2_ref
316 .protocol_state
317 .connect_unnamed(
318 &mut socket_2_ref.common,
319 &socket_2,
320 Arc::clone(&socket_1),
321 cb_queue,
322 )
323 .unwrap();
324 }
325
326 (socket_1, socket_2)
327 }
328
329 pub fn add_listener(
330 &mut self,
331 monitoring_state: FileState,
332 monitoring_signals: FileSignals,
333 filter: StateListenerFilter,
334 notify_fn: impl Fn(FileState, FileState, FileSignals, &mut CallbackQueue)
335 + Send
336 + Sync
337 + 'static,
338 ) -> StateListenHandle {
339 self.common.event_source.add_listener(
340 monitoring_state,
341 monitoring_signals,
342 filter,
343 notify_fn,
344 )
345 }
346
347 pub fn add_legacy_listener(&mut self, ptr: HostTreePointer<c::StatusListener>) {
348 self.common.event_source.add_legacy_listener(ptr);
349 }
350
351 pub fn remove_legacy_listener(&mut self, ptr: *mut c::StatusListener) {
352 self.common.event_source.remove_legacy_listener(ptr);
353 }
354
355 pub fn state(&self) -> FileState {
356 self.common.state
357 }
358}
359
360struct ConnOrientedInitial {
361 bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
362}
363struct ConnOrientedListening {
364 bound_addr: SockaddrUnix<libc::sockaddr_un>,
365 queue: VecDeque<Arc<AtomicRefCell<UnixSocket>>>,
366 queue_limit: u32,
367}
368struct ConnOrientedConnected {
369 bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
370 peer_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
371 peer: Arc<AtomicRefCell<UnixSocket>>,
372 reader_handle: ReaderHandle,
373 writer_handle: WriterHandle,
374 _recv_buffer_handle: BufferHandle,
376 _send_buffer_handle: BufferHandle,
377}
/// Protocol state of a connection-oriented socket after it has been closed; carries no data.
struct ConnOrientedClosed {}
379
380struct ConnLessInitial {
381 this_socket: Weak<AtomicRefCell<UnixSocket>>,
382 bound_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
383 peer_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
384 peer: Option<Arc<AtomicRefCell<UnixSocket>>>,
385 recv_data: LinkedList<ByteData>,
386 reader_handle: ReaderHandle,
387 _recv_buffer_handle: BufferHandle,
389}
/// Protocol state of a connectionless socket after it has been closed; carries no data.
struct ConnLessClosed {}
391
392impl ConnOrientedListening {
393 fn queue_is_full(&self) -> bool {
394 self.queue.len() >= self.queue_limit.try_into().unwrap()
395 }
396}
397
398enum ProtocolState {
402 ConnOrientedInitial(Option<ConnOrientedInitial>),
403 ConnOrientedListening(Option<ConnOrientedListening>),
404 ConnOrientedConnected(Option<ConnOrientedConnected>),
405 ConnOrientedClosed(Option<ConnOrientedClosed>),
406 ConnLessInitial(Option<ConnLessInitial>),
407 ConnLessClosed(Option<ConnLessClosed>),
408}
409
/// Implements `From<$state>` for `$protocol`, wrapping the state object in `Some` inside the given
/// enum variant.
macro_rules! state_upcast {
    ($state:ty, $protocol:ident::$variant:ident) => {
        impl From<$state> for $protocol {
            fn from(state: $state) -> Self {
                Self::$variant(Some(state))
            }
        }
    };
}
420
421state_upcast!(ConnOrientedInitial, ProtocolState::ConnOrientedInitial);
423state_upcast!(ConnOrientedListening, ProtocolState::ConnOrientedListening);
424state_upcast!(ConnOrientedConnected, ProtocolState::ConnOrientedConnected);
425state_upcast!(ConnOrientedClosed, ProtocolState::ConnOrientedClosed);
426state_upcast!(ConnLessInitial, ProtocolState::ConnLessInitial);
427state_upcast!(ConnLessClosed, ProtocolState::ConnLessClosed);
428
429impl ProtocolState {
430 fn new(
431 socket_type: UnixSocketType,
432 common: &mut UnixSocketCommon,
433 socket: &Weak<AtomicRefCell<UnixSocket>>,
434 ) -> Self {
435 match socket_type {
436 UnixSocketType::Stream | UnixSocketType::SeqPacket => {
437 Self::ConnOrientedInitial(Some(ConnOrientedInitial { bound_addr: None }))
438 }
439 UnixSocketType::Dgram => {
440 let mut cb_queue = CallbackQueue::new();
442
443 let reader_handle = common.recv_buffer.borrow_mut().add_reader(&mut cb_queue);
448
449 let weak = Weak::clone(socket);
450 let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
451 BufferState::READABLE,
452 BufferSignals::BUFFER_GREW,
453 move |_, signals, cb_queue| {
454 if let Some(socket) = weak.upgrade() {
455 let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
456 FileSignals::READ_BUFFER_GREW
457 } else {
458 FileSignals::empty()
459 };
460
461 socket.borrow_mut().refresh_file_state(signals, cb_queue);
462 }
463 },
464 );
465
466 assert!(cb_queue.is_empty());
470
471 Self::ConnLessInitial(Some(ConnLessInitial {
472 this_socket: Weak::clone(socket),
473 bound_addr: None,
474 peer_addr: None,
475 peer: None,
476 recv_data: LinkedList::new(),
477 reader_handle,
478 _recv_buffer_handle: recv_buffer_handle,
479 }))
480 }
481 }
482 }
483
484 fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
485 match self {
486 Self::ConnOrientedInitial(x) => x.as_ref().unwrap().peer_address(),
487 Self::ConnOrientedListening(x) => x.as_ref().unwrap().peer_address(),
488 Self::ConnOrientedConnected(x) => x.as_ref().unwrap().peer_address(),
489 Self::ConnOrientedClosed(x) => x.as_ref().unwrap().peer_address(),
490 Self::ConnLessInitial(x) => x.as_ref().unwrap().peer_address(),
491 Self::ConnLessClosed(x) => x.as_ref().unwrap().peer_address(),
492 }
493 }
494
495 fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
496 match self {
497 Self::ConnOrientedInitial(x) => x.as_ref().unwrap().bound_address(),
498 Self::ConnOrientedListening(x) => x.as_ref().unwrap().bound_address(),
499 Self::ConnOrientedConnected(x) => x.as_ref().unwrap().bound_address(),
500 Self::ConnOrientedClosed(x) => x.as_ref().unwrap().bound_address(),
501 Self::ConnLessInitial(x) => x.as_ref().unwrap().bound_address(),
502 Self::ConnLessClosed(x) => x.as_ref().unwrap().bound_address(),
503 }
504 }
505
506 fn refresh_file_state(
507 &self,
508 common: &mut UnixSocketCommon,
509 signals: FileSignals,
510 cb_queue: &mut CallbackQueue,
511 ) {
512 match self {
513 Self::ConnOrientedInitial(x) => x
514 .as_ref()
515 .unwrap()
516 .refresh_file_state(common, signals, cb_queue),
517 Self::ConnOrientedListening(x) => x
518 .as_ref()
519 .unwrap()
520 .refresh_file_state(common, signals, cb_queue),
521 Self::ConnOrientedConnected(x) => x
522 .as_ref()
523 .unwrap()
524 .refresh_file_state(common, signals, cb_queue),
525 Self::ConnOrientedClosed(x) => x
526 .as_ref()
527 .unwrap()
528 .refresh_file_state(common, signals, cb_queue),
529 Self::ConnLessInitial(x) => x
530 .as_ref()
531 .unwrap()
532 .refresh_file_state(common, signals, cb_queue),
533 Self::ConnLessClosed(x) => x
534 .as_ref()
535 .unwrap()
536 .refresh_file_state(common, signals, cb_queue),
537 }
538 }
539
540 fn close(
541 &mut self,
542 common: &mut UnixSocketCommon,
543 cb_queue: &mut CallbackQueue,
544 ) -> Result<(), SyscallError> {
545 let (new_state, rv) = match self {
546 Self::ConnOrientedInitial(x) => x.take().unwrap().close(common, cb_queue),
547 Self::ConnOrientedListening(x) => x.take().unwrap().close(common, cb_queue),
548 Self::ConnOrientedConnected(x) => x.take().unwrap().close(common, cb_queue),
549 Self::ConnOrientedClosed(x) => x.take().unwrap().close(common, cb_queue),
550 Self::ConnLessInitial(x) => x.take().unwrap().close(common, cb_queue),
551 Self::ConnLessClosed(x) => x.take().unwrap().close(common, cb_queue),
552 };
553
554 *self = new_state;
555 rv
556 }
557
558 fn bind(
559 &mut self,
560 common: &mut UnixSocketCommon,
561 socket: &Arc<AtomicRefCell<UnixSocket>>,
562 addr: Option<&SockaddrStorage>,
563 rng: impl rand::Rng,
564 ) -> Result<(), SyscallError> {
565 match self {
566 Self::ConnOrientedInitial(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
567 Self::ConnOrientedListening(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
568 Self::ConnOrientedConnected(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
569 Self::ConnOrientedClosed(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
570 Self::ConnLessInitial(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
571 Self::ConnLessClosed(x) => x.as_mut().unwrap().bind(common, socket, addr, rng),
572 }
573 }
574
575 fn sendmsg(
576 &mut self,
577 common: &mut UnixSocketCommon,
578 socket: &Arc<AtomicRefCell<UnixSocket>>,
579 args: SendmsgArgs,
580 mem: &mut MemoryManager,
581 cb_queue: &mut CallbackQueue,
582 ) -> Result<libc::ssize_t, SyscallError> {
583 match self {
584 Self::ConnOrientedInitial(x) => x
585 .as_mut()
586 .unwrap()
587 .sendmsg(common, socket, args, mem, cb_queue),
588 Self::ConnOrientedListening(x) => x
589 .as_mut()
590 .unwrap()
591 .sendmsg(common, socket, args, mem, cb_queue),
592 Self::ConnOrientedConnected(x) => x
593 .as_mut()
594 .unwrap()
595 .sendmsg(common, socket, args, mem, cb_queue),
596 Self::ConnOrientedClosed(x) => x
597 .as_mut()
598 .unwrap()
599 .sendmsg(common, socket, args, mem, cb_queue),
600 Self::ConnLessInitial(x) => x
601 .as_mut()
602 .unwrap()
603 .sendmsg(common, socket, args, mem, cb_queue),
604 Self::ConnLessClosed(x) => x
605 .as_mut()
606 .unwrap()
607 .sendmsg(common, socket, args, mem, cb_queue),
608 }
609 }
610
611 fn recvmsg(
612 &mut self,
613 common: &mut UnixSocketCommon,
614 socket: &Arc<AtomicRefCell<UnixSocket>>,
615 args: RecvmsgArgs,
616 mem: &mut MemoryManager,
617 cb_queue: &mut CallbackQueue,
618 ) -> Result<RecvmsgReturn, SyscallError> {
619 match self {
620 Self::ConnOrientedInitial(x) => x
621 .as_mut()
622 .unwrap()
623 .recvmsg(common, socket, args, mem, cb_queue),
624 Self::ConnOrientedListening(x) => x
625 .as_mut()
626 .unwrap()
627 .recvmsg(common, socket, args, mem, cb_queue),
628 Self::ConnOrientedConnected(x) => x
629 .as_mut()
630 .unwrap()
631 .recvmsg(common, socket, args, mem, cb_queue),
632 Self::ConnOrientedClosed(x) => x
633 .as_mut()
634 .unwrap()
635 .recvmsg(common, socket, args, mem, cb_queue),
636 Self::ConnLessInitial(x) => x
637 .as_mut()
638 .unwrap()
639 .recvmsg(common, socket, args, mem, cb_queue),
640 Self::ConnLessClosed(x) => x
641 .as_mut()
642 .unwrap()
643 .recvmsg(common, socket, args, mem, cb_queue),
644 }
645 }
646
647 fn inform_bytes_read(
648 &mut self,
649 common: &mut UnixSocketCommon,
650 num: u64,
651 cb_queue: &mut CallbackQueue,
652 ) {
653 match self {
654 Self::ConnOrientedInitial(x) => {
655 x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
656 }
657 Self::ConnOrientedListening(x) => {
658 x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
659 }
660 Self::ConnOrientedConnected(x) => {
661 x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
662 }
663 Self::ConnOrientedClosed(x) => {
664 x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
665 }
666 Self::ConnLessInitial(x) => {
667 x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue)
668 }
669 Self::ConnLessClosed(x) => x.as_mut().unwrap().inform_bytes_read(common, num, cb_queue),
670 }
671 }
672
673 fn ioctl(
674 &mut self,
675 common: &mut UnixSocketCommon,
676 request: IoctlRequest,
677 arg_ptr: ForeignPtr<()>,
678 memory_manager: &mut MemoryManager,
679 ) -> SyscallResult {
680 match self {
681 Self::ConnOrientedInitial(x) => {
682 x.as_mut()
683 .unwrap()
684 .ioctl(common, request, arg_ptr, memory_manager)
685 }
686 Self::ConnOrientedListening(x) => {
687 x.as_mut()
688 .unwrap()
689 .ioctl(common, request, arg_ptr, memory_manager)
690 }
691 Self::ConnOrientedConnected(x) => {
692 x.as_mut()
693 .unwrap()
694 .ioctl(common, request, arg_ptr, memory_manager)
695 }
696 Self::ConnOrientedClosed(x) => {
697 x.as_mut()
698 .unwrap()
699 .ioctl(common, request, arg_ptr, memory_manager)
700 }
701 Self::ConnLessInitial(x) => {
702 x.as_mut()
703 .unwrap()
704 .ioctl(common, request, arg_ptr, memory_manager)
705 }
706 Self::ConnLessClosed(x) => {
707 x.as_mut()
708 .unwrap()
709 .ioctl(common, request, arg_ptr, memory_manager)
710 }
711 }
712 }
713
714 fn listen(
715 &mut self,
716 common: &mut UnixSocketCommon,
717 backlog: i32,
718 cb_queue: &mut CallbackQueue,
719 ) -> Result<(), Errno> {
720 let (new_state, rv) = match self {
721 Self::ConnOrientedInitial(x) => x.take().unwrap().listen(common, backlog, cb_queue),
722 Self::ConnOrientedListening(x) => x.take().unwrap().listen(common, backlog, cb_queue),
723 Self::ConnOrientedConnected(x) => x.take().unwrap().listen(common, backlog, cb_queue),
724 Self::ConnOrientedClosed(x) => x.take().unwrap().listen(common, backlog, cb_queue),
725 Self::ConnLessInitial(x) => x.take().unwrap().listen(common, backlog, cb_queue),
726 Self::ConnLessClosed(x) => x.take().unwrap().listen(common, backlog, cb_queue),
727 };
728
729 *self = new_state;
730 rv
731 }
732
733 fn connect(
734 &mut self,
735 common: &mut UnixSocketCommon,
736 socket: &Arc<AtomicRefCell<UnixSocket>>,
737 addr: &SockaddrStorage,
738 cb_queue: &mut CallbackQueue,
739 ) -> Result<(), SyscallError> {
740 let (new_state, rv) = match self {
741 Self::ConnOrientedInitial(x) => {
742 x.take().unwrap().connect(common, socket, addr, cb_queue)
743 }
744 Self::ConnOrientedListening(x) => {
745 x.take().unwrap().connect(common, socket, addr, cb_queue)
746 }
747 Self::ConnOrientedConnected(x) => {
748 x.take().unwrap().connect(common, socket, addr, cb_queue)
749 }
750 Self::ConnOrientedClosed(x) => {
751 x.take().unwrap().connect(common, socket, addr, cb_queue)
752 }
753 Self::ConnLessInitial(x) => x.take().unwrap().connect(common, socket, addr, cb_queue),
754 Self::ConnLessClosed(x) => x.take().unwrap().connect(common, socket, addr, cb_queue),
755 };
756
757 *self = new_state;
758 rv
759 }
760
761 fn connect_unnamed(
762 &mut self,
763 common: &mut UnixSocketCommon,
764 socket: &Arc<AtomicRefCell<UnixSocket>>,
765 peer: Arc<AtomicRefCell<UnixSocket>>,
766 cb_queue: &mut CallbackQueue,
767 ) -> Result<(), SyscallError> {
768 let (new_state, rv) = match self {
769 Self::ConnOrientedInitial(x) => x
770 .take()
771 .unwrap()
772 .connect_unnamed(common, socket, peer, cb_queue),
773 Self::ConnOrientedListening(x) => x
774 .take()
775 .unwrap()
776 .connect_unnamed(common, socket, peer, cb_queue),
777 Self::ConnOrientedConnected(x) => x
778 .take()
779 .unwrap()
780 .connect_unnamed(common, socket, peer, cb_queue),
781 Self::ConnOrientedClosed(x) => x
782 .take()
783 .unwrap()
784 .connect_unnamed(common, socket, peer, cb_queue),
785 Self::ConnLessInitial(x) => x
786 .take()
787 .unwrap()
788 .connect_unnamed(common, socket, peer, cb_queue),
789 Self::ConnLessClosed(x) => x
790 .take()
791 .unwrap()
792 .connect_unnamed(common, socket, peer, cb_queue),
793 };
794
795 *self = new_state;
796 rv
797 }
798
799 fn accept(
800 &mut self,
801 common: &mut UnixSocketCommon,
802 cb_queue: &mut CallbackQueue,
803 ) -> Result<OpenFile, SyscallError> {
804 match self {
805 Self::ConnOrientedInitial(x) => x.as_mut().unwrap().accept(common, cb_queue),
806 Self::ConnOrientedListening(x) => x.as_mut().unwrap().accept(common, cb_queue),
807 Self::ConnOrientedConnected(x) => x.as_mut().unwrap().accept(common, cb_queue),
808 Self::ConnOrientedClosed(x) => x.as_mut().unwrap().accept(common, cb_queue),
809 Self::ConnLessInitial(x) => x.as_mut().unwrap().accept(common, cb_queue),
810 Self::ConnLessClosed(x) => x.as_mut().unwrap().accept(common, cb_queue),
811 }
812 }
813
814 fn queue_incoming_conn(
816 &mut self,
817 common: &mut UnixSocketCommon,
818 from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
819 peer: &Arc<AtomicRefCell<UnixSocket>>,
820 child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
821 cb_queue: &mut CallbackQueue,
822 ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
823 match self {
824 Self::ConnOrientedInitial(x) => x.as_mut().unwrap().queue_incoming_conn(
825 common,
826 from_address,
827 peer,
828 child_send_buffer,
829 cb_queue,
830 ),
831 Self::ConnOrientedListening(x) => x.as_mut().unwrap().queue_incoming_conn(
832 common,
833 from_address,
834 peer,
835 child_send_buffer,
836 cb_queue,
837 ),
838 Self::ConnOrientedConnected(x) => x.as_mut().unwrap().queue_incoming_conn(
839 common,
840 from_address,
841 peer,
842 child_send_buffer,
843 cb_queue,
844 ),
845 Self::ConnOrientedClosed(x) => x.as_mut().unwrap().queue_incoming_conn(
846 common,
847 from_address,
848 peer,
849 child_send_buffer,
850 cb_queue,
851 ),
852 Self::ConnLessInitial(x) => x.as_mut().unwrap().queue_incoming_conn(
853 common,
854 from_address,
855 peer,
856 child_send_buffer,
857 cb_queue,
858 ),
859 Self::ConnLessClosed(x) => x.as_mut().unwrap().queue_incoming_conn(
860 common,
861 from_address,
862 peer,
863 child_send_buffer,
864 cb_queue,
865 ),
866 }
867 }
868}
869
870trait Protocol
874where
875 Self: Sized + Into<ProtocolState>,
876{
877 fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno>;
878 fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno>;
879 fn refresh_file_state(
880 &self,
881 common: &mut UnixSocketCommon,
882 signals: FileSignals,
883 cb_queue: &mut CallbackQueue,
884 );
885
886 fn close(
887 self,
888 _common: &mut UnixSocketCommon,
889 _cb_queue: &mut CallbackQueue,
890 ) -> (ProtocolState, Result<(), SyscallError>) {
891 log::warn!("close() while in state {}", std::any::type_name::<Self>());
892 (self.into(), Err(Errno::EOPNOTSUPP.into()))
893 }
894
895 fn bind(
896 &mut self,
897 _common: &mut UnixSocketCommon,
898 _socket: &Arc<AtomicRefCell<UnixSocket>>,
899 _addr: Option<&SockaddrStorage>,
900 _rng: impl rand::Rng,
901 ) -> Result<(), SyscallError> {
902 log::warn!("bind() while in state {}", std::any::type_name::<Self>());
903 Err(Errno::EOPNOTSUPP.into())
904 }
905
906 fn sendmsg(
907 &mut self,
908 _common: &mut UnixSocketCommon,
909 _socket: &Arc<AtomicRefCell<UnixSocket>>,
910 _args: SendmsgArgs,
911 _mem: &mut MemoryManager,
912 _cb_queue: &mut CallbackQueue,
913 ) -> Result<libc::ssize_t, SyscallError> {
914 log::warn!("sendmsg() while in state {}", std::any::type_name::<Self>());
915 Err(Errno::EOPNOTSUPP.into())
916 }
917
918 fn recvmsg(
919 &mut self,
920 _common: &mut UnixSocketCommon,
921 _socket: &Arc<AtomicRefCell<UnixSocket>>,
922 _args: RecvmsgArgs,
923 _mem: &mut MemoryManager,
924 _cb_queue: &mut CallbackQueue,
925 ) -> Result<RecvmsgReturn, SyscallError> {
926 log::warn!("recvmsg() while in state {}", std::any::type_name::<Self>());
927 Err(Errno::EOPNOTSUPP.into())
928 }
929
930 fn inform_bytes_read(
931 &mut self,
932 _common: &mut UnixSocketCommon,
933 _num: u64,
934 _cb_queue: &mut CallbackQueue,
935 ) {
936 panic!(
937 "inform_bytes_read() while in state {}",
938 std::any::type_name::<Self>()
939 );
940 }
941
942 fn ioctl(
943 &mut self,
944 _common: &mut UnixSocketCommon,
945 _request: IoctlRequest,
946 _arg_ptr: ForeignPtr<()>,
947 _memory_manager: &mut MemoryManager,
948 ) -> SyscallResult {
949 log::warn!("ioctl() while in state {}", std::any::type_name::<Self>());
950 Err(Errno::EOPNOTSUPP.into())
951 }
952
953 fn listen(
954 self,
955 _common: &mut UnixSocketCommon,
956 _backlog: i32,
957 _cb_queue: &mut CallbackQueue,
958 ) -> (ProtocolState, Result<(), Errno>) {
959 log::warn!("listen() while in state {}", std::any::type_name::<Self>());
960 (self.into(), Err(Errno::EOPNOTSUPP))
961 }
962
963 fn connect(
964 self,
965 _common: &mut UnixSocketCommon,
966 _socket: &Arc<AtomicRefCell<UnixSocket>>,
967 _addr: &SockaddrStorage,
968 _cb_queue: &mut CallbackQueue,
969 ) -> (ProtocolState, Result<(), SyscallError>) {
970 log::warn!("connect() while in state {}", std::any::type_name::<Self>());
971 (self.into(), Err(Errno::EOPNOTSUPP.into()))
972 }
973
974 fn connect_unnamed(
975 self,
976 _common: &mut UnixSocketCommon,
977 _socket: &Arc<AtomicRefCell<UnixSocket>>,
978 _peer: Arc<AtomicRefCell<UnixSocket>>,
979 _cb_queue: &mut CallbackQueue,
980 ) -> (ProtocolState, Result<(), SyscallError>) {
981 log::warn!(
982 "connect_unnamed() while in state {}",
983 std::any::type_name::<Self>()
984 );
985 (self.into(), Err(Errno::EOPNOTSUPP.into()))
986 }
987
988 fn accept(
989 &mut self,
990 _common: &mut UnixSocketCommon,
991 _cb_queue: &mut CallbackQueue,
992 ) -> Result<OpenFile, SyscallError> {
993 log::warn!("accept() while in state {}", std::any::type_name::<Self>());
994 Err(Errno::EOPNOTSUPP.into())
995 }
996
997 fn queue_incoming_conn(
998 &mut self,
999 _common: &mut UnixSocketCommon,
1000 _from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
1001 _peer: &Arc<AtomicRefCell<UnixSocket>>,
1002 _child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
1003 _cb_queue: &mut CallbackQueue,
1004 ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
1005 log::warn!(
1006 "queue_incoming_conn() while in state {}",
1007 std::any::type_name::<Self>()
1008 );
1009 Err(IncomingConnError::NotSupported)
1010 }
1011}
1012
1013impl Protocol for ConnOrientedInitial {
1014 fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1015 Err(Errno::ENOTCONN)
1016 }
1017
1018 fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1019 Ok(self.bound_addr)
1020 }
1021
1022 fn refresh_file_state(
1023 &self,
1024 common: &mut UnixSocketCommon,
1025 signals: FileSignals,
1026 cb_queue: &mut CallbackQueue,
1027 ) {
1028 assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
1029 common.update_state(
1030 FileState::all(),
1031 FileState::ACTIVE,
1032 signals,
1033 cb_queue,
1034 );
1035 }
1036
1037 fn close(
1038 self,
1039 common: &mut UnixSocketCommon,
1040 cb_queue: &mut CallbackQueue,
1041 ) -> (ProtocolState, Result<(), SyscallError>) {
1042 let new_state = ConnOrientedClosed {};
1043 new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1044 (new_state.into(), common.close(cb_queue))
1045 }
1046
1047 fn bind(
1048 &mut self,
1049 common: &mut UnixSocketCommon,
1050 socket: &Arc<AtomicRefCell<UnixSocket>>,
1051 addr: Option<&SockaddrStorage>,
1052 rng: impl rand::Rng,
1053 ) -> Result<(), SyscallError> {
1054 if self.bound_addr.is_some() {
1056 return Err(Errno::EINVAL.into());
1057 }
1058
1059 self.bound_addr = Some(common.bind(socket, addr, rng)?);
1060 Ok(())
1061 }
1062
1063 fn sendmsg(
1064 &mut self,
1065 common: &mut UnixSocketCommon,
1066 _socket: &Arc<AtomicRefCell<UnixSocket>>,
1067 args: SendmsgArgs,
1068 _mem: &mut MemoryManager,
1069 _cb_queue: &mut CallbackQueue,
1070 ) -> Result<libc::ssize_t, SyscallError> {
1071 match (common.socket_type, args.addr) {
1072 (UnixSocketType::Stream, Some(_)) => Err(Errno::EOPNOTSUPP.into()),
1073 (UnixSocketType::Stream, None) => Err(Errno::ENOTCONN.into()),
1074 (UnixSocketType::SeqPacket, _) => Err(Errno::ENOTCONN.into()),
1075 (UnixSocketType::Dgram, _) => panic!(
1076 "A dgram unix socket is in the connection-oriented {:?} state",
1077 std::any::type_name::<Self>()
1078 ),
1079 }
1080 }
1081
1082 fn recvmsg(
1083 &mut self,
1084 common: &mut UnixSocketCommon,
1085 _socket: &Arc<AtomicRefCell<UnixSocket>>,
1086 _args: RecvmsgArgs,
1087 _mem: &mut MemoryManager,
1088 _cb_queue: &mut CallbackQueue,
1089 ) -> Result<RecvmsgReturn, SyscallError> {
1090 match common.socket_type {
1091 UnixSocketType::Stream => Err(Errno::EINVAL.into()),
1092 UnixSocketType::SeqPacket => Err(Errno::ENOTCONN.into()),
1093 UnixSocketType::Dgram => panic!(
1094 "A dgram unix socket is in the connection-oriented {:?} state",
1095 std::any::type_name::<Self>()
1096 ),
1097 }
1098 }
1099
1100 fn ioctl(
1101 &mut self,
1102 common: &mut UnixSocketCommon,
1103 request: IoctlRequest,
1104 arg_ptr: ForeignPtr<()>,
1105 memory_manager: &mut MemoryManager,
1106 ) -> SyscallResult {
1107 common.ioctl(request, arg_ptr, memory_manager)
1108 }
1109
1110 fn listen(
1111 self,
1112 common: &mut UnixSocketCommon,
1113 backlog: i32,
1114 cb_queue: &mut CallbackQueue,
1115 ) -> (ProtocolState, Result<(), Errno>) {
1116 let bound_addr = match self.bound_addr {
1118 Some(x) => x,
1119 None => return (self.into(), Err(Errno::EINVAL)),
1120 };
1121
1122 let new_state = ConnOrientedListening {
1123 bound_addr,
1124 queue: VecDeque::new(),
1125 queue_limit: backlog_to_queue_size(backlog),
1126 };
1127
1128 new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1130
1131 (new_state.into(), Ok(()))
1132 }
1133
    /// Attempts to connect this socket to the socket registered under `addr`.
    ///
    /// Returns the next protocol state together with the result of the operation. On every
    /// failure path the current state is handed back unchanged alongside the error; on success
    /// the returned state is a `ConnOrientedConnected`.
    ///
    /// Errors produced here:
    /// - `EINVAL` if `addr` is not a unix address,
    /// - whatever `lookup_address` returns if no socket is found,
    /// - `ECONNREFUSED` if the target reports `IncomingConnError::NotSupported`,
    /// - `EWOULDBLOCK` if the target's queue is full and this socket is non-blocking,
    /// - a "blocked on file" error if the target's queue is full and this socket is blocking.
    fn connect(
        self,
        common: &mut UnixSocketCommon,
        socket: &Arc<AtomicRefCell<UnixSocket>>,
        addr: &SockaddrStorage,
        cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        // only unix addresses can be connected to
        let Some(addr) = addr.as_unix() else {
            return (self.into(), Err(Errno::EINVAL.into()));
        };

        // find the socket that is registered under this address
        let server = match lookup_address(
            &common.namespace.borrow(),
            common.socket_type,
            &addr.as_ref(),
        ) {
            Ok(x) => x,
            Err(e) => return (self.into(), Err(e.into())),
        };

        // mutable borrow of the server; `peer` below is a reference obtained through it
        let server_mut = &mut *server.borrow_mut();
        // ask the server to queue a connection from us, handing it our receive buffer so that
        // the socket on its side can write to it
        let peer = match server_mut.protocol_state.queue_incoming_conn(
            &mut server_mut.common,
            self.bound_addr,
            socket,
            &common.recv_buffer,
            cb_queue,
        ) {
            Ok(peer) => peer,
            Err(IncomingConnError::NotSupported) => {
                return (self.into(), Err(Errno::ECONNREFUSED.into()));
            }
            Err(IncomingConnError::QueueFull) => {
                // a non-blocking socket reports the full queue immediately
                if common.status.contains(FileStatus::NONBLOCK) {
                    return (self.into(), Err(Errno::EWOULDBLOCK.into()));
                }

                // otherwise block on the server until it allows connections or is closed
                let err = SyscallError::new_blocked_on_file(
                    File::Socket(Socket::Unix(Arc::clone(&server))),
                    FileState::SOCKET_ALLOWING_CONNECT | FileState::CLOSED,
                    server_mut.supports_sa_restart(),
                );

                return (self.into(), Err(err));
            }
        };

        // we send by writing into the peer's receive buffer
        let send_buffer = Arc::clone(peer.borrow().recv_buffer());

        // refresh our file state whenever the send buffer becomes writable or loses its readers;
        // the listener only holds a weak reference to this socket
        let weak = Arc::downgrade(socket);
        let send_buffer_handle = send_buffer.borrow_mut().add_listener(
            BufferState::WRITABLE | BufferState::NO_READERS,
            BufferSignals::empty(),
            move |_, _, cb_queue| {
                if let Some(socket) = weak.upgrade() {
                    socket
                        .borrow_mut()
                        .refresh_file_state(FileSignals::empty(), cb_queue);
                }
            },
        );

        // register ourselves as a writer of the send buffer
        let writer_handle = send_buffer.borrow_mut().add_writer(cb_queue);

        // refresh our file state whenever our receive buffer becomes readable, loses its
        // writers, or grows; a grow signal is translated to `READ_BUFFER_GREW`
        let weak = Arc::downgrade(socket);
        let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
            BufferState::READABLE | BufferState::NO_WRITERS,
            BufferSignals::BUFFER_GREW,
            move |_, signals, cb_queue| {
                if let Some(socket) = weak.upgrade() {
                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
                        FileSignals::READ_BUFFER_GREW
                    } else {
                        FileSignals::empty()
                    };
                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
                }
            },
        );

        // register ourselves as a reader of our own receive buffer
        let reader_handle = common.recv_buffer.borrow_mut().add_reader(cb_queue);

        let new_state = ConnOrientedConnected {
            bound_addr: self.bound_addr,
            peer_addr: Some(addr.into_owned()),
            peer: Arc::clone(peer),
            reader_handle,
            writer_handle,
            _recv_buffer_handle: recv_buffer_handle,
            _send_buffer_handle: send_buffer_handle,
        };

        // publish the file state that corresponds to the new protocol state
        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);

        (new_state.into(), Ok(()))
    }
1240
    /// Connects this socket directly to `peer` without any address being involved.
    ///
    /// Registers this socket as a writer of the peer's receive buffer and as a reader of its own
    /// receive buffer, installs listeners that refresh this socket's file state, and returns a
    /// `ConnOrientedConnected` state with no bound or peer address. Only this socket's side is
    /// set up here; nothing in this function changes the peer's protocol state.
    ///
    /// # Panics
    /// Panics if this socket already has a bound address.
    fn connect_unnamed(
        self,
        common: &mut UnixSocketCommon,
        socket: &Arc<AtomicRefCell<UnixSocket>>,
        peer: Arc<AtomicRefCell<UnixSocket>>,
        cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        assert!(self.bound_addr.is_none());

        let send_buffer_handle;
        let writer_handle;

        // scope the borrow of the peer so that `peer` can be moved into the new state below
        {
            let peer_ref = peer.borrow();
            // we send by writing into the peer's receive buffer
            let send_buffer = peer_ref.recv_buffer();

            // refresh our file state whenever the send buffer becomes writable or loses its
            // readers; the listener only holds a weak reference to this socket
            let weak = Arc::downgrade(socket);
            send_buffer_handle = send_buffer.borrow_mut().add_listener(
                BufferState::WRITABLE | BufferState::NO_READERS,
                BufferSignals::empty(),
                move |_, _, cb_queue| {
                    if let Some(socket) = weak.upgrade() {
                        socket
                            .borrow_mut()
                            .refresh_file_state(FileSignals::empty(), cb_queue);
                    }
                },
            );

            // register ourselves as a writer of the send buffer
            writer_handle = send_buffer.borrow_mut().add_writer(cb_queue);
        }

        // refresh our file state whenever our receive buffer becomes readable, loses its
        // writers, or grows; a grow signal is translated to `READ_BUFFER_GREW`
        let weak = Arc::downgrade(socket);
        let recv_buffer_handle = common.recv_buffer.borrow_mut().add_listener(
            BufferState::READABLE | BufferState::NO_WRITERS,
            BufferSignals::BUFFER_GREW,
            move |_, signals, cb_queue| {
                if let Some(socket) = weak.upgrade() {
                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
                        FileSignals::READ_BUFFER_GREW
                    } else {
                        FileSignals::empty()
                    };
                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
                }
            },
        );

        // register ourselves as a reader of our own receive buffer
        let reader_handle = common.recv_buffer.borrow_mut().add_reader(cb_queue);

        let new_state = ConnOrientedConnected {
            bound_addr: None,
            peer_addr: None,
            peer,
            reader_handle,
            writer_handle,
            _recv_buffer_handle: recv_buffer_handle,
            _send_buffer_handle: send_buffer_handle,
        };

        // publish the file state that corresponds to the new protocol state
        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);

        (new_state.into(), Ok(()))
    }
1307
    /// Always fails with `EINVAL` after logging a warning that names this state's type.
    fn accept(
        &mut self,
        _common: &mut UnixSocketCommon,
        _cb_queue: &mut CallbackQueue,
    ) -> Result<OpenFile, SyscallError> {
        log::warn!("accept() while in state {}", std::any::type_name::<Self>());
        Err(Errno::EINVAL.into())
    }
1316}
1317
1318impl Protocol for ConnOrientedListening {
    /// A listening socket has no peer, so this always returns `ENOTCONN`.
    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Err(Errno::ENOTCONN)
    }
1322
    /// Returns the address this listening socket is bound to (always `Some` in this state).
    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(Some(self.bound_addr))
    }
1326
1327 fn refresh_file_state(
1328 &self,
1329 common: &mut UnixSocketCommon,
1330 signals: FileSignals,
1331 cb_queue: &mut CallbackQueue,
1332 ) {
1333 let mut new_state = FileState::ACTIVE;
1334
1335 new_state.set(FileState::READABLE, !self.queue.is_empty());
1337
1338 new_state.set(FileState::SOCKET_ALLOWING_CONNECT, !self.queue_is_full());
1340
1341 common.update_state(
1347 FileState::all(),
1348 new_state,
1349 signals,
1350 cb_queue,
1351 );
1352 }
1353
1354 fn close(
1355 self,
1356 common: &mut UnixSocketCommon,
1357 cb_queue: &mut CallbackQueue,
1358 ) -> (ProtocolState, Result<(), SyscallError>) {
1359 for sock in self.queue {
1360 if let Err(e) = sock.borrow_mut().close(cb_queue) {
1362 log::warn!("Unexpected error while closing queued unix socket: {:?}", e);
1363 }
1364 }
1365
1366 let new_state = ConnOrientedClosed {};
1367 new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1368 (new_state.into(), common.close(cb_queue))
1369 }
1370
    /// Calling `listen()` on a socket that is already listening only updates the queue limit
    /// from the new `backlog`; the socket stays in the listening state and the call succeeds.
    fn listen(
        mut self,
        common: &mut UnixSocketCommon,
        backlog: i32,
        cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), Errno>) {
        self.queue_limit = backlog_to_queue_size(backlog);

        // the new limit may change whether the queue counts as full
        self.refresh_file_state(common, FileSignals::empty(), cb_queue);

        (self.into(), Ok(()))
    }
1384
    /// A listening socket cannot connect: the state is returned unchanged with `EINVAL`.
    fn connect(
        self,
        _common: &mut UnixSocketCommon,
        _socket: &Arc<AtomicRefCell<UnixSocket>>,
        _addr: &SockaddrStorage,
        _cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        (self.into(), Err(Errno::EINVAL.into()))
    }
1394
1395 fn accept(
1396 &mut self,
1397 common: &mut UnixSocketCommon,
1398 cb_queue: &mut CallbackQueue,
1399 ) -> Result<OpenFile, SyscallError> {
1400 let child_socket = match self.queue.pop_front() {
1401 Some(x) => x,
1402 None => return Err(Errno::EWOULDBLOCK.into()),
1403 };
1404
1405 self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1407
1408 Ok(OpenFile::new(File::Socket(Socket::Unix(child_socket))))
1409 }
1410
    /// Creates a new child socket for an incoming connection and appends it to the queue.
    ///
    /// `peer` is the connecting socket, `from_address` is recorded as the child's peer address,
    /// and `child_send_buffer` is the buffer the child will write into. On success a reference to
    /// the queued child socket is returned; if the queue is full, `IncomingConnError::QueueFull`
    /// is returned and nothing is created.
    ///
    /// # Panics
    /// Panics if the `SOCKET_ALLOWING_CONNECT` flag in `common.state` is inconsistent with
    /// whether the queue is full, or if `queue_limit` does not fit the queue's length type.
    fn queue_incoming_conn(
        &mut self,
        common: &mut UnixSocketCommon,
        from_address: Option<SockaddrUnix<libc::sockaddr_un>>,
        peer: &Arc<AtomicRefCell<UnixSocket>>,
        child_send_buffer: &Arc<AtomicRefCell<SharedBuf>>,
        cb_queue: &mut CallbackQueue,
    ) -> Result<&Arc<AtomicRefCell<UnixSocket>>, IncomingConnError> {
        // refuse the connection when the queue is at its limit
        if self.queue.len() >= self.queue_limit.try_into().unwrap() {
            assert!(!common.state.contains(FileState::SOCKET_ALLOWING_CONNECT));
            return Err(IncomingConnError::QueueFull);
        }

        assert!(common.state.contains(FileState::SOCKET_ALLOWING_CONNECT));

        // the child shares the listening socket's status, socket type and namespace
        let child_socket = UnixSocket::new(
            common.status,
            common.socket_type,
            &common.namespace,
        );

        let child_recv_buffer = Arc::clone(&child_socket.borrow_mut().common.recv_buffer);

        // refresh the child's file state whenever its send buffer becomes writable or loses its
        // readers; the listener only holds a weak reference to the child
        let weak = Arc::downgrade(&child_socket);
        let send_buffer_handle = child_send_buffer.borrow_mut().add_listener(
            BufferState::WRITABLE | BufferState::NO_READERS,
            BufferSignals::empty(),
            move |_, _, cb_queue| {
                if let Some(socket) = weak.upgrade() {
                    socket
                        .borrow_mut()
                        .refresh_file_state(FileSignals::empty(), cb_queue);
                }
            },
        );

        // register the child as a writer of its send buffer
        let writer_handle = child_send_buffer.borrow_mut().add_writer(cb_queue);

        // refresh the child's file state whenever its receive buffer becomes readable, loses its
        // writers, or grows; a grow signal is translated to `READ_BUFFER_GREW`
        let weak = Arc::downgrade(&child_socket);
        let recv_buffer_handle = child_recv_buffer.borrow_mut().add_listener(
            BufferState::READABLE | BufferState::NO_WRITERS,
            BufferSignals::BUFFER_GREW,
            move |_, signals, cb_queue| {
                if let Some(socket) = weak.upgrade() {
                    let signals = if signals.contains(BufferSignals::BUFFER_GREW) {
                        FileSignals::READ_BUFFER_GREW
                    } else {
                        FileSignals::empty()
                    };
                    socket.borrow_mut().refresh_file_state(signals, cb_queue);
                }
            },
        );

        // register the child as a reader of its own receive buffer
        let reader_handle = child_recv_buffer.borrow_mut().add_reader(cb_queue);

        let new_child_state = ConnOrientedConnected {
            // the child reports the listening socket's address as its own bound address
            bound_addr: Some(self.bound_addr),
            peer_addr: from_address,
            peer: Arc::clone(peer),
            reader_handle,
            writer_handle,
            _recv_buffer_handle: recv_buffer_handle,
            _send_buffer_handle: send_buffer_handle,
        };

        // put the child directly into the connected state
        child_socket.borrow_mut().protocol_state = new_child_state.into();

        // refresh the child's file state later, from the callback queue, rather than right now
        let weak = Arc::downgrade(&child_socket);
        cb_queue.add(move |cb_queue| {
            if let Some(child_socket) = weak.upgrade() {
                child_socket
                    .borrow_mut()
                    .refresh_file_state(FileSignals::empty(), cb_queue);
            }
        });

        self.queue.push_back(child_socket);

        // the queue grew, so readability and "allowing connect" may have changed
        self.refresh_file_state(common, FileSignals::empty(), cb_queue);

        // the element pushed above is the last one in the queue
        Ok(self.queue.back().unwrap())
    }
1503}
1504
1505impl Protocol for ConnOrientedConnected {
    /// Returns the recorded address of the peer, which may be `None`.
    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(self.peer_addr)
    }
1509
    /// Returns the address this socket is bound to, which may be `None`.
    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(self.bound_addr)
    }
1513
1514 fn refresh_file_state(
1515 &self,
1516 common: &mut UnixSocketCommon,
1517 signals: FileSignals,
1518 cb_queue: &mut CallbackQueue,
1519 ) {
1520 let mut new_state = FileState::ACTIVE;
1521
1522 {
1523 let recv_buffer = common.recv_buffer.borrow();
1524 let peer = self.peer.borrow();
1525 let send_buffer = peer.recv_buffer().borrow();
1526
1527 new_state.set(
1528 FileState::READABLE,
1529 recv_buffer.has_data() || recv_buffer.num_writers() == 0,
1530 );
1531 new_state.set(
1532 FileState::WRITABLE,
1533 common.sent_len < common.send_limit || send_buffer.num_readers() == 0,
1534 );
1535 }
1536
1537 common.update_state(
1538 FileState::all(),
1539 new_state,
1540 signals,
1541 cb_queue,
1542 );
1543 }
1544
1545 fn close(
1546 self,
1547 common: &mut UnixSocketCommon,
1548 cb_queue: &mut CallbackQueue,
1549 ) -> (ProtocolState, Result<(), SyscallError>) {
1550 common
1552 .recv_buffer
1553 .borrow_mut()
1554 .remove_reader(self.reader_handle, cb_queue);
1555
1556 self.peer
1558 .borrow()
1559 .recv_buffer()
1560 .borrow_mut()
1561 .remove_writer(self.writer_handle, cb_queue);
1562
1563 let new_state = ConnOrientedClosed {};
1564 new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1565 (new_state.into(), common.close(cb_queue))
1566 }
1567
1568 fn sendmsg(
1569 &mut self,
1570 common: &mut UnixSocketCommon,
1571 socket: &Arc<AtomicRefCell<UnixSocket>>,
1572 args: SendmsgArgs,
1573 mem: &mut MemoryManager,
1574 cb_queue: &mut CallbackQueue,
1575 ) -> Result<libc::ssize_t, SyscallError> {
1576 if !args.control_ptr.ptr().is_null() {
1577 log::debug!("Unix sockets don't yet support control data for sendmsg()");
1578 return Err(Errno::EINVAL.into());
1579 }
1580
1581 let recv_socket = common.resolve_destination(Some(&self.peer), args.addr)?;
1582 let rv = common.sendmsg(socket, args.iovs, args.flags, &recv_socket, mem, cb_queue)?;
1583
1584 self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1585
1586 Ok(rv.try_into().unwrap())
1587 }
1588
1589 fn recvmsg(
1590 &mut self,
1591 common: &mut UnixSocketCommon,
1592 socket: &Arc<AtomicRefCell<UnixSocket>>,
1593 args: RecvmsgArgs,
1594 mem: &mut MemoryManager,
1595 cb_queue: &mut CallbackQueue,
1596 ) -> Result<RecvmsgReturn, SyscallError> {
1597 if !args.control_ptr.ptr().is_null() {
1598 log::debug!("Unix sockets don't yet support control data for recvmsg()");
1599 return Err(Errno::EINVAL.into());
1600 }
1601
1602 let (rv, num_removed_from_buf, msg_flags) =
1603 common.recvmsg(socket, args.iovs, args.flags, mem, cb_queue)?;
1604 let num_removed_from_buf = u64::try_from(num_removed_from_buf).unwrap();
1605
1606 if num_removed_from_buf > 0 {
1607 let peer = Arc::clone(&self.peer);
1609 cb_queue.add(move |cb_queue| {
1610 peer.borrow_mut()
1611 .inform_bytes_read(num_removed_from_buf, cb_queue);
1612 });
1613 }
1614
1615 self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1616
1617 Ok(RecvmsgReturn {
1618 return_val: rv.try_into().unwrap(),
1619 addr: self.peer_addr.map(Into::into),
1620 msg_flags,
1621 control_len: 0,
1622 })
1623 }
1624
    /// Records that `num` bytes previously sent by this socket have been read by the receiver.
    ///
    /// Reduces the count of outstanding sent bytes and refreshes the file state, since the
    /// socket may have become writable again.
    ///
    /// # Panics
    /// Panics if `num` is larger than the number of outstanding sent bytes.
    fn inform_bytes_read(
        &mut self,
        common: &mut UnixSocketCommon,
        num: u64,
        cb_queue: &mut CallbackQueue,
    ) {
        common.sent_len = common.sent_len.checked_sub(num).unwrap();
        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
    }
1634
    /// Handles an ioctl request by forwarding it unchanged to [`UnixSocketCommon::ioctl`].
    fn ioctl(
        &mut self,
        common: &mut UnixSocketCommon,
        request: IoctlRequest,
        arg_ptr: ForeignPtr<()>,
        memory_manager: &mut MemoryManager,
    ) -> SyscallResult {
        common.ioctl(request, arg_ptr, memory_manager)
    }
1644
    /// Always fails with `EINVAL` after logging a warning that names this state's type.
    fn accept(
        &mut self,
        _common: &mut UnixSocketCommon,
        _cb_queue: &mut CallbackQueue,
    ) -> Result<OpenFile, SyscallError> {
        log::warn!("accept() while in state {}", std::any::type_name::<Self>());
        Err(Errno::EINVAL.into())
    }
1653}
1654
/// Protocol behavior of a connection-oriented socket after it has been closed.
impl Protocol for ConnOrientedClosed {
    /// Always returns `ENOTCONN`.
    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Err(Errno::ENOTCONN)
    }

    /// Always returns `EBADFD`.
    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Err(Errno::EBADFD)
    }

    /// Sets the file state to exactly `CLOSED`, replacing every other flag.
    ///
    /// # Panics
    /// Panics if `signals` contains `READ_BUFFER_GREW`.
    fn refresh_file_state(
        &self,
        common: &mut UnixSocketCommon,
        signals: FileSignals,
        cb_queue: &mut CallbackQueue,
    ) {
        assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
        common.update_state(
            FileState::all(),
            FileState::CLOSED,
            signals,
            cb_queue,
        );
    }

    /// Closing twice is treated as a bug.
    ///
    /// # Panics
    /// Always panics.
    fn close(
        self,
        _common: &mut UnixSocketCommon,
        _cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        panic!("Trying to close an already closed socket");
    }

    /// Notifications about bytes having been read are ignored once the socket is closed.
    fn inform_bytes_read(
        &mut self,
        _common: &mut UnixSocketCommon,
        _num: u64,
        _cb_queue: &mut CallbackQueue,
    ) {
    }
}
1697
1698impl Protocol for ConnLessInitial {
1699 fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
1700 match self.peer {
1701 Some(_) => Ok(self.peer_addr),
1702 None => Err(Errno::ENOTCONN),
1703 }
1704 }
1705
    /// Returns the address this socket is bound to, which may be `None`.
    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(self.bound_addr)
    }
1709
1710 fn refresh_file_state(
1711 &self,
1712 common: &mut UnixSocketCommon,
1713 signals: FileSignals,
1714 cb_queue: &mut CallbackQueue,
1715 ) {
1716 let mut new_state = FileState::ACTIVE;
1717
1718 {
1719 let recv_buffer = common.recv_buffer.borrow();
1720
1721 new_state.set(FileState::READABLE, recv_buffer.has_data());
1722 new_state.set(FileState::WRITABLE, common.sent_len < common.send_limit);
1723 }
1724
1725 common.update_state(
1726 FileState::all(),
1727 new_state,
1728 signals,
1729 cb_queue,
1730 );
1731 }
1732
1733 fn close(
1734 self,
1735 common: &mut UnixSocketCommon,
1736 cb_queue: &mut CallbackQueue,
1737 ) -> (ProtocolState, Result<(), SyscallError>) {
1738 common
1740 .recv_buffer
1741 .borrow_mut()
1742 .remove_reader(self.reader_handle, cb_queue);
1743
1744 for byte_data in self.recv_data.into_iter() {
1745 cb_queue.add(move |cb_queue| {
1747 byte_data
1748 .from_socket
1749 .borrow_mut()
1750 .inform_bytes_read(byte_data.num_bytes, cb_queue);
1751 });
1752 }
1753
1754 let new_state = ConnLessClosed {};
1755 new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);
1756 (new_state.into(), common.close(cb_queue))
1757 }
1758
1759 fn bind(
1760 &mut self,
1761 common: &mut UnixSocketCommon,
1762 socket: &Arc<AtomicRefCell<UnixSocket>>,
1763 addr: Option<&SockaddrStorage>,
1764 rng: impl rand::Rng,
1765 ) -> Result<(), SyscallError> {
1766 if self.bound_addr.is_some() {
1768 return Err(Errno::EINVAL.into());
1769 }
1770
1771 self.bound_addr = Some(common.bind(socket, addr, rng)?);
1772 Ok(())
1773 }
1774
1775 fn sendmsg(
1776 &mut self,
1777 common: &mut UnixSocketCommon,
1778 socket: &Arc<AtomicRefCell<UnixSocket>>,
1779 args: SendmsgArgs,
1780 mem: &mut MemoryManager,
1781 cb_queue: &mut CallbackQueue,
1782 ) -> Result<libc::ssize_t, SyscallError> {
1783 if !args.control_ptr.ptr().is_null() {
1784 log::debug!("Unix sockets don't yet support control data for sendmsg()");
1785 return Err(Errno::EINVAL.into());
1786 }
1787
1788 let recv_socket = common.resolve_destination(self.peer.as_ref(), args.addr)?;
1789 let rv = common.sendmsg(socket, args.iovs, args.flags, &recv_socket, mem, cb_queue)?;
1790
1791 let byte_data = ByteData {
1792 from_socket: self.this_socket.upgrade().unwrap(),
1793 from_addr: self.bound_addr,
1794 num_bytes: rv.try_into().unwrap(),
1795 };
1796
1797 match &mut recv_socket.borrow_mut().protocol_state {
1798 ProtocolState::ConnLessInitial(state) => {
1799 state.as_mut().unwrap().recv_data.push_back(byte_data);
1800 }
1801 _ => panic!(
1802 "Sending bytes to a socket in state {}",
1803 std::any::type_name::<Self>()
1804 ),
1805 }
1806
1807 self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1808
1809 Ok(rv.try_into().unwrap())
1810 }
1811
1812 fn recvmsg(
1813 &mut self,
1814 common: &mut UnixSocketCommon,
1815 socket: &Arc<AtomicRefCell<UnixSocket>>,
1816 args: RecvmsgArgs,
1817 mem: &mut MemoryManager,
1818 cb_queue: &mut CallbackQueue,
1819 ) -> Result<RecvmsgReturn, SyscallError> {
1820 if !args.control_ptr.ptr().is_null() {
1821 log::debug!("Unix sockets don't yet support control data for recvmsg()");
1822 return Err(Errno::EINVAL.into());
1823 }
1824
1825 let (rv, num_removed_from_buf, msg_flags) =
1826 common.recvmsg(socket, args.iovs, args.flags, mem, cb_queue)?;
1827 let num_removed_from_buf = u64::try_from(num_removed_from_buf).unwrap();
1828
1829 let byte_data = self.recv_data.pop_front().unwrap();
1830 assert!(num_removed_from_buf == byte_data.num_bytes);
1831
1832 cb_queue.add(move |cb_queue| {
1834 byte_data
1835 .from_socket
1836 .borrow_mut()
1837 .inform_bytes_read(byte_data.num_bytes, cb_queue);
1838 });
1839
1840 self.refresh_file_state(common, FileSignals::empty(), cb_queue);
1841
1842 Ok(RecvmsgReturn {
1843 return_val: rv.try_into().unwrap(),
1844 addr: byte_data.from_addr.map(Into::into),
1845 msg_flags,
1846 control_len: 0,
1847 })
1848 }
1849
    /// Records that `num` bytes previously sent by this socket have been read by the receiver.
    ///
    /// Reduces the count of outstanding sent bytes and refreshes the file state, since the
    /// socket may have become writable again.
    ///
    /// # Panics
    /// Panics if `num` is larger than the number of outstanding sent bytes.
    fn inform_bytes_read(
        &mut self,
        common: &mut UnixSocketCommon,
        num: u64,
        cb_queue: &mut CallbackQueue,
    ) {
        common.sent_len = common.sent_len.checked_sub(num).unwrap();
        self.refresh_file_state(common, FileSignals::empty(), cb_queue);
    }
1859
    /// Handles an ioctl request by forwarding it unchanged to [`UnixSocketCommon::ioctl`].
    fn ioctl(
        &mut self,
        common: &mut UnixSocketCommon,
        request: IoctlRequest,
        arg_ptr: ForeignPtr<()>,
        memory_manager: &mut MemoryManager,
    ) -> SyscallResult {
        common.ioctl(request, arg_ptr, memory_manager)
    }
1869
    /// Sets the default destination of this connectionless socket to the socket registered
    /// under `addr`.
    ///
    /// Returns the state unchanged with `EINVAL` if `addr` is not a unix address, or with the
    /// error from `lookup_address` if no socket is found. On success the returned state is the
    /// same state with `peer` and `peer_addr` replaced; all other fields are carried over.
    fn connect(
        self,
        common: &mut UnixSocketCommon,
        _socket: &Arc<AtomicRefCell<UnixSocket>>,
        addr: &SockaddrStorage,
        cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        // only unix addresses can be connected to
        let Some(addr) = addr.as_unix() else {
            return (self.into(), Err(Errno::EINVAL.into()));
        };

        // find the socket that is registered under this address
        let peer = match lookup_address(&common.namespace.borrow(), common.socket_type, &addr) {
            Ok(x) => x,
            Err(e) => return (self.into(), Err(e.into())),
        };

        let new_state = Self {
            peer_addr: Some(addr.into_owned()),
            peer: Some(peer),
            ..self
        };

        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);

        (new_state.into(), Ok(()))
    }
1898
    /// Sets `peer` as the default destination of this socket without any address being involved.
    ///
    /// The returned state has the given peer and no bound or peer address; all other fields are
    /// carried over from the current state.
    ///
    /// # Panics
    /// Panics if this socket already has a peer address or a bound address.
    fn connect_unnamed(
        self,
        common: &mut UnixSocketCommon,
        _socket: &Arc<AtomicRefCell<UnixSocket>>,
        peer: Arc<AtomicRefCell<UnixSocket>>,
        cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        assert!(self.peer_addr.is_none());
        assert!(self.bound_addr.is_none());

        let new_state = Self {
            bound_addr: None,
            peer_addr: None,
            peer: Some(peer),
            ..self
        };

        new_state.refresh_file_state(common, FileSignals::empty(), cb_queue);

        (new_state.into(), Ok(()))
    }
1920}
1921
/// Protocol behavior of a connectionless socket after it has been closed.
impl Protocol for ConnLessClosed {
    /// Always reports that there is no peer address.
    fn peer_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(None)
    }

    /// Always reports that there is no bound address.
    fn bound_address(&self) -> Result<Option<SockaddrUnix<libc::sockaddr_un>>, Errno> {
        Ok(None)
    }

    /// Sets the file state to exactly `CLOSED`, replacing every other flag.
    ///
    /// # Panics
    /// Panics if `signals` contains `READ_BUFFER_GREW`.
    fn refresh_file_state(
        &self,
        common: &mut UnixSocketCommon,
        signals: FileSignals,
        cb_queue: &mut CallbackQueue,
    ) {
        assert!(!signals.contains(FileSignals::READ_BUFFER_GREW));
        common.update_state(
            FileState::all(),
            FileState::CLOSED,
            signals,
            cb_queue,
        );
    }

    /// Closing twice is treated as a bug.
    ///
    /// # Panics
    /// Always panics.
    fn close(
        self,
        _common: &mut UnixSocketCommon,
        _cb_queue: &mut CallbackQueue,
    ) -> (ProtocolState, Result<(), SyscallError>) {
        panic!("Trying to close an already closed socket");
    }

    /// Notifications about bytes having been read are ignored once the socket is closed.
    fn inform_bytes_read(
        &mut self,
        _common: &mut UnixSocketCommon,
        _num: u64,
        _cb_queue: &mut CallbackQueue,
    ) {
    }
}
1964
/// State and behavior shared by every protocol state of a unix socket.
struct UnixSocketCommon {
    /// Buffer that `recvmsg` reads incoming data from.
    recv_buffer: Arc<AtomicRefCell<SharedBuf>>,
    /// Upper bound for `sent_len`; sending is refused once no space is left below this limit.
    send_limit: u64,
    /// Number of bytes sent by this socket that the receiver has not yet reported as read.
    /// Increased in `sendmsg` and decreased by the protocol states' `inform_bytes_read`.
    sent_len: u64,
    /// Listeners that are notified when `state` changes or signals are raised.
    event_source: StateEventSource,
    /// Current file state flags of the socket.
    state: FileState,
    /// File status flags; `NONBLOCK` makes send and receive behave as if `MSG_DONTWAIT` was set.
    status: FileStatus,
    /// Kind of unix socket (stream, datagram or seqpacket).
    socket_type: UnixSocketType,
    /// Namespace used to bind abstract addresses and to look up sockets by address.
    namespace: Arc<AtomicRefCell<AbstractUnixNamespace>>,
    // NOTE(review): not used in the code visible here — presumably tracks whether an `OpenFile`
    // refers to this socket; confirm against the rest of the file
    has_open_file: bool,
}
1981
1982impl UnixSocketCommon {
    /// Value passed as the restartable flag when building "blocked on file" syscall errors for
    /// unix sockets; always `true`.
    pub fn supports_sa_restart(&self) -> bool {
        true
    }
1986
    /// Final step of closing a socket; expects the `CLOSED` flag to have been set already.
    ///
    /// If the flag is missing it is set here (replacing all other flags) and `debug_panic!` is
    /// invoked to report the inconsistency. Always returns `Ok(())` when it returns.
    pub fn close(&mut self, cb_queue: &mut CallbackQueue) -> Result<(), SyscallError> {
        if !self.state.contains(FileState::CLOSED) {
            // repair the state first, so it is correct even if the report below does not abort
            self.update_state(
                FileState::all(),
                FileState::CLOSED,
                FileSignals::empty(),
                cb_queue,
            );

            debug_panic!("When closing a unix socket, the CLOSED flag was not set");
        }

        Ok(())
    }
2005
2006 pub fn bind(
2007 &mut self,
2008 socket: &Arc<AtomicRefCell<UnixSocket>>,
2009 addr: Option<&SockaddrStorage>,
2010 rng: impl rand::Rng,
2011 ) -> Result<SockaddrUnix<libc::sockaddr_un>, SyscallError> {
2012 let Some(addr) = addr.and_then(|x| x.as_unix()) else {
2014 log::warn!(
2015 "Attempted to bind unix socket to non-unix address {:?}",
2016 addr
2017 );
2018 return Err(Errno::EINVAL.into());
2019 };
2020
2021 let bound_addr = if let Some(name) = addr.as_abstract() {
2023 let namespace = Arc::clone(&self.namespace);
2025 match AbstractUnixNamespace::bind(
2026 &namespace,
2027 self.socket_type,
2028 name.to_vec(),
2029 socket,
2030 &mut self.event_source,
2031 ) {
2032 Ok(()) => addr.into_owned(),
2033 Err(_) => return Err(Errno::EADDRINUSE.into()),
2035 }
2036 } else if addr.is_unnamed() {
2037 let namespace = Arc::clone(&self.namespace);
2039 match AbstractUnixNamespace::autobind(
2040 &namespace,
2041 self.socket_type,
2042 socket,
2043 &mut self.event_source,
2044 rng,
2045 ) {
2046 Ok(ref name) => SockaddrUnix::new_abstract(name).unwrap(),
2047 Err(_) => return Err(Errno::EADDRINUSE.into()),
2048 }
2049 } else {
2050 log::warn!("Only abstract names are currently supported for unix sockets");
2051 return Err(Errno::ENOTSUP.into());
2052 };
2053
2054 Ok(bound_addr)
2055 }
2056
2057 pub fn resolve_destination(
2058 &self,
2059 peer: Option<&Arc<AtomicRefCell<UnixSocket>>>,
2060 addr: Option<SockaddrStorage>,
2061 ) -> Result<Arc<AtomicRefCell<UnixSocket>>, SyscallError> {
2062 let addr = match addr {
2063 Some(ref addr) => Some(addr.as_unix().ok_or(Errno::EINVAL)?),
2064 None => None,
2065 };
2066
2067 let peer = match (peer, addr) {
2070 (Some(peer), Some(_addr)) => match self.socket_type {
2072 UnixSocketType::Stream => return Err(Errno::EISCONN.into()),
2073 UnixSocketType::SeqPacket => Some(peer),
2075 UnixSocketType::Dgram => None,
2076 },
2077 (Some(peer), None) => Some(peer),
2079 (None, Some(_addr)) => match self.socket_type {
2081 UnixSocketType::Stream => return Err(Errno::EOPNOTSUPP.into()),
2082 UnixSocketType::SeqPacket => return Err(Errno::ENOTCONN.into()),
2083 UnixSocketType::Dgram => None,
2084 },
2085 (None, None) => return Err(Errno::ENOTCONN.into()),
2087 };
2088
2089 let peer = match peer {
2091 Some(x) => Arc::clone(x),
2092 None => {
2093 let recv_socket =
2095 lookup_address(&self.namespace.borrow(), self.socket_type, &addr.unwrap())?;
2096 Arc::clone(&recv_socket)
2098 }
2099 };
2100
2101 Ok(peer)
2102 }
2103
    /// Copies the data described by `iovs` into `peer`'s receive buffer.
    ///
    /// Returns the number of bytes copied. Stream sockets may send only part of the data when
    /// little space is left; datagram and seqpacket sockets send the whole message or nothing.
    ///
    /// # Errors
    /// - `EINVAL` for unrecognized or unsupported flags,
    /// - `EPIPE` (stream/seqpacket) or `ECONNREFUSED` (datagram) if the peer's buffer has no
    ///   readers,
    /// - `EMSGSIZE` if a datagram/seqpacket message is larger than the send limit,
    /// - `EAGAIN` if there is currently not enough space; unless `MSG_DONTWAIT` is in effect,
    ///   the code below turns this into a "blocked on file" error that waits for `WRITABLE`
    ///   (this relies on `EAGAIN` comparing equal to `EWOULDBLOCK`),
    /// - errors reported by the buffer write.
    pub fn sendmsg(
        &mut self,
        socket: &Arc<AtomicRefCell<UnixSocket>>,
        iovs: &[IoVec],
        flags: libc::c_int,
        peer: &Arc<AtomicRefCell<UnixSocket>>,
        mem: &mut MemoryManager,
        cb_queue: &mut CallbackQueue,
    ) -> Result<usize, SyscallError> {
        // flags that are accepted; any other flag is rejected with EINVAL below
        let supported_flags = MsgFlags::MSG_DONTWAIT | MsgFlags::MSG_NOSIGNAL | MsgFlags::MSG_TRUNC;

        let Some(mut flags) = MsgFlags::from_bits(flags) else {
            // here `flags` is still the raw integer, since the shadowing binding did not match
            log::warn!("Unrecognized send flags: {:#b}", flags);
            return Err(Errno::EINVAL.into());
        };
        if flags.intersects(!supported_flags) {
            log::warn!("Unsupported send flags: {:?}", flags);
            return Err(Errno::EINVAL.into());
        }

        // a non-blocking socket behaves as if MSG_DONTWAIT was always given
        if self.status.contains(FileStatus::NONBLOCK) {
            flags.insert(MsgFlags::MSG_DONTWAIT);
        }

        // run the send in a closure so that all error paths can be post-processed in one place
        let result = (|| {
            let peer_ref = peer.borrow();
            // we send by writing into the peer's receive buffer
            let mut send_buffer = peer_ref.recv_buffer().borrow_mut();

            // nobody is left to read what we would write
            if send_buffer.num_readers() == 0 {
                return Err(match self.socket_type {
                    UnixSocketType::Stream | UnixSocketType::SeqPacket => Errno::EPIPE,
                    UnixSocketType::Dgram => Errno::ECONNREFUSED,
                });
            }

            // total number of bytes the caller wants to send
            let len = iovs.iter().map(|x| x.len).sum::<libc::size_t>();

            // the amount of data in flight is tracked here through `sent_len` and `send_limit`
            let space_available = self
                .send_limit
                .saturating_sub(self.sent_len)
                .try_into()
                .unwrap();

            if space_available == 0 {
                return Err(Errno::EAGAIN);
            }

            let len = match self.socket_type {
                // streams send as much as currently fits
                UnixSocketType::Stream => std::cmp::min(len, space_available),
                // messages are sent completely or not at all
                UnixSocketType::Dgram | UnixSocketType::SeqPacket => {
                    if len <= space_available {
                        len
                    } else if len <= self.send_limit.try_into().unwrap() {
                        // would fit once more space becomes available
                        return Err(Errno::EAGAIN);
                    } else {
                        // larger than the send limit, so it can never fit
                        return Err(Errno::EMSGSIZE);
                    }
                }
            };

            // read at most `len` bytes from the caller's memory
            let reader = IoVecReader::new(iovs, mem);
            let reader = reader.take(len.try_into().unwrap());

            let num_copied = match self.socket_type {
                UnixSocketType::Stream => {
                    // a zero-length stream write does not touch the buffer
                    if len == 0 {
                        0
                    } else {
                        send_buffer
                            .write_stream(reader, len, cb_queue)
                            .map_err(|e| Errno::try_from(e).unwrap())?
                    }
                }
                UnixSocketType::Dgram | UnixSocketType::SeqPacket => {
                    send_buffer
                        .write_packet(reader, len, cb_queue)
                        .map_err(|e| Errno::try_from(e).unwrap())?;
                    len
                }
            };

            // these bytes stay outstanding until the receiver reports them as read
            self.sent_len += u64::try_from(num_copied).unwrap();

            Ok(num_copied)
        })();

        // a blocking socket waits until it becomes writable instead of failing
        if result.as_ref().err() == Some(&Errno::EWOULDBLOCK)
            && !flags.contains(MsgFlags::MSG_DONTWAIT)
        {
            return Err(SyscallError::new_blocked_on_file(
                File::Socket(Socket::Unix(socket.clone())),
                FileState::WRITABLE,
                self.supports_sa_restart(),
            ));
        }

        Ok(result?)
    }
2220
    /// Reads data from this socket's receive buffer into the memory described by `iovs`.
    ///
    /// Returns `(return value, bytes removed from the buffer, message flags)`. The return value
    /// is the number of bytes copied, except for datagram/seqpacket sockets with `MSG_TRUNC`
    /// set, where it is the number of bytes removed from the buffer.
    ///
    /// # Errors
    /// - `EINVAL` for unrecognized or unsupported flags,
    /// - `EWOULDBLOCK` if no data is available and `MSG_DONTWAIT` is in effect; otherwise a
    ///   "blocked on file" error that waits for `READABLE`,
    /// - errors reported by the buffer read.
    pub fn recvmsg(
        &mut self,
        socket: &Arc<AtomicRefCell<UnixSocket>>,
        iovs: &[IoVec],
        flags: libc::c_int,
        mem: &mut MemoryManager,
        cb_queue: &mut CallbackQueue,
    ) -> Result<(usize, usize, libc::c_int), SyscallError> {
        // flags that are accepted; any other flag is rejected with EINVAL below
        let supported_flags = MsgFlags::MSG_DONTWAIT | MsgFlags::MSG_TRUNC;

        let Some(mut flags) = MsgFlags::from_bits(flags) else {
            // here `flags` is still the raw integer, since the shadowing binding did not match
            log::warn!("Unrecognized recv flags: {:#b}", flags);
            return Err(Errno::EINVAL.into());
        };
        if flags.intersects(!supported_flags) {
            log::warn!("Unsupported recv flags: {:?}", flags);
            return Err(Errno::EINVAL.into());
        }

        // a non-blocking socket behaves as if MSG_DONTWAIT was always given
        if self.status.contains(FileStatus::NONBLOCK) {
            flags.insert(MsgFlags::MSG_DONTWAIT);
        }

        // run the receive in a closure so that all error paths can be post-processed in one place
        let result = (|| {
            let mut recv_buffer = self.recv_buffer.borrow_mut();

            // The read would block if there is no data and more data could still arrive: a
            // datagram socket can always receive more, other sockets only while the buffer
            // still has writers.
            if !recv_buffer.has_data()
                && (self.socket_type == UnixSocketType::Dgram || recv_buffer.num_writers() > 0)
            {
                return Err(Errno::EWOULDBLOCK);
            }

            let writer = IoVecWriter::new(iovs, mem);

            let (num_copied, num_removed_from_buf) = recv_buffer
                .read(writer, cb_queue)
                .map_err(|e| Errno::try_from(e).unwrap())?;

            let mut msg_flags = 0;

            if flags.contains(MsgFlags::MSG_TRUNC)
                && [UnixSocketType::Dgram, UnixSocketType::SeqPacket].contains(&self.socket_type)
            {
                // fewer bytes were copied than removed, so the message was truncated
                if num_copied < num_removed_from_buf {
                    msg_flags |= libc::MSG_TRUNC;
                }

                // with MSG_TRUNC, message-based sockets report the number of removed bytes
                Ok((num_removed_from_buf, num_removed_from_buf, msg_flags))
            } else {
                // stream sockets, or MSG_TRUNC not set: report the number of bytes copied
                Ok((num_copied, num_removed_from_buf, msg_flags))
            }
        })();

        // a blocking socket waits until it becomes readable instead of failing
        if result.as_ref().err() == Some(&Errno::EWOULDBLOCK)
            && !flags.contains(MsgFlags::MSG_DONTWAIT)
        {
            return Err(SyscallError::new_blocked_on_file(
                File::Socket(Socket::Unix(socket.clone())),
                FileState::READABLE,
                self.supports_sa_restart(),
            ));
        }

        Ok(result?)
    }
2299
    /// No ioctl request is handled for unix sockets: logs a warning and returns `EINVAL`.
    pub fn ioctl(
        &mut self,
        request: IoctlRequest,
        _arg_ptr: ForeignPtr<()>,
        _memory_manager: &mut MemoryManager,
    ) -> SyscallResult {
        log::warn!("We do not yet handle ioctl request {request:?} on unix sockets");
        Err(Errno::EINVAL.into())
    }
2309
    /// Replaces the flags selected by `mask` with the corresponding flags of `state`, then
    /// reports the change (and any `signals`) through `handle_state_change`.
    ///
    /// Flags outside of `mask` are left untouched.
    fn update_state(
        &mut self,
        mask: FileState,
        state: FileState,
        signals: FileSignals,
        cb_queue: &mut CallbackQueue,
    ) {
        let old_state = self.state;

        // clear every masked flag, then set the masked flags that are present in `state`
        self.state.remove(mask);
        self.state.insert(state & mask);

        self.handle_state_change(old_state, signals, cb_queue);
    }
2325
2326 fn handle_state_change(
2327 &mut self,
2328 old_state: FileState,
2329 signals: FileSignals,
2330 cb_queue: &mut CallbackQueue,
2331 ) {
2332 let states_changed = self.state ^ old_state;
2333
2334 if states_changed.is_empty() && signals.is_empty() {
2336 return;
2337 }
2338
2339 self.event_source
2340 .notify_listeners(self.state, states_changed, signals, cb_queue);
2341 }
2342}
2343
2344fn lookup_address(
2345 namespace: &AbstractUnixNamespace,
2346 socket_type: UnixSocketType,
2347 addr: &SockaddrUnix<&libc::sockaddr_un>,
2348) -> Result<Arc<AtomicRefCell<UnixSocket>>, linux_api::errno::Errno> {
2349 if let Some(name) = addr.as_abstract() {
2351 namespace
2353 .lookup(socket_type, name)
2354 .ok_or(linux_api::errno::Errno::ECONNREFUSED)
2355 } else {
2356 warn_once_then_debug!("Unix sockets with pathname addresses are not yet supported");
2357 Err(linux_api::errno::Errno::ENOENT)
2358 }
2359}
2360
2361fn backlog_to_queue_size(backlog: i32) -> u32 {
2362 let backlog = backlog as u32;
2365
2366 let queue_limit = std::cmp::min(backlog, c::SHADOW_SOMAXCONN);
2368
2369 queue_limit.saturating_add(1)
2372}
2373
/// The socket types that a unix socket can have.
///
/// A value can be obtained from a raw `SOCK_*` constant through the
/// `TryFrom<libc::c_int>` implementation for this type.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum UnixSocketType {
    /// Corresponds to `libc::SOCK_STREAM`.
    Stream,
    /// Corresponds to `libc::SOCK_DGRAM`.
    Dgram,
    /// Corresponds to `libc::SOCK_SEQPACKET`.
    SeqPacket,
}
2381
2382impl TryFrom<libc::c_int> for UnixSocketType {
2383 type Error = UnixSocketTypeConversionError;
2384 fn try_from(val: libc::c_int) -> Result<Self, Self::Error> {
2385 match val {
2386 libc::SOCK_STREAM => Ok(Self::Stream),
2387 libc::SOCK_DGRAM => Ok(Self::Dgram),
2388 libc::SOCK_SEQPACKET => Ok(Self::SeqPacket),
2389 x => Err(UnixSocketTypeConversionError(x)),
2390 }
2391 }
2392}
2393
/// The error returned when a raw socket type value cannot be converted to a
/// [`UnixSocketType`].
///
/// Holds the rejected value, which is included in the error's `Display` message.
#[derive(Copy, Clone, Debug)]
pub struct UnixSocketTypeConversionError(libc::c_int);

// no methods are overridden; the trait's default implementations are used
impl std::error::Error for UnixSocketTypeConversionError {}
2398
2399impl std::fmt::Display for UnixSocketTypeConversionError {
2400 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2401 write!(
2402 f,
2403 "Invalid socket type {}; unix sockets only support SOCK_STREAM, SOCK_DGRAM, and SOCK_SEQPACKET",
2404 self.0
2405 )
2406 }
2407}
2408
/// Reasons for which an incoming connection can be refused.
// NOTE(review): the code that produces and handles these values is not visible from this
// part of the file; the variant descriptions below are inferred from their names -- confirm
#[derive(Copy, Clone, Debug)]
enum IncomingConnError {
    /// Presumably: the queue of pending connections has no room for another one.
    QueueFull,
    /// Presumably: the receiving socket is not able to take incoming connections.
    NotSupported,
}
2414
/// Bookkeeping for a run of bytes and the socket they came from.
// NOTE(review): the use sites are not visible from this part of the file; presumably one
// entry describes a chunk of data written to a socket's receive buffer -- confirm
struct ByteData {
    /// The socket that the bytes originated from. This is a strong reference, so the
    /// originating socket is kept alive for as long as this entry exists.
    from_socket: Arc<AtomicRefCell<UnixSocket>>,
    /// The unix address associated with the sender, if there is one.
    // NOTE(review): presumably `None` when the sending socket was unbound -- confirm
    from_addr: Option<SockaddrUnix<libc::sockaddr_un>>,
    /// The number of bytes that this entry accounts for.
    num_bytes: u64,
}