// vasi_sync/scchannel.rs

use core::fmt::Display;
use core::mem::MaybeUninit;

use vasi::VirtualAddressSpaceIndependent;

use crate::sync::{self, AtomicU32, UnsafeCell};

#[derive(Debug, Copy, Clone, Eq, PartialEq, VirtualAddressSpaceIndependent)]
#[repr(u8)]
enum ChannelContentsState {
    Empty,
    Writing,
    Ready,
    Reading,
}

impl From<u8> for ChannelContentsState {
    fn from(value: u8) -> Self {
        const EMPTY: u8 = ChannelContentsState::Empty as u8;
        const WRITING: u8 = ChannelContentsState::Writing as u8;
        const READY: u8 = ChannelContentsState::Ready as u8;
        const READING: u8 = ChannelContentsState::Reading as u8;
        match value {
            EMPTY => ChannelContentsState::Empty,
            WRITING => ChannelContentsState::Writing,
            READY => ChannelContentsState::Ready,
            READING => ChannelContentsState::Reading,
            _ => panic!("Bad value {value}"),
        }
    }
}

// bit flags in ChannelState
const WRITER_CLOSED: u32 = 0x1 << 9;
const HAS_SLEEPER: u32 = 0x1 << 10;

#[repr(C)]
#[derive(Debug, Eq, PartialEq, Copy, Clone, VirtualAddressSpaceIndependent)]
struct ChannelState {
    writer_closed: bool,
    has_sleeper: bool,
    contents_state: ChannelContentsState,
}

impl From<u32> for ChannelState {
    fn from(value: u32) -> Self {
        let has_sleeper = (value & HAS_SLEEPER) != 0;
        let writer_closed = (value & WRITER_CLOSED) != 0;
        let contents_state = ((value & 0xff) as u8).into();
        ChannelState {
            has_sleeper,
            writer_closed,
            contents_state,
        }
    }
}

impl From<ChannelState> for u32 {
    fn from(value: ChannelState) -> Self {
        let has_sleeper = if value.has_sleeper { HAS_SLEEPER } else { 0 };
        let writer_closed = if value.writer_closed {
            WRITER_CLOSED
        } else {
            0
        };
        writer_closed | has_sleeper | (value.contents_state as u32)
    }
}

#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(transparent)]
struct AtomicChannelState(AtomicU32);
impl AtomicChannelState {
    pub fn new() -> Self {
        Self(AtomicU32::new(
            ChannelState {
                has_sleeper: false,
                writer_closed: false,
                contents_state: ChannelContentsState::Empty,
            }
            .into(),
        ))
    }

    /// Typed interface to `AtomicU32::fetch_update`
    fn fetch_update<F>(
        &self,
        set_order: sync::atomic::Ordering,
        fetch_order: sync::atomic::Ordering,
        mut f: F,
    ) -> Result<ChannelState, ChannelState>
    where
        F: FnMut(ChannelState) -> Option<ChannelState>,
    {
        self.0
            .fetch_update(set_order, fetch_order, |x| {
                let res = f(ChannelState::from(x));
                res.map(u32::from)
            })
            .map(ChannelState::from)
            .map_err(ChannelState::from)
    }

    /// Typed interface to `AtomicU32::load`
    fn load(&self, order: sync::atomic::Ordering) -> ChannelState {
        ChannelState::from(self.0.load(order))
    }

    /// Typed interface to `AtomicU32::compare_exchange`
    fn compare_exchange(
        &self,
        current: ChannelState,
        new: ChannelState,
        success: sync::atomic::Ordering,
        failure: sync::atomic::Ordering,
    ) -> Result<ChannelState, ChannelState> {
        self.0
            .compare_exchange(current.into(), new.into(), success, failure)
            .map(ChannelState::from)
            .map_err(ChannelState::from)
    }
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SelfContainedChannelError {
    WriterIsClosed,
}

impl Display for SelfContainedChannelError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            SelfContainedChannelError::WriterIsClosed => write!(f, "WriterIsClosed"),
        }
    }
}

/// A fairly minimal channel implementation that implements the
/// [`vasi::VirtualAddressSpaceIndependent`] trait.
///
/// Breaking the documented API contracts may result both in immediate panics,
/// and panics in subsequent operations on the channel. To avoid this, the user
/// must:
/// * Never allow parallel `send` or `receive` operations. This is a
///   single-producer, single-consumer channel.
/// * Never call `send` when there is already a message pending.
///
/// Loosely inspired by the channel implementation examples in "Rust Atomics and
/// Locks" by Mara Bos (O'Reilly). Copyright 2023 Mara Bos, 978-1-098-11944-7.
/// (From the preface: "You may use all example code offered with this book for
/// any purpose").
///
/// TODO: Several candidate optimizations have been evaluated and discarded, but
/// are left in the commit history for posterity along with their corresponding
/// microbenchmark results.
///
/// One that might be worth revisiting is to remove the internal "Reading" and
/// "Writing" states and either make the interfaces `unsafe` (since it becomes
/// the caller's responsibility to avoid parallel reads or writes), or add
/// checked creation of `!Sync` Reader and Writer objects. This optimization
/// appeared to have a 22% benefit in the "ping pong" microbenchmark on a large
/// simulation machine, but only a 3.5% benefit in the "ping pong pinned"
/// microbenchmark; the latter is expected to be more representative of real
/// large simulation runs (i.e. pinning should be enabled).
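///
/// A minimal usage sketch (not part of the original documentation), assuming
/// this type is exported as `vasi_sync::scchannel::SelfContainedChannel`. It
/// shares the channel via an in-process `Arc` for brevity, although the type is
/// designed to also work when placed in shared memory across address spaces:
///
/// ```
/// use std::sync::Arc;
///
/// use vasi_sync::scchannel::SelfContainedChannel;
///
/// let channel = Arc::new(SelfContainedChannel::<u32>::new());
/// let sender = {
///     let channel = Arc::clone(&channel);
///     // The single producer sends one message at a time.
///     std::thread::spawn(move || channel.send(42))
/// };
/// // The single consumer blocks until the message is ready.
/// assert_eq!(channel.receive(), Ok(42));
/// sender.join().unwrap();
/// ```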
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(C)]
pub struct SelfContainedChannel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    state: AtomicChannelState,
}

impl<T> SelfContainedChannel<T> {
    pub fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicChannelState::new(),
        }
    }

    /// Sends `message` through the channel.
    ///
    /// Panics if the channel already has an unreceived message.
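    ///
    /// A minimal sketch (not part of the original documentation), assuming the
    /// `vasi_sync::scchannel` module path: a single `send` followed by a
    /// `receive` on the same thread never blocks, since the message is already
    /// ready when `receive` runs.
    ///
    /// ```
    /// use vasi_sync::scchannel::SelfContainedChannel;
    ///
    /// let channel = SelfContainedChannel::new();
    /// channel.send("hello");
    /// assert_eq!(channel.receive(), Ok("hello"));
    /// ```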
    pub fn send(&self, message: T) {
        self.state
            .fetch_update(
                sync::atomic::Ordering::Acquire,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Empty);
                    state.contents_state = ChannelContentsState::Writing;
                    Some(state)
                },
            )
            .unwrap();
        unsafe { self.message.get_mut().deref().as_mut_ptr().write(message) };
        let prev = self
            .state
            .fetch_update(
                sync::atomic::Ordering::Release,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Writing);
                    state.contents_state = ChannelContentsState::Ready;
                    Some(state)
                },
            )
            .unwrap();
        if prev.has_sleeper {
            sync::futex_wake_one(&self.state.0).unwrap();
        }
    }

    /// Blocks until either the channel contains a message, or the writer has
    /// closed the channel.
    ///
    /// Returns `Ok(T)` if a message was received, or
    /// `Err(SelfContainedChannelError::WriterIsClosed)` if the writer is closed.
    ///
    /// Panics if another thread is already trying to receive on this channel.
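    ///
    /// A hedged sketch (not part of the original documentation) of the
    /// closed-writer path, assuming the `vasi_sync::scchannel` module path:
    ///
    /// ```
    /// use vasi_sync::scchannel::{SelfContainedChannel, SelfContainedChannelError};
    ///
    /// let channel = SelfContainedChannel::<u32>::new();
    /// channel.close_writer();
    /// // No message is pending and the writer is closed, so `receive` fails
    /// // immediately instead of blocking.
    /// assert_eq!(channel.receive(), Err(SelfContainedChannelError::WriterIsClosed));
    /// ```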
    pub fn receive(&self) -> Result<T, SelfContainedChannelError> {
        let mut state = self.state.load(sync::atomic::Ordering::Relaxed);
        loop {
            if state.contents_state == ChannelContentsState::Ready {
                break;
            }
            if state.writer_closed {
                return Err(SelfContainedChannelError::WriterIsClosed);
            }
            assert!(
                state.contents_state == ChannelContentsState::Empty
                    || state.contents_state == ChannelContentsState::Writing
            );
            assert!(!state.has_sleeper);
            let mut sleeper_state = state;
            sleeper_state.has_sleeper = true;
            match self.state.compare_exchange(
                state,
                sleeper_state,
                sync::atomic::Ordering::Relaxed,
                sync::atomic::Ordering::Relaxed,
            ) {
                Ok(_) => (),
                Err(s) => {
                    // Something changed; re-evaluate.
                    state = s;
                    continue;
                }
            };
            let expected = sleeper_state.into();
            match sync::futex_wait(&self.state.0, expected) {
                Ok(_) | Err(rustix::io::Errno::INTR) | Err(rustix::io::Errno::AGAIN) => {
                    // Something changed; clear the sleeper bit and try again.
                    let mut updated_state = self
                        .state
                        .fetch_update(
                            sync::atomic::Ordering::Relaxed,
                            sync::atomic::Ordering::Relaxed,
                            |mut state| {
                                state.has_sleeper = false;
                                Some(state)
                            },
                        )
                        .unwrap();
                    updated_state.has_sleeper = false;
                    state = updated_state;
                    continue;
                }
                Err(e) => panic!("Unexpected futex error {:?}", e),
            };
        }
        self.state
            .fetch_update(
                sync::atomic::Ordering::Acquire,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Ready);
                    state.contents_state = ChannelContentsState::Reading;
                    Some(state)
                },
            )
            .unwrap();
        let val = unsafe { self.message.get_mut().deref().assume_init_read() };
        self.state
            .fetch_update(
                sync::atomic::Ordering::Release,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Reading);
                    state.contents_state = ChannelContentsState::Empty;
                    Some(state)
                },
            )
            .unwrap();
        Ok(val)
    }

    /// Closes the "write" end of the channel. This will cause any current
    /// and subsequent `receive` operations to fail once the channel is empty.
    ///
    /// This method *can* be called in parallel with other methods on the
    /// channel, making it suitable to be called from a separate watchdog thread
    /// that detects that the writing thread (or process) has died.
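    ///
    /// A sketch (not part of the original documentation) of that watchdog
    /// pattern, assuming the `vasi_sync::scchannel` module path; the watchdog
    /// here closes the channel unconditionally rather than detecting a dead
    /// writer:
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// use vasi_sync::scchannel::{SelfContainedChannel, SelfContainedChannelError};
    ///
    /// let channel = Arc::new(SelfContainedChannel::<u32>::new());
    /// let watchdog = {
    ///     let channel = Arc::clone(&channel);
    ///     // Runs in parallel with the blocked `receive` below; closing the
    ///     // writer wakes the sleeping receiver.
    ///     std::thread::spawn(move || channel.close_writer())
    /// };
    /// assert_eq!(channel.receive(), Err(SelfContainedChannelError::WriterIsClosed));
    /// watchdog.join().unwrap();
    /// ```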
    pub fn close_writer(&self) {
        let prev = self
            .state
            .fetch_update(
                sync::atomic::Ordering::Relaxed,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    state.writer_closed = true;
                    Some(state)
                },
            )
            .unwrap();
        if prev.has_sleeper {
            sync::futex_wake_one(&self.state.0).unwrap();
        }
    }

    /// Whether the write-end of the channel has been closed (via `close_writer`).
    pub fn writer_is_closed(&self) -> bool {
        self.state
            .load(sync::atomic::Ordering::Relaxed)
            .writer_closed
    }
}

unsafe impl<T> Send for SelfContainedChannel<T> where T: Send {}
unsafe impl<T> Sync for SelfContainedChannel<T> where T: Send {}

impl<T> Drop for SelfContainedChannel<T> {
    fn drop(&mut self) {
        // Conservatively use Acquire-ordering here to synchronize with the
        // Release-ordered store in `send`.
        //
        // This shouldn't be strictly necessary - for us to have a `&mut` reference, some
        // external synchronization must have already happened over the whole channel. Indeed
        // the original Bos implementation uses `get_mut` here, which doesn't have an atomic
        // operation at all.
        let state = self.state.load(sync::atomic::Ordering::Acquire);
        if state.contents_state == ChannelContentsState::Ready {
            unsafe { self.message.get_mut().deref().assume_init_drop() }
        }
    }
}

impl<T> Default for SelfContainedChannel<T> {
    fn default() -> Self {
        Self::new()
    }
}