1use core::fmt::Display;
2use core::mem::MaybeUninit;
3
4use vasi::VirtualAddressSpaceIndependent;
5
6use crate::sync::{self, AtomicU32, UnsafeCell};
7
/// State of the channel's single message slot.
///
/// `repr(u8)` with the default discriminants 0..=3. These exact values are
/// relied upon by `From<u8>` below and by the packed `u32` encoding of
/// `ChannelState` (the contents state lives in the low byte).
#[derive(Debug, Copy, Clone, Eq, PartialEq, VirtualAddressSpaceIndependent)]
#[repr(u8)]
enum ChannelContentsState {
    // No message present; a writer may begin writing.
    Empty,
    // A writer is currently storing a message into the slot.
    Writing,
    // A complete message is present and may be consumed.
    Ready,
    // A reader is currently moving the message out of the slot.
    Reading,
}
16
17impl From<u8> for ChannelContentsState {
18 fn from(value: u8) -> Self {
19 const EMPTY: u8 = ChannelContentsState::Empty as u8;
20 const WRITING: u8 = ChannelContentsState::Writing as u8;
21 const READY: u8 = ChannelContentsState::Ready as u8;
22 const READING: u8 = ChannelContentsState::Reading as u8;
23 match value {
24 EMPTY => ChannelContentsState::Empty,
25 WRITING => ChannelContentsState::Writing,
26 READY => ChannelContentsState::Ready,
27 READING => ChannelContentsState::Reading,
28 _ => panic!("Bad value {value}"),
29 }
30 }
31}
32
// Flag bits of the packed `u32` channel-state word. The low byte holds the
// `ChannelContentsState` discriminant (masked with 0xff in the `From`
// conversions for `ChannelState` below); these flags occupy higher bits.
const WRITER_CLOSED: u32 = 0x1 << 9;
const HAS_SLEEPER: u32 = 0x1 << 10;
36
/// Unpacked (struct) form of the channel-state word.
///
/// The atomic stores this packed into a `u32` (see the `From` conversions
/// below); this struct exists for ergonomic field access.
#[repr(C)]
#[derive(Debug, Eq, PartialEq, Copy, Clone, VirtualAddressSpaceIndependent)]
struct ChannelState {
    // Set (by `close_writer`) once the writing side is closed.
    writer_closed: bool,
    // Whether a thread is (or is about to be) blocked on the futex word.
    has_sleeper: bool,
    // State of the message slot itself.
    contents_state: ChannelContentsState,
}
44
45impl From<u32> for ChannelState {
46 fn from(value: u32) -> Self {
47 let has_sleeper = (value & HAS_SLEEPER) != 0;
48 let writer_closed = (value & WRITER_CLOSED) != 0;
49 let contents_state = ((value & 0xff) as u8).into();
50 ChannelState {
51 has_sleeper,
52 writer_closed,
53 contents_state,
54 }
55 }
56}
57
58impl From<ChannelState> for u32 {
59 fn from(value: ChannelState) -> Self {
60 let has_sleeper = if value.has_sleeper { HAS_SLEEPER } else { 0 };
61 let writer_closed = if value.writer_closed {
62 WRITER_CLOSED
63 } else {
64 0
65 };
66 writer_closed | has_sleeper | (value.contents_state as u32)
67 }
68}
69
70#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
71#[repr(transparent)]
72struct AtomicChannelState(AtomicU32);
73impl AtomicChannelState {
74 pub fn new() -> Self {
75 Self(AtomicU32::new(
76 ChannelState {
77 has_sleeper: false,
78 writer_closed: false,
79 contents_state: ChannelContentsState::Empty,
80 }
81 .into(),
82 ))
83 }
84
85 fn fetch_update<F>(
87 &self,
88 set_order: sync::atomic::Ordering,
89 fetch_order: sync::atomic::Ordering,
90 mut f: F,
91 ) -> Result<ChannelState, ChannelState>
92 where
93 F: FnMut(ChannelState) -> Option<ChannelState>,
94 {
95 self.0
96 .fetch_update(set_order, fetch_order, |x| {
97 let res = f(ChannelState::from(x));
98 res.map(u32::from)
99 })
100 .map(ChannelState::from)
101 .map_err(ChannelState::from)
102 }
103
104 fn load(&self, order: sync::atomic::Ordering) -> ChannelState {
106 ChannelState::from(self.0.load(order))
107 }
108
109 fn compare_exchange(
111 &self,
112 current: ChannelState,
113 new: ChannelState,
114 success: sync::atomic::Ordering,
115 failure: sync::atomic::Ordering,
116 ) -> Result<ChannelState, ChannelState> {
117 self.0
118 .compare_exchange(current.into(), new.into(), success, failure)
119 .map(ChannelState::from)
120 .map_err(ChannelState::from)
121 }
122}
123
/// Error returned by the channel's receive path when the writing side has
/// been closed and no message is (or will become) available.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SelfContainedChannelError {
    WriterIsClosed,
}

impl Display for SelfContainedChannelError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let text = match self {
            SelfContainedChannelError::WriterIsClosed => "WriterIsClosed",
        };
        f.write_str(text)
    }
}
136
/// A channel holding at most one message of type `T`, with all of its state
/// stored inline in the struct itself — no heap allocation.
///
/// Outside of loom builds it derives `VirtualAddressSpaceIndependent`,
/// presumably so instances can be placed in shared memory mapped at different
/// addresses in different processes — confirm against callers.
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(C)]
pub struct SelfContainedChannel<T> {
    // The message slot. Whether it holds an initialized `T` is tracked by
    // `state.contents_state` (`Ready` means initialized and unconsumed).
    message: UnsafeCell<MaybeUninit<T>>,
    // Packed atomic state word; also used directly as the futex word that
    // `receive` blocks on.
    state: AtomicChannelState,
}
170
impl<T> SelfContainedChannel<T> {
    /// Creates an empty, open channel with an uninitialized message slot.
    pub fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicChannelState::new(),
        }
    }

    /// Writes `message` into the slot and marks it `Ready`, waking a blocked
    /// receiver if one is registered.
    ///
    /// # Panics
    ///
    /// Panics (via the `assert_eq!` in the first transition) if the slot is
    /// not `Empty` — i.e. if a previous message is still pending/being read,
    /// or another writer is active. So the protocol assumes one writer at a
    /// time and at most one outstanding message.
    pub fn send(&self, message: T) {
        // Empty -> Writing: claims exclusive ownership of the slot.
        // NOTE(review): Acquire here presumably pairs with the receiver's
        // Release transition back to Empty, so the slot's prior contents are
        // fully read before we overwrite — confirm.
        self.state
            .fetch_update(
                sync::atomic::Ordering::Acquire,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Empty);
                    state.contents_state = ChannelContentsState::Writing;
                    Some(state)
                },
            )
            .unwrap();
        // SAFETY: the Empty -> Writing transition above succeeded, so this
        // thread has exclusive access to the slot until it publishes `Ready`.
        unsafe { self.message.get_mut().deref().as_mut_ptr().write(message) };
        // Writing -> Ready: Release publishes the message bytes written above
        // to whichever thread observes `Ready` with an Acquire load.
        let prev = self
            .state
            .fetch_update(
                sync::atomic::Ordering::Release,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Writing);
                    state.contents_state = ChannelContentsState::Ready;
                    Some(state)
                },
            )
            .unwrap();
        // `prev` is the state *before* the update; if a receiver had
        // registered itself as a sleeper, wake it via the futex word.
        if prev.has_sleeper {
            sync::futex_wake_one(&self.state.0).unwrap();
        }
    }

    /// Blocks until a message is `Ready`, then moves it out and returns it.
    ///
    /// Returns `Err(WriterIsClosed)` if the writer side is closed and no
    /// message is available.
    ///
    /// # Panics
    ///
    /// Asserts single-receiver usage: panics if the slot is found in
    /// `Reading` state or another sleeper is already registered, and on any
    /// futex error other than `EINTR`/`EAGAIN`.
    pub fn receive(&self) -> Result<T, SelfContainedChannelError> {
        let mut state = self.state.load(sync::atomic::Ordering::Relaxed);
        loop {
            if state.contents_state == ChannelContentsState::Ready {
                break;
            }
            if state.writer_closed {
                return Err(SelfContainedChannelError::WriterIsClosed);
            }
            // Only Empty/Writing are valid here; Reading would mean a second
            // concurrent receiver, which the protocol does not support.
            assert!(
                state.contents_state == ChannelContentsState::Empty
                    || state.contents_state == ChannelContentsState::Writing
            );
            // Likewise, only one sleeper (this thread) may ever register.
            assert!(!state.has_sleeper);
            // Register ourselves as a sleeper so the writer knows to
            // futex-wake after publishing.
            let mut sleeper_state = state;
            sleeper_state.has_sleeper = true;
            match self.state.compare_exchange(
                state,
                sleeper_state,
                sync::atomic::Ordering::Relaxed,
                sync::atomic::Ordering::Relaxed,
            ) {
                Ok(_) => (),
                Err(s) => {
                    // State changed under us (e.g. writer progressed);
                    // re-evaluate from the freshly observed value.
                    state = s;
                    continue;
                }
            };
            // Sleep only if the word still equals the value we installed;
            // otherwise futex_wait returns EAGAIN and we loop again.
            let expected = sleeper_state.into();
            match sync::futex_wait(&self.state.0, expected) {
                Ok(_) | Err(rustix::io::Errno::INTR) | Err(rustix::io::Errno::AGAIN) => {
                    // Whether we actually slept or not, deregister as
                    // sleeper before re-checking the state.
                    let mut updated_state = self
                        .state
                        .fetch_update(
                            sync::atomic::Ordering::Relaxed,
                            sync::atomic::Ordering::Relaxed,
                            |mut state| {
                                state.has_sleeper = false;
                                Some(state)
                            },
                        )
                        .unwrap();
                    // `fetch_update` returned the *previous* value, in which
                    // our sleeper bit was still set; clear it locally so the
                    // `assert!(!state.has_sleeper)` above holds next loop.
                    updated_state.has_sleeper = false;
                    state = updated_state;
                    continue;
                }
                Err(e) => panic!("Unexpected futex error {:?}", e),
            };
        }
        // Ready -> Reading: Acquire pairs with the writer's Release, making
        // the message bytes visible before we read them below.
        self.state
            .fetch_update(
                sync::atomic::Ordering::Acquire,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Ready);
                    state.contents_state = ChannelContentsState::Reading;
                    Some(state)
                },
            )
            .unwrap();
        // SAFETY: `Ready` guarantees the writer fully initialized the slot,
        // and the Reading transition gives us exclusive access to move it out.
        let val = unsafe { self.message.get_mut().deref().assume_init_read() };
        // Reading -> Empty: Release hands the (now vacated) slot back to the
        // next writer.
        self.state
            .fetch_update(
                sync::atomic::Ordering::Release,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    assert_eq!(state.contents_state, ChannelContentsState::Reading);
                    state.contents_state = ChannelContentsState::Empty;
                    Some(state)
                },
            )
            .unwrap();
        Ok(val)
    }

    /// Marks the writer side closed and wakes a blocked receiver, if any.
    ///
    /// A pending `Ready` message is unaffected; the closed flag only makes
    /// `receive` return `Err(WriterIsClosed)` once no message is available.
    pub fn close_writer(&self) {
        let prev = self
            .state
            .fetch_update(
                sync::atomic::Ordering::Relaxed,
                sync::atomic::Ordering::Relaxed,
                |mut state| {
                    state.writer_closed = true;
                    Some(state)
                },
            )
            .unwrap();
        if prev.has_sleeper {
            sync::futex_wake_one(&self.state.0).unwrap();
        }
    }

    /// Whether `close_writer` has been called on this channel.
    pub fn writer_is_closed(&self) -> bool {
        self.state
            .load(sync::atomic::Ordering::Relaxed)
            .writer_closed
    }
}
326
// SAFETY: the channel moves `T` values between threads, so both impls
// require `T: Send`. Access to the message slot is serialized by the atomic
// `state` word's Empty/Writing/Ready/Reading protocol (see `send`/`receive`),
// which is why `Sync` does not additionally require `T: Sync`.
unsafe impl<T> Send for SelfContainedChannel<T> where T: Send {}
unsafe impl<T> Sync for SelfContainedChannel<T> where T: Send {}
329
impl<T> Drop for SelfContainedChannel<T> {
    /// Drops a pending, unconsumed message so `T`'s destructor runs.
    fn drop(&mut self) {
        // Acquire pairs with the writer's Release in `send`, ensuring the
        // message bytes are visible before we drop them.
        let state = self.state.load(sync::atomic::Ordering::Acquire);
        // Only a `Ready` slot holds an initialized, unconsumed `T`; in every
        // other state the slot is uninitialized or already moved out.
        if state.contents_state == ChannelContentsState::Ready {
            // SAFETY: `Ready` means `send` fully wrote the value and no
            // reader took it, so the slot is initialized here; `&mut self`
            // guarantees no concurrent access.
            unsafe { self.message.get_mut().deref().assume_init_drop() }
        }
    }
}
345
346impl<T> Default for SelfContainedChannel<T> {
347 fn default() -> Self {
348 Self::new()
349 }
350}