// vasi_sync/scmutex.rs

use core::{marker::PhantomData, pin::Pin};

use vasi::VirtualAddressSpaceIndependent;

use crate::sync;

// The futex word stored as a single `AtomicU32`, so it can be handed to the
// futex syscall. See `FutexWord` for the packed layout.
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(transparent)]
struct AtomicFutexWord(sync::atomic::AtomicU32);

impl AtomicFutexWord {
    // TODO: merge with `new` if and when loom's `AtomicU32` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    pub fn new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    /// Atomically increments the sleeper count, returning the updated word.
    /// Panics if the sleeper count would overflow.
    pub fn inc_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can increment the whole word.
        let prev = FutexWord::from(self.0.fetch_add(1, ord));

        // We'll panic here if we've overflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. Since UNLOCKED is 0, this will never
        // cause a spurious unlock, but still-live threads using the lock
        // will likely panic or deadlock.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_add(1).unwrap(),
        }
    }

    /// Atomically decrements the sleeper count, returning the updated word.
    /// Panics if the sleeper count would underflow.
    pub fn dec_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can decrement the whole word.

        // Ideally we'd just use an atomic op on the "sleepers" part of the
        // larger word, but that sort of aliasing breaks loom's analysis.
        let prev = FutexWord::from(self.0.fetch_sub(1, ord));

        // We'll panic here if we've underflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. This shouldn't be possible assuming
        // SelfContainedMutex itself isn't buggy.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_sub(1).unwrap(),
        }
    }

    /// Atomically transitions the lock state from LOCKED to UNLOCKED, leaving
    /// the sleeper count untouched, and returns the updated word.
    /// Panics if the previous state wasn't LOCKED.
    pub fn unlock_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // We avoid having to synchronize the number of sleepers by using fetch_sub
        // instead of a compare and swap.
        debug_assert_eq!(UNLOCKED, 0);
        let prev = FutexWord::from(self.0.fetch_sub(
            u32::from(FutexWord {
                lock_state: LOCKED,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
        FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: prev.num_sleepers,
        }
    }

    /// Atomically transitions the lock state from LOCKED to
    /// LOCKED_DISCONNECTED. Panics if the previous state wasn't LOCKED.
    pub fn disconnect(&self, ord: sync::atomic::Ordering) {
        // We avoid having to synchronize the number of sleepers by using fetch_add
        // instead of a compare and swap.
        //
        // We'll panic here if we've somehow underflowed the word. This
        // shouldn't be possible assuming SelfContainedMutex itself isn't buggy.
        let to_add = LOCKED_DISCONNECTED.checked_sub(LOCKED).unwrap();
        let prev = FutexWord::from(self.0.fetch_add(
            u32::from(FutexWord {
                lock_state: to_add,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
    }

    /// Loads and decodes the current futex word.
    pub fn load(&self, ord: sync::atomic::Ordering) -> FutexWord {
        self.0.load(ord).into()
    }

    /// Compare-and-exchange on the whole (decoded) futex word. On failure the
    /// `Err` carries the actual current word.
    pub fn compare_exchange(
        &self,
        current: FutexWord,
        new: FutexWord,
        success: sync::atomic::Ordering,
        failure: sync::atomic::Ordering,
    ) -> Result<FutexWord, FutexWord> {
        let raw_res = self
            .0
            .compare_exchange(current.into(), new.into(), success, failure);
        raw_res.map(FutexWord::from).map_err(FutexWord::from)
    }
}

// A decoded copy of the futex word. The packed `u32` form (see `to_u32`) has
// `lock_state` in the high 16 bits and `num_sleepers` in the low 16 bits.
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct FutexWord {
    // One of UNLOCKED, LOCKED, or LOCKED_DISCONNECTED.
    lock_state: u16,
    // Number of threads sleeping (or preparing to sleep) on the futex.
    num_sleepers: u16,
}

114impl FutexWord {
115    const fn to_u32(self) -> u32 {
116        ((self.lock_state as u32) << 16) | (self.num_sleepers as u32)
117    }
118}
119
120impl From<u32> for FutexWord {
121    fn from(val: u32) -> Self {
122        Self {
123            lock_state: (val >> 16).try_into().unwrap(),
124            num_sleepers: (val & 0xff_ff).try_into().unwrap(),
125        }
126    }
127}
128
impl From<FutexWord> for u32 {
    // Delegates to `to_u32`; see there for the packed layout.
    fn from(val: FutexWord) -> Self {
        val.to_u32()
    }
}

/// Simple mutex that is suitable for use in shared memory:
///
/// * It has a fixed layout (repr(C))
/// * It's self-contained; e.g. isn't boxed and doesn't refer
///   to global lock-state in this process's address space.
/// * Works across processes (e.g. doesn't use FUTEX_PRIVATE_FLAG)
///
/// Performance is optimized primarily for low-contention scenarios.
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(C)]
pub struct SelfContainedMutex<T> {
    // Lock state and sleeper count, packed into a single futex word.
    futex: AtomicFutexWord,
    // The protected value.
    val: sync::UnsafeCell<T>,
}

// SAFETY: Access to the inner `T` is only handed out while the lock is held,
// so sharing or sending the mutex across threads only requires `T: Send` --
// the same bounds as `std::sync::Mutex`.
unsafe impl<T> Send for SelfContainedMutex<T> where T: Send {}
unsafe impl<T> Sync for SelfContainedMutex<T> where T: Send {}

// Values of `FutexWord::lock_state`:
// No thread holds the lock. Must be 0: `unlock_and_fetch` debug-asserts this,
// and `inc_sleepers_and_fetch` relies on it to never spuriously unlock.
const UNLOCKED: u16 = 0;
// A thread holds the lock (via a live guard).
const LOCKED: u16 = 1;
// The lock is held, but its guard was dropped via `disconnect`; it must be
// `reconnect`ed before it can be unlocked.
const LOCKED_DISCONNECTED: u16 = 2;

impl<T> SelfContainedMutex<T> {
    // TODO: merge with `new` when `AtomicFutexWord` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::const_new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    /// Creates a new, unlocked, mutex wrapping `val`.
    pub fn new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    /// Acquires the lock, sleeping on the futex while it's contended.
    pub fn lock(&self) -> SelfContainedMutexGuard<T> {
        // On first attempt, optimistically assume the lock is uncontended.
        let mut current = FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: 0,
        };
        loop {
            if current.lock_state == UNLOCKED {
                // Try to take the lock.
                let current_res = self.futex.compare_exchange(
                    current,
                    FutexWord {
                        lock_state: LOCKED,
                        num_sleepers: current.num_sleepers,
                    },
                    sync::Ordering::Acquire,
                    sync::Ordering::Relaxed,
                );
                current = match current_res {
                    Ok(_) => {
                        // We successfully took the lock.
                        break;
                    }
                    // We weren't able to take the lock.
                    Err(i) => i,
                };
            }

            // If the lock is available, try again now that we've sync'd the
            // rest of the futex word (num_sleepers).
            if current.lock_state == UNLOCKED {
                continue;
            }

            // Try to sleep on the futex.

            // Since incrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.inc_sleepers_and_fetch(sync::Ordering::Relaxed);
            loop {
                // We may now see an UNLOCKED state from having done the increment
                // above, or the load below.
                if current.lock_state == UNLOCKED {
                    break;
                }
                match sync::futex_wait(&self.futex.0, current.into()) {
                    Ok(_) | Err(rustix::io::Errno::INTR) => break,
                    Err(rustix::io::Errno::AGAIN) => {
                        // We may have gotten this because another thread is
                        // also trying to sleep on the futex, and just
                        // incremented the sleeper count. If we naively
                        // decremented the sleeper count and ran the whole lock
                        // loop again, both threads could theoretically end up
                        // in a live-lock where neither ever gets to sleep on
                        // the futex.
                        //
                        // To avoid that, we update our current view of the
                        // atomic and consider trying again before removing
                        // ourselves from the sleeper count.
                        current = self.futex.load(sync::Ordering::Relaxed)
                    }
                    Err(e) => panic!("Unexpected futex error {:?}", e),
                };
            }
            // Since decrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.dec_sleepers_and_fetch(sync::Ordering::Relaxed);
        }
        SelfContainedMutexGuard {
            mutex: Some(self),
            ptr: Some(self.val.get_mut()),
            _phantom: PhantomData,
        }
    }

    /// Like [`lock`](Self::lock), for a pinned mutex. The guard is returned
    /// pinned so that, for `T: !Unpin`, callers can only reach the data as
    /// `Pin<&mut T>` (via `SelfContainedMutexGuard::map_pinned`).
    pub fn lock_pinned(self: Pin<&Self>) -> Pin<SelfContainedMutexGuard<'_, T>> {
        // SAFETY: `SelfContainedMutexGuard` doesn't provide DerefMut when `T`
        // is `!Unpin`.
        unsafe { Pin::new_unchecked(self.get_ref().lock()) }
    }

    /// Releases the lock and wakes one sleeping thread, if any.
    fn unlock(&self) {
        let current = self.futex.unlock_and_fetch(sync::Ordering::Release);

        // Only perform a FUTEX_WAKE operation if other threads are actually
        // sleeping on the lock.
        if current.num_sleepers > 0 {
            sync::futex_wake_one(&self.futex.0).unwrap();
        }
    }
}

pub struct SelfContainedMutexGuard<'a, T> {
    // `Some` while the guard will unlock on drop; `None` after `disconnect`.
    mutex: Option<&'a SelfContainedMutex<T>>,
    // Pointer into the mutex's cell; `None` after `disconnect`, and cleared
    // just before unlocking in `drop`.
    ptr: Option<sync::MutPtr<T>>,
    // For purposes of deriving Send, Sync, etc.,
    // this type should act as `&mut T`.
    _phantom: PhantomData<&'a mut T>,
}

impl<'a, T> SelfContainedMutexGuard<'a, T> {
    /// Drops the guard *without releasing the lock*.
    ///
    /// This is useful when a lock must be held across some span of code within
    /// a single thread, but it's difficult to pass the guard between the
    /// two parts of the code.
    pub fn disconnect(mut self) {
        self.mutex
            .unwrap()
            .futex
            .disconnect(sync::Ordering::Relaxed);
        // Clear both fields so that `drop` doesn't unlock.
        self.mutex.take();
        self.ptr.take();
    }

    /// Reconstitutes a guard that was previously disposed of via `disconnect`.
    ///
    /// Panics if the lock is not disconnected (i.e. if `reconnect` was
    /// already called).
    ///
    /// Ok to reconnect from a different thread, though some external
    /// synchronization may be needed to ensure the mutex is disconnected before
    /// it tries to do so.
    pub fn reconnect(mutex: &'a SelfContainedMutex<T>) -> Self {
        // Optimistically assume no sleepers; the CAS failure path syncs the
        // actual sleeper count.
        let mut current = FutexWord {
            lock_state: LOCKED_DISCONNECTED,
            num_sleepers: 0,
        };
        loop {
            assert_eq!(current.lock_state, LOCKED_DISCONNECTED);
            let current_res = mutex.futex.compare_exchange(
                current,
                FutexWord {
                    lock_state: LOCKED,
                    num_sleepers: current.num_sleepers,
                },
                sync::Ordering::Relaxed,
                sync::Ordering::Relaxed,
            );
            match current_res {
                Ok(_) => {
                    // Done.
                    return Self {
                        mutex: Some(mutex),
                        ptr: Some(mutex.val.get_mut()),
                        _phantom: PhantomData,
                    };
                }
                Err(c) => {
                    // Try again with updated state
                    current = c;
                }
            }
        }
    }

    /// Map the guard into a function of Pin<&mut T>.
    ///
    /// When T implements `Unpin`, the caller can just use deref_mut instead.
    ///
    // We can't provide an API that simply returns a Pin<&mut T>, since the Pin
    // API doesn't provide a way to get to the inner guard without consuming the outer Pin.
    pub fn map_pinned<F, O>(guard: Pin<Self>, f: F) -> O
    where
        F: FnOnce(Pin<&mut T>) -> O,
    {
        // SAFETY: We ensure that the &mut T made available from the unpinned guard isn't
        // moved-from, by only giving `f` access to a Pin<&mut T>.
        let guard: SelfContainedMutexGuard<T> = unsafe { Pin::into_inner_unchecked(guard) };
        // SAFETY: The pointer is valid because it came from the mutex, which we know is live.
        // The mutex ensures there can be no other live references to the internal data.
        let ref_t = unsafe { guard.ptr.as_ref().unwrap().deref() };
        // SAFETY: We know the original data is pinned, since the guard was Pin<Self>.
        let pinned_t: Pin<&mut T> = unsafe { Pin::new_unchecked(ref_t) };
        f(pinned_t)
    }
}

impl<T> Drop for SelfContainedMutexGuard<'_, T> {
    fn drop(&mut self) {
        // `self.mutex` is `None` if the guard was `disconnect`ed; in that
        // case the lock is deliberately left held.
        if let Some(mutex) = self.mutex {
            // We have to drop this pointer before unlocking when running
            // under loom, which could otherwise detect multiple mutable
            // references to the underlying cell. Under non loom, the drop
            // has no effect.
            #[allow(clippy::drop_non_drop)]
            drop(self.ptr.take());
            mutex.unlock();
        }
    }
}

impl<T> core::ops::Deref for SelfContainedMutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // We can't call self.ptr.as_ref().unwrap().deref() here, since that
        // would create a `&mut T`, and there could already exist a `&T`
        // borrowed from `&self`.
        // https://github.com/tokio-rs/loom/issues/293
        // SAFETY: The guard holds the lock, so no mutable access to the
        // underlying data can exist while `&self` is live.
        self.ptr.as_ref().unwrap().with(|p| unsafe { &*p })
    }
}

/// When T is Unpin, we can implement DerefMut. Otherwise it's unsafe
/// to do so, since SelfContainedMutex is an Archive type.
impl<T> core::ops::DerefMut for SelfContainedMutexGuard<'_, T>
where
    T: Unpin,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: The guard holds the lock, so it has exclusive access to the
        // underlying data, and `&mut self` prevents other references derived
        // from this guard from being live.
        unsafe { self.ptr.as_ref().unwrap().deref() }
    }
}

// For unit tests see tests/scmutex-tests.rs