vasi_sync/scmutex.rs
use core::{marker::PhantomData, pin::Pin};

use vasi::VirtualAddressSpaceIndependent;

use crate::sync;

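/// Wrapper around the raw futex word: the lock state and the sleeper count
/// packed into a single `AtomicU32` (see `FutexWord` below for the layout).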
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(transparent)]
struct AtomicFutexWord(sync::atomic::AtomicU32);

impl AtomicFutexWord {
    // TODO: merge with `new` if and when loom's `AtomicU32` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    pub fn new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    pub fn inc_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can increment the whole word.
        let prev = FutexWord::from(self.0.fetch_add(1, ord));

        // We'll panic here if we've overflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. Since UNLOCKED is 0, this will never
        // cause a spurious unlock, but still-live threads using the lock
        // will likely panic or deadlock.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_add(1).unwrap(),
        }
    }

    pub fn dec_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can decrement the whole word.

        // Ideally we'd just use an atomic op on the "sleepers" part of the
        // larger word, but that sort of aliasing breaks loom's analysis.
        let prev = FutexWord::from(self.0.fetch_sub(1, ord));

        // We'll panic here if we've underflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. This shouldn't be possible assuming
        // SelfContainedMutex itself isn't buggy.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_sub(1).unwrap(),
        }
    }

    pub fn unlock_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // We avoid having to synchronize the number of sleepers by using fetch_sub
        // instead of a compare and swap.
        debug_assert_eq!(UNLOCKED, 0);
        let prev = FutexWord::from(self.0.fetch_sub(
            u32::from(FutexWord {
                lock_state: LOCKED,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
        FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: prev.num_sleepers,
        }
    }

    pub fn disconnect(&self, ord: sync::atomic::Ordering) {
        // We avoid having to synchronize the number of sleepers by using fetch_add
        // instead of a compare and swap.
        //
        // We'll panic here if we've somehow underflowed the word. This
        // shouldn't be possible assuming SelfContainedMutex itself isn't buggy.
        let to_add = LOCKED_DISCONNECTED.checked_sub(LOCKED).unwrap();
        let prev = FutexWord::from(self.0.fetch_add(
            u32::from(FutexWord {
                lock_state: to_add,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
    }

    pub fn load(&self, ord: sync::atomic::Ordering) -> FutexWord {
        self.0.load(ord).into()
    }

    pub fn compare_exchange(
        &self,
        current: FutexWord,
        new: FutexWord,
        success: sync::atomic::Ordering,
        failure: sync::atomic::Ordering,
    ) -> Result<FutexWord, FutexWord> {
        let raw_res = self
            .0
            .compare_exchange(current.into(), new.into(), success, failure);
        raw_res.map(FutexWord::from).map_err(FutexWord::from)
    }
}

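/// Typed view of the futex word: `lock_state` occupies the high 16 bits and
/// `num_sleepers` the low 16 bits of the `u32` representation (see `to_u32`).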
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct FutexWord {
    lock_state: u16,
    num_sleepers: u16,
}

impl FutexWord {
    const fn to_u32(self) -> u32 {
        ((self.lock_state as u32) << 16) | (self.num_sleepers as u32)
    }
}

impl From<u32> for FutexWord {
    fn from(val: u32) -> Self {
        Self {
            lock_state: (val >> 16).try_into().unwrap(),
            num_sleepers: (val & 0xff_ff).try_into().unwrap(),
        }
    }
}

impl From<FutexWord> for u32 {
    fn from(val: FutexWord) -> Self {
        val.to_u32()
    }
}

/// Simple mutex that is suitable for use in shared memory:
///
/// * It has a fixed layout (repr(C))
/// * It's self-contained; e.g. isn't boxed and doesn't refer
///   to global lock-state in this process's address space.
/// * Works across processes (e.g. doesn't use FUTEX_PRIVATE_FLAG)
///
/// Performance is optimized primarily for low-contention scenarios.
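///
/// # Example
///
/// A minimal single-process usage sketch (the same API applies when the
/// mutex lives in memory shared across processes):
///
/// ```ignore
/// let mutex = SelfContainedMutex::new(0u32);
/// {
///     let mut guard = mutex.lock();
///     *guard += 1;
///     // The lock is released when the guard drops.
/// }
/// assert_eq!(*mutex.lock(), 1);
/// ```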
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(C)]
pub struct SelfContainedMutex<T> {
    futex: AtomicFutexWord,
    val: sync::UnsafeCell<T>,
}

unsafe impl<T> Send for SelfContainedMutex<T> where T: Send {}
unsafe impl<T> Sync for SelfContainedMutex<T> where T: Send {}

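// Values of `FutexWord::lock_state`. LOCKED_DISCONNECTED means the lock is
// held, but the guard that acquired it was dropped via `disconnect`; it must
// be reacquired with `SelfContainedMutexGuard::reconnect`.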
const UNLOCKED: u16 = 0;
const LOCKED: u16 = 1;
const LOCKED_DISCONNECTED: u16 = 2;

impl<T> SelfContainedMutex<T> {
    // TODO: merge with `new` when `AtomicFutexWord` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::const_new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    pub fn new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    pub fn lock(&self) -> SelfContainedMutexGuard<T> {
        // On first attempt, optimistically assume the lock is uncontended.
        let mut current = FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: 0,
        };
        loop {
            if current.lock_state == UNLOCKED {
                // Try to take the lock.
                let current_res = self.futex.compare_exchange(
                    current,
                    FutexWord {
                        lock_state: LOCKED,
                        num_sleepers: current.num_sleepers,
                    },
                    sync::Ordering::Acquire,
                    sync::Ordering::Relaxed,
                );
                current = match current_res {
                    Ok(_) => {
                        // We successfully took the lock.
                        break;
                    }
                    // We weren't able to take the lock.
                    Err(i) => i,
                };
            }

            // If the lock is available, try again now that we've sync'd the
            // rest of the futex word (num_sleepers).
            if current.lock_state == UNLOCKED {
                continue;
            }

            // Try to sleep on the futex.

            // Since incrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.inc_sleepers_and_fetch(sync::Ordering::Relaxed);
            loop {
                // We may now see an UNLOCKED state from having done the increment
                // above, or the load below.
                if current.lock_state == UNLOCKED {
                    break;
                }
                match sync::futex_wait(&self.futex.0, current.into()) {
                    Ok(_) | Err(rustix::io::Errno::INTR) => break,
                    Err(rustix::io::Errno::AGAIN) => {
                        // We may have gotten this because another thread is
                        // also trying to sleep on the futex, and just
                        // incremented the sleeper count. If we naively
                        // decremented the sleeper count and ran the whole lock
                        // loop again, both threads could theoretically end up
                        // in a live-lock where neither ever gets to sleep on
                        // the futex.
                        //
                        // To avoid that, we update our current view of the
                        // atomic and consider trying again before removing
                        // ourselves from the sleeper count.
                        current = self.futex.load(sync::Ordering::Relaxed)
                    }
                    Err(e) => panic!("Unexpected futex error {:?}", e),
                };
            }
            // Since decrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.dec_sleepers_and_fetch(sync::Ordering::Relaxed);
        }
        SelfContainedMutexGuard {
            mutex: Some(self),
            ptr: Some(self.val.get_mut()),
            _phantom: PhantomData,
        }
    }

    pub fn lock_pinned(self: Pin<&Self>) -> Pin<SelfContainedMutexGuard<'_, T>> {
        // SAFETY: `SelfContainedMutexGuard` doesn't provide DerefMut when `T`
        // is `!Unpin`.
        unsafe { Pin::new_unchecked(self.get_ref().lock()) }
    }

    fn unlock(&self) {
        let current = self.futex.unlock_and_fetch(sync::Ordering::Release);

        // Only perform a FUTEX_WAKE operation if other threads are actually
        // sleeping on the lock.
        if current.num_sleepers > 0 {
            sync::futex_wake_one(&self.futex.0).unwrap();
        }
    }
}

pub struct SelfContainedMutexGuard<'a, T> {
    mutex: Option<&'a SelfContainedMutex<T>>,
    ptr: Option<sync::MutPtr<T>>,
    // For purposes of deriving Send, Sync, etc.,
    // this type should act as `&mut T`.
    _phantom: PhantomData<&'a mut T>,
}

impl<'a, T> SelfContainedMutexGuard<'a, T> {
    /// Drops the guard *without releasing the lock*.
    ///
    /// This is useful when a lock must be held across some span of code within
    /// a single thread, but it's difficult to pass the guard between the
    /// two parts of the code.
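    ///
    /// A sketch of the intended pattern, assuming some `mutex` in scope
    /// (`reconnect` is defined below; the lock stays held in between):
    ///
    /// ```ignore
    /// let guard = mutex.lock();
    /// guard.disconnect();
    /// // ... code that runs while the lock is held, without the guard ...
    /// let guard = SelfContainedMutexGuard::reconnect(&mutex);
    /// ```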
    pub fn disconnect(mut self) {
        self.mutex
            .unwrap()
            .futex
            .disconnect(sync::Ordering::Relaxed);
        self.mutex.take();
        self.ptr.take();
    }

    /// Reconstitutes a guard that was previously disposed of via `disconnect`.
    ///
    /// Panics if the lock is not disconnected (e.g. if `reconnect` was
    /// already called).
    ///
    /// It's OK to reconnect from a different thread, though some external
    /// synchronization may be needed to ensure the mutex is disconnected before
    /// it tries to do so.
    pub fn reconnect(mutex: &'a SelfContainedMutex<T>) -> Self {
        let mut current = FutexWord {
            lock_state: LOCKED_DISCONNECTED,
            num_sleepers: 0,
        };
        loop {
            assert_eq!(current.lock_state, LOCKED_DISCONNECTED);
            let current_res = mutex.futex.compare_exchange(
                current,
                FutexWord {
                    lock_state: LOCKED,
                    num_sleepers: current.num_sleepers,
                },
                sync::Ordering::Relaxed,
                sync::Ordering::Relaxed,
            );
            match current_res {
                Ok(_) => {
                    // Done.
                    return Self {
                        mutex: Some(mutex),
                        ptr: Some(mutex.val.get_mut()),
                        _phantom: PhantomData,
                    };
                }
                Err(c) => {
                    // Try again with updated state.
                    current = c;
                }
            }
        }
    }

    /// Applies the given function to the protected value as a `Pin<&mut T>`.
    ///
    /// When `T` implements `Unpin`, the caller can just use `deref_mut` instead.
    ///
    // We can't provide an API that simply returns a Pin<&mut T>, since the Pin
    // API doesn't provide a way to get to the inner guard without consuming the outer Pin.
    pub fn map_pinned<F, O>(guard: Pin<Self>, f: F) -> O
    where
        F: FnOnce(Pin<&mut T>) -> O,
    {
        // SAFETY: We ensure that the &mut T made available from the unpinned guard isn't
        // moved-from, by only giving `f` access to a Pin<&mut T>.
        let guard: SelfContainedMutexGuard<T> = unsafe { Pin::into_inner_unchecked(guard) };
        // SAFETY: The pointer is valid because it came from the mutex, which we know is live.
        // The mutex ensures there can be no other live references to the internal data.
        let ref_t = unsafe { guard.ptr.as_ref().unwrap().deref() };
        // SAFETY: We know the original data is pinned, since the guard was Pin<Self>.
        let pinned_t: Pin<&mut T> = unsafe { Pin::new_unchecked(ref_t) };
        f(pinned_t)
    }
}

impl<T> Drop for SelfContainedMutexGuard<'_, T> {
    fn drop(&mut self) {
        if let Some(mutex) = self.mutex {
            // We have to drop this pointer before unlocking when running
            // under loom, which could otherwise detect multiple mutable
            // references to the underlying cell. When not running under
            // loom, the drop has no effect.
            #[allow(clippy::drop_non_drop)]
            drop(self.ptr.take());
            mutex.unlock();
        }
    }
}

impl<T> core::ops::Deref for SelfContainedMutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // We can't call self.ptr.as_ref().unwrap().deref() here, since that
        // would create a `&mut T`, and there could already exist a `&T`
        // borrowed from `&self`.
        // https://github.com/tokio-rs/loom/issues/293
        self.ptr.as_ref().unwrap().with(|p| unsafe { &*p })
    }
}

/// When T is Unpin, we can implement DerefMut. Otherwise it's unsafe
/// to do so, since SelfContainedMutex is an Archive type.
impl<T> core::ops::DerefMut for SelfContainedMutexGuard<'_, T>
where
    T: Unpin,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { self.ptr.as_ref().unwrap().deref() }
    }
}

// For unit tests see tests/scmutex-tests.rs