crossbeam_queue/seg_queue.rs

use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
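
// A queue index packs two things: the lowest `SHIFT` bits hold metadata (the `HAS_NEXT`
// flag used in the head index), and the bits above count positions. For a given index,
// `(index >> SHIFT) % LAP` is its offset within the current block; offsets `0..BLOCK_CAP`
// refer to real slots, while the final offset of each lap (`BLOCK_CAP`) has no slot and
// is stepped over when moving on to the next block.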

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because the thread
        // that reads from the last slot is the one that begins destroying the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
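///
/// Since the queue is [`Send`] and [`Sync`] for `T: Send`, it can also be shared between
/// threads by reference (a small sketch using scoped threads from `std`):
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// std::thread::scope(|s| {
///     s.spawn(|| q.push(1));
///     s.spawn(|| q.push(2));
/// });
///
/// assert_eq!(q.len(), 2);
/// ```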
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
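                    // Another thread installed the first block before us. Rather than freeing
                    // our allocation, keep it around as a candidate for the next block.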
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
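                        // `new_tail` sits on the unused final index of this lap, so step one
                        // more position ahead to land on offset 0 of the new block.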
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
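                // `HAS_NEXT` is not set, so this may be the last block and the queue may be
                // empty; load the tail index to find out. The fence orders this check with
                // the SeqCst tail updates performed by `push`.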
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
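                    // The push that reserved this slot may not have finished writing the value
                    // yet, so wait until the `WRITE` bit is set before reading it.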
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference, minus the one unused index at the end of each lap
                // between head and tail.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
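                    // `offset == BLOCK_CAP`: this is the unused index at the end of the lap,
                    // which means every slot in this block has already been handled.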
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

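/// An owned iterator over the elements of a `SegQueue`, created by its [`IntoIterator`]
/// implementation. Elements are yielded in the order they would be popped.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// let v: Vec<i32> = q.into_iter().collect();
/// assert_eq!(v, [1, 2]);
/// ```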
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it b/c everything has
                // been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in a lap has no slot (blocks hold only `BLOCK_CAP` values),
                // so advance past both the slot just read and that unused index.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}