use core::{marker::PhantomData, pin::Pin};

use vasi::VirtualAddressSpaceIndependent;

use crate::sync;

#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(transparent)]
struct AtomicFutexWord(sync::atomic::AtomicU32);

impl AtomicFutexWord {
    // TODO: merge with `new` if and when loom's `AtomicU32` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    pub fn new(val: FutexWord) -> Self {
        Self(crate::sync::atomic::AtomicU32::new(val.to_u32()))
    }

    pub fn inc_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can increment the whole word.
        let prev = FutexWord::from(self.0.fetch_add(1, ord));

        // We'll panic here if we've overflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. Since UNLOCKED is 0, this will never
        // cause a spurious unlock, but still-live threads using the lock
        // will likely panic or deadlock.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_add(1).unwrap(),
        }
    }

    pub fn dec_sleepers_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // The number of sleepers is stored in the low bits of the futex word,
        // so we can decrement the whole word.

        // Ideally we'd just use an atomic op on the "sleepers" part of the
        // larger word, but that sort of aliasing breaks loom's analysis.
        let prev = FutexWord::from(self.0.fetch_sub(1, ord));

        // We'll panic here if we've underflowed the "sleepers" half of the word,
        // leaving the lock in a bad state. This shouldn't be possible assuming
        // SelfContainedMutex itself isn't buggy.
        FutexWord {
            lock_state: prev.lock_state,
            num_sleepers: prev.num_sleepers.checked_sub(1).unwrap(),
        }
    }

    pub fn unlock_and_fetch(&self, ord: sync::atomic::Ordering) -> FutexWord {
        // We avoid having to synchronize the number of sleepers by using fetch_sub
        // instead of a compare and swap.
        debug_assert_eq!(UNLOCKED, 0);
        let prev = FutexWord::from(self.0.fetch_sub(
            u32::from(FutexWord {
                lock_state: LOCKED,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
        FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: prev.num_sleepers,
        }
    }

    pub fn disconnect(&self, ord: sync::atomic::Ordering) {
        // We avoid having to synchronize the number of sleepers by using fetch_add
        // instead of a compare and swap.
        //
        // We'll panic here if `LOCKED_DISCONNECTED` is somehow less than
        // `LOCKED`. This shouldn't be possible assuming SelfContainedMutex
        // itself isn't buggy.
        let to_add = LOCKED_DISCONNECTED.checked_sub(LOCKED).unwrap();
        let prev = FutexWord::from(self.0.fetch_add(
            u32::from(FutexWord {
                lock_state: to_add,
                num_sleepers: 0,
            }),
            ord,
        ));
        assert_eq!(prev.lock_state, LOCKED);
    }

    pub fn load(&self, ord: sync::atomic::Ordering) -> FutexWord {
        self.0.load(ord).into()
    }

    pub fn compare_exchange(
        &self,
        current: FutexWord,
        new: FutexWord,
        success: sync::atomic::Ordering,
        failure: sync::atomic::Ordering,
    ) -> Result<FutexWord, FutexWord> {
        let raw_res = self
            .0
            .compare_exchange(current.into(), new.into(), success, failure);
        raw_res.map(FutexWord::from).map_err(FutexWord::from)
    }
}

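/// The two halves of the mutex's futex word. It is packed into a `u32` as
/// `(lock_state << 16) | num_sleepers`: the lock state occupies the high
/// 16 bits and the sleeper count the low 16 bits, which is why the methods
/// above can adjust `num_sleepers` with a plain `fetch_add`/`fetch_sub` on
/// the whole word.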
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct FutexWord {
    lock_state: u16,
    num_sleepers: u16,
}

impl FutexWord {
    const fn to_u32(self) -> u32 {
        ((self.lock_state as u32) << 16) | (self.num_sleepers as u32)
    }
}

impl From<u32> for FutexWord {
    fn from(val: u32) -> Self {
        Self {
            lock_state: (val >> 16).try_into().unwrap(),
            num_sleepers: (val & 0xff_ff).try_into().unwrap(),
        }
    }
}

impl From<FutexWord> for u32 {
    fn from(val: FutexWord) -> Self {
        val.to_u32()
    }
}

/// Simple mutex that is suitable for use in shared memory:
///
/// * It has a fixed layout (repr(C))
/// * It's self-contained; e.g. isn't boxed and doesn't refer
///   to global lock-state in this process's address space.
/// * Works across processes (e.g. doesn't use FUTEX_PRIVATE_FLAG)
///
/// Performance is optimized primarily for low-contention scenarios.
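///
/// # Example
///
/// A minimal usage sketch (see tests/scmutex-tests.rs for real tests):
///
/// ```ignore
/// let mutex = SelfContainedMutex::new(0u32);
/// {
///     let mut guard = mutex.lock();
///     *guard += 1;
/// } // Dropping the guard releases the lock.
/// assert_eq!(*mutex.lock(), 1);
/// ```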
#[cfg_attr(not(loom), derive(VirtualAddressSpaceIndependent))]
#[repr(C)]
pub struct SelfContainedMutex<T> {
    futex: AtomicFutexWord,
    val: sync::UnsafeCell<T>,
}

unsafe impl<T> Send for SelfContainedMutex<T> where T: Send {}
unsafe impl<T> Sync for SelfContainedMutex<T> where T: Send {}

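// Values for `FutexWord::lock_state`. `LOCKED_DISCONNECTED` means the mutex
// is held, but the guard that held it was consumed by
// `SelfContainedMutexGuard::disconnect`; the mutex can then be reacquired
// with `SelfContainedMutexGuard::reconnect`.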
const UNLOCKED: u16 = 0;
const LOCKED: u16 = 1;
const LOCKED_DISCONNECTED: u16 = 2;

impl<T> SelfContainedMutex<T> {
    // TODO: merge with `new` when `AtomicFutexWord` supports a const `new`.
    #[cfg(not(loom))]
    pub const fn const_new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::const_new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    pub fn new(val: T) -> Self {
        Self {
            futex: AtomicFutexWord::new(FutexWord {
                lock_state: UNLOCKED,
                num_sleepers: 0,
            }),
            val: sync::UnsafeCell::new(val),
        }
    }

    pub fn lock(&self) -> SelfContainedMutexGuard<T> {
        // On first attempt, optimistically assume the lock is uncontended.
        let mut current = FutexWord {
            lock_state: UNLOCKED,
            num_sleepers: 0,
        };
        loop {
            if current.lock_state == UNLOCKED {
                // Try to take the lock.
                let current_res = self.futex.compare_exchange(
                    current,
                    FutexWord {
                        lock_state: LOCKED,
                        num_sleepers: current.num_sleepers,
                    },
                    sync::Ordering::Acquire,
                    sync::Ordering::Relaxed,
                );
                current = match current_res {
                    Ok(_) => {
                        // We successfully took the lock.
                        break;
                    }
                    // We weren't able to take the lock.
                    Err(i) => i,
                };
            }

            // If the lock is available, try again now that we've sync'd the
            // rest of the futex word (num_sleepers).
            if current.lock_state == UNLOCKED {
                continue;
            }

            // Try to sleep on the futex.

            // Since incrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.inc_sleepers_and_fetch(sync::Ordering::Relaxed);
            loop {
                // We may now see an UNLOCKED state from having done the increment
                // above, or the load below.
                if current.lock_state == UNLOCKED {
                    break;
                }
                match sync::futex_wait(&self.futex.0, current.into()) {
                    Ok(_) | Err(rustix::io::Errno::INTR) => break,
                    Err(rustix::io::Errno::AGAIN) => {
                        // We may have gotten this because another thread is
                        // also trying to sleep on the futex, and just
                        // incremented the sleeper count. If we naively
                        // decremented the sleeper count and ran the whole lock
                        // loop again, both threads could theoretically end up
                        // in a live-lock where neither ever gets to sleep on
                        // the futex.
                        //
                        // To avoid that, we update our current view of the
                        // atomic and consider trying again before removing
                        // ourselves from the sleeper count.
                        current = self.futex.load(sync::Ordering::Relaxed)
                    }
                    Err(e) => panic!("Unexpected futex error {:?}", e),
                };
            }
            // Since decrementing is a read-modify-write operation, this does
            // not break the release sequence since the last unlock.
            current = self.futex.dec_sleepers_and_fetch(sync::Ordering::Relaxed);
        }
        SelfContainedMutexGuard {
            mutex: Some(self),
            ptr: Some(self.val.get_mut()),
            _phantom: PhantomData,
        }
    }

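    /// Like `lock`, but for a pinned mutex. The returned guard is also
    /// pinned, so when `T` is `!Unpin` the protected value is only
    /// reachable as `Pin<&mut T>` via `map_pinned`.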
    pub fn lock_pinned<'a>(self: Pin<&'a Self>) -> Pin<SelfContainedMutexGuard<'a, T>> {
        // SAFETY: `SelfContainedMutexGuard` doesn't provide DerefMut when `T`
        // is `!Unpin`.
        unsafe { Pin::new_unchecked(self.get_ref().lock()) }
    }

    fn unlock(&self) {
        let current = self.futex.unlock_and_fetch(sync::Ordering::Release);

        // Only perform a FUTEX_WAKE operation if other threads are actually
        // sleeping on the lock.
        if current.num_sleepers > 0 {
            sync::futex_wake_one(&self.futex.0).unwrap();
        }
    }
}

pub struct SelfContainedMutexGuard<'a, T> {
    mutex: Option<&'a SelfContainedMutex<T>>,
    ptr: Option<sync::MutPtr<T>>,
    // For purposes of deriving Send, Sync, etc.,
    // this type should act as `&mut T`.
    _phantom: PhantomData<&'a mut T>,
}

impl<'a, T> SelfContainedMutexGuard<'a, T> {
    /// Drops the guard *without releasing the lock*.
    ///
    /// This is useful when a lock must be held across some span of code within
    /// a single thread, but it's difficult to pass the guard between the
    /// two parts of the code.
    pub fn disconnect(mut self) {
        self.mutex
            .unwrap()
            .futex
            .disconnect(sync::Ordering::Relaxed);
        self.mutex.take();
        self.ptr.take();
    }

    /// Reconstitutes a guard that was previously disposed of via `disconnect`.
    ///
    /// Panics if the lock is not disconnected (e.g. if `reconnect` was
    /// already called, or if the guard was never disconnected).
    ///
    /// It's OK to reconnect from a different thread than the one that
    /// disconnected, though some external synchronization may be needed to
    /// ensure the disconnect happens before the reconnect is attempted.
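    ///
    /// # Example
    ///
    /// A minimal sketch of the disconnect/reconnect round trip, shown on a
    /// single thread for brevity:
    ///
    /// ```ignore
    /// let guard = mutex.lock();
    /// guard.disconnect(); // Consumes the guard; the mutex stays locked.
    /// // ... arbitrary code, possibly on another thread ...
    /// let guard = SelfContainedMutexGuard::reconnect(&mutex);
    /// drop(guard); // Now the mutex is actually unlocked.
    /// ```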
    pub fn reconnect(mutex: &'a SelfContainedMutex<T>) -> Self {
        let mut current = FutexWord {
            lock_state: LOCKED_DISCONNECTED,
            num_sleepers: 0,
        };
        loop {
            assert_eq!(current.lock_state, LOCKED_DISCONNECTED);
            let current_res = mutex.futex.compare_exchange(
                current,
                FutexWord {
                    lock_state: LOCKED,
                    num_sleepers: current.num_sleepers,
                },
                sync::Ordering::Relaxed,
                sync::Ordering::Relaxed,
            );
            match current_res {
                Ok(_) => {
                    // Done.
                    return Self {
                        mutex: Some(mutex),
                        ptr: Some(mutex.val.get_mut()),
                        _phantom: PhantomData,
                    };
                }
                Err(c) => {
                    // Try again with updated state
                    current = c;
                }
            }
        }
    }

    /// Maps the guard through a function that receives a `Pin<&mut T>` of
    /// the protected value.
    ///
    /// When `T` implements `Unpin`, the caller can just use `deref_mut` instead.
    ///
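    /// # Example
    ///
    /// A sketch assuming a hypothetical `!Unpin` payload type `MyUnmovable`
    /// with a `poll_step` method taking `Pin<&mut Self>`:
    ///
    /// ```ignore
    /// let mutex: Pin<&SelfContainedMutex<MyUnmovable>> = /* ... */;
    /// let guard = mutex.lock_pinned();
    /// SelfContainedMutexGuard::map_pinned(guard, |t: Pin<&mut MyUnmovable>| {
    ///     t.poll_step();
    /// });
    /// ```
    ///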
    // We can't provide an API that simply returns a Pin<&mut T>, since the Pin
    // API doesn't provide a way to get to the inner guard without consuming the outer Pin.
    pub fn map_pinned<F, O>(guard: Pin<Self>, f: F) -> O
    where
        F: FnOnce(Pin<&mut T>) -> O,
    {
        // SAFETY: We ensure that the &mut T made available from the unpinned guard isn't
        // moved-from, by only giving `f` access to a Pin<&mut T>.
        let guard: SelfContainedMutexGuard<T> = unsafe { Pin::into_inner_unchecked(guard) };
        // SAFETY: The pointer is valid because it came from the mutex, which we know is live.
        // The mutex ensures there can be no other live references to the internal data.
        let ref_t = unsafe { guard.ptr.as_ref().unwrap().deref() };
        // SAFETY: We know the original data is pinned, since the guard was Pin<Self>.
        let pinned_t: Pin<&mut T> = unsafe { Pin::new_unchecked(ref_t) };
        f(pinned_t)
    }
}

impl<'a, T> Drop for SelfContainedMutexGuard<'a, T> {
    fn drop(&mut self) {
        if let Some(mutex) = self.mutex {
            // We have to drop this pointer before unlocking when running
            // under loom, which could otherwise detect multiple mutable
            // references to the underlying cell. When not running under
            // loom, the drop has no effect.
            #[allow(clippy::drop_non_drop)]
            drop(self.ptr.take());
            mutex.unlock();
        }
    }
}

impl<'a, T> core::ops::Deref for SelfContainedMutexGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // We can't call self.ptr.as_ref().unwrap().deref() here, since that
        // would create a `&mut T`, and there could already exist a `&T`
        // borrowed from `&self`.
        // https://github.com/tokio-rs/loom/issues/293
        self.ptr.as_ref().unwrap().with(|p| unsafe { &*p })
    }
}

/// When `T` is `Unpin`, we can implement `DerefMut`. Otherwise it would be
/// unsound to do so, since the protected value may be pinned (e.g. via
/// `lock_pinned`), and a `&mut T` would allow it to be moved.
impl<'a, T> core::ops::DerefMut for SelfContainedMutexGuard<'a, T>
where
    T: Unpin,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { self.ptr.as_ref().unwrap().deref() }
    }
}

// For unit tests see tests/scmutex-tests.rs