crossbeam_queue/seg_queue.rs
1use alloc::boxed::Box;
2use core::cell::UnsafeCell;
3use core::fmt;
4use core::marker::PhantomData;
5use core::mem::MaybeUninit;
6use core::panic::{RefUnwindSafe, UnwindSafe};
7use core::ptr;
8use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
9
10use crossbeam_utils::{Backoff, CachePadded};
11
12// Bits indicating the state of a slot:
13// * If a value has been written into the slot, `WRITE` is set.
14// * If a value has been read from the slot, `READ` is set.
15// * If the block is being destroyed, `DESTROY` is set.
16const WRITE: usize = 1;
17const READ: usize = 2;
18const DESTROY: usize = 4;
19
20// Each block covers one "lap" of indices.
21const LAP: usize = 32;
22// The maximum number of values a block can hold.
23const BLOCK_CAP: usize = LAP - 1;
24// How many lower bits are reserved for metadata.
25const SHIFT: usize = 1;
26// Indicates that the block is not the last one.
27const HAS_NEXT: usize = 1;
28
29/// A slot in a block.
30struct Slot<T> {
31 /// The value.
32 value: UnsafeCell<MaybeUninit<T>>,
33
34 /// The state of the slot.
35 state: AtomicUsize,
36}
37
38impl<T> Slot<T> {
39 const UNINIT: Self = Self {
40 value: UnsafeCell::new(MaybeUninit::uninit()),
41 state: AtomicUsize::new(0),
42 };
43
44 /// Waits until a value is written into the slot.
45 fn wait_write(&self) {
46 let backoff = Backoff::new();
47 while self.state.load(Ordering::Acquire) & WRITE == 0 {
48 backoff.snooze();
49 }
50 }
51}
52
53/// A block in a linked list.
54///
55/// Each block in the list can hold up to `BLOCK_CAP` values.
56struct Block<T> {
57 /// The next block in the linked list.
58 next: AtomicPtr<Block<T>>,
59
60 /// Slots for values.
61 slots: [Slot<T>; BLOCK_CAP],
62}
63
64impl<T> Block<T> {
65 /// Creates an empty block that starts at `start_index`.
66 fn new() -> Block<T> {
67 Self {
68 next: AtomicPtr::new(ptr::null_mut()),
69 slots: [Slot::UNINIT; BLOCK_CAP],
70 }
71 }
72
73 /// Waits until the next pointer is set.
74 fn wait_next(&self) -> *mut Block<T> {
75 let backoff = Backoff::new();
76 loop {
77 let next = self.next.load(Ordering::Acquire);
78 if !next.is_null() {
79 return next;
80 }
81 backoff.snooze();
82 }
83 }
84
85 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
86 unsafe fn destroy(this: *mut Block<T>, start: usize) {
87 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
88 // begun destruction of the block.
89 for i in start..BLOCK_CAP - 1 {
90 let slot = (*this).slots.get_unchecked(i);
91
92 // Mark the `DESTROY` bit if a thread is still using the slot.
93 if slot.state.load(Ordering::Acquire) & READ == 0
94 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
95 {
96 // If a thread is still using the slot, it will continue destruction of the block.
97 return;
98 }
99 }
100
101 // No thread is using the block, now it is safe to destroy it.
102 drop(Box::from_raw(this));
103 }
104}
105
106/// A position in a queue.
107struct Position<T> {
108 /// The index in the queue.
109 index: AtomicUsize,
110
111 /// The block in the linked list.
112 block: AtomicPtr<Block<T>>,
113}
114
115/// An unbounded multi-producer multi-consumer queue.
116///
117/// This queue is implemented as a linked list of segments, where each segment is a small buffer
118/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
119/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
120/// this queue is somewhat slower than [`ArrayQueue`].
121///
122/// [`ArrayQueue`]: super::ArrayQueue
123///
124/// # Examples
125///
126/// ```
127/// use crossbeam_queue::SegQueue;
128///
129/// let q = SegQueue::new();
130///
131/// q.push('a');
132/// q.push('b');
133///
134/// assert_eq!(q.pop(), Some('a'));
135/// assert_eq!(q.pop(), Some('b'));
136/// assert!(q.pop().is_none());
137/// ```
138pub struct SegQueue<T> {
139 /// The head of the queue.
140 head: CachePadded<Position<T>>,
141
142 /// The tail of the queue.
143 tail: CachePadded<Position<T>>,
144
145 /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
146 _marker: PhantomData<T>,
147}
148
149unsafe impl<T: Send> Send for SegQueue<T> {}
150unsafe impl<T: Send> Sync for SegQueue<T> {}
151
152impl<T> UnwindSafe for SegQueue<T> {}
153impl<T> RefUnwindSafe for SegQueue<T> {}
154
155impl<T> SegQueue<T> {
156 /// Creates a new unbounded queue.
157 ///
158 /// # Examples
159 ///
160 /// ```
161 /// use crossbeam_queue::SegQueue;
162 ///
163 /// let q = SegQueue::<i32>::new();
164 /// ```
165 pub const fn new() -> SegQueue<T> {
166 SegQueue {
167 head: CachePadded::new(Position {
168 block: AtomicPtr::new(ptr::null_mut()),
169 index: AtomicUsize::new(0),
170 }),
171 tail: CachePadded::new(Position {
172 block: AtomicPtr::new(ptr::null_mut()),
173 index: AtomicUsize::new(0),
174 }),
175 _marker: PhantomData,
176 }
177 }
178
179 /// Pushes an element into the queue.
180 ///
181 /// # Examples
182 ///
183 /// ```
184 /// use crossbeam_queue::SegQueue;
185 ///
186 /// let q = SegQueue::new();
187 ///
188 /// q.push(10);
189 /// q.push(20);
190 /// ```
191 pub fn push(&self, value: T) {
192 let backoff = Backoff::new();
193 let mut tail = self.tail.index.load(Ordering::Acquire);
194 let mut block = self.tail.block.load(Ordering::Acquire);
195 let mut next_block = None;
196
197 loop {
198 // Calculate the offset of the index into the block.
199 let offset = (tail >> SHIFT) % LAP;
200
201 // If we reached the end of the block, wait until the next one is installed.
202 if offset == BLOCK_CAP {
203 backoff.snooze();
204 tail = self.tail.index.load(Ordering::Acquire);
205 block = self.tail.block.load(Ordering::Acquire);
206 continue;
207 }
208
209 // If we're going to have to install the next block, allocate it in advance in order to
210 // make the wait for other threads as short as possible.
211 if offset + 1 == BLOCK_CAP && next_block.is_none() {
212 next_block = Some(Box::new(Block::<T>::new()));
213 }
214
215 // If this is the first push operation, we need to allocate the first block.
216 if block.is_null() {
217 let new = Box::into_raw(Box::new(Block::<T>::new()));
218
219 if self
220 .tail
221 .block
222 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
223 .is_ok()
224 {
225 self.head.block.store(new, Ordering::Release);
226 block = new;
227 } else {
228 next_block = unsafe { Some(Box::from_raw(new)) };
229 tail = self.tail.index.load(Ordering::Acquire);
230 block = self.tail.block.load(Ordering::Acquire);
231 continue;
232 }
233 }
234
235 let new_tail = tail + (1 << SHIFT);
236
237 // Try advancing the tail forward.
238 match self.tail.index.compare_exchange_weak(
239 tail,
240 new_tail,
241 Ordering::SeqCst,
242 Ordering::Acquire,
243 ) {
244 Ok(_) => unsafe {
245 // If we've reached the end of the block, install the next one.
246 if offset + 1 == BLOCK_CAP {
247 let next_block = Box::into_raw(next_block.unwrap());
248 let next_index = new_tail.wrapping_add(1 << SHIFT);
249
250 self.tail.block.store(next_block, Ordering::Release);
251 self.tail.index.store(next_index, Ordering::Release);
252 (*block).next.store(next_block, Ordering::Release);
253 }
254
255 // Write the value into the slot.
256 let slot = (*block).slots.get_unchecked(offset);
257 slot.value.get().write(MaybeUninit::new(value));
258 slot.state.fetch_or(WRITE, Ordering::Release);
259
260 return;
261 },
262 Err(t) => {
263 tail = t;
264 block = self.tail.block.load(Ordering::Acquire);
265 backoff.spin();
266 }
267 }
268 }
269 }
270
271 /// Pops an element from the queue.
272 ///
273 /// If the queue is empty, `None` is returned.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// use crossbeam_queue::SegQueue;
279 ///
280 /// let q = SegQueue::new();
281 ///
282 /// q.push(10);
283 /// assert_eq!(q.pop(), Some(10));
284 /// assert!(q.pop().is_none());
285 /// ```
286 pub fn pop(&self) -> Option<T> {
287 let backoff = Backoff::new();
288 let mut head = self.head.index.load(Ordering::Acquire);
289 let mut block = self.head.block.load(Ordering::Acquire);
290
291 loop {
292 // Calculate the offset of the index into the block.
293 let offset = (head >> SHIFT) % LAP;
294
295 // If we reached the end of the block, wait until the next one is installed.
296 if offset == BLOCK_CAP {
297 backoff.snooze();
298 head = self.head.index.load(Ordering::Acquire);
299 block = self.head.block.load(Ordering::Acquire);
300 continue;
301 }
302
303 let mut new_head = head + (1 << SHIFT);
304
305 if new_head & HAS_NEXT == 0 {
306 atomic::fence(Ordering::SeqCst);
307 let tail = self.tail.index.load(Ordering::Relaxed);
308
309 // If the tail equals the head, that means the queue is empty.
310 if head >> SHIFT == tail >> SHIFT {
311 return None;
312 }
313
314 // If head and tail are not in the same block, set `HAS_NEXT` in head.
315 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
316 new_head |= HAS_NEXT;
317 }
318 }
319
320 // The block can be null here only if the first push operation is in progress. In that
321 // case, just wait until it gets initialized.
322 if block.is_null() {
323 backoff.snooze();
324 head = self.head.index.load(Ordering::Acquire);
325 block = self.head.block.load(Ordering::Acquire);
326 continue;
327 }
328
329 // Try moving the head index forward.
330 match self.head.index.compare_exchange_weak(
331 head,
332 new_head,
333 Ordering::SeqCst,
334 Ordering::Acquire,
335 ) {
336 Ok(_) => unsafe {
337 // If we've reached the end of the block, move to the next one.
338 if offset + 1 == BLOCK_CAP {
339 let next = (*block).wait_next();
340 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
341 if !(*next).next.load(Ordering::Relaxed).is_null() {
342 next_index |= HAS_NEXT;
343 }
344
345 self.head.block.store(next, Ordering::Release);
346 self.head.index.store(next_index, Ordering::Release);
347 }
348
349 // Read the value.
350 let slot = (*block).slots.get_unchecked(offset);
351 slot.wait_write();
352 let value = slot.value.get().read().assume_init();
353
354 // Destroy the block if we've reached the end, or if another thread wanted to
355 // destroy but couldn't because we were busy reading from the slot.
356 if offset + 1 == BLOCK_CAP {
357 Block::destroy(block, 0);
358 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
359 Block::destroy(block, offset + 1);
360 }
361
362 return Some(value);
363 },
364 Err(h) => {
365 head = h;
366 block = self.head.block.load(Ordering::Acquire);
367 backoff.spin();
368 }
369 }
370 }
371 }
372
373 /// Returns `true` if the queue is empty.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use crossbeam_queue::SegQueue;
379 ///
380 /// let q = SegQueue::new();
381 ///
382 /// assert!(q.is_empty());
383 /// q.push(1);
384 /// assert!(!q.is_empty());
385 /// ```
386 pub fn is_empty(&self) -> bool {
387 let head = self.head.index.load(Ordering::SeqCst);
388 let tail = self.tail.index.load(Ordering::SeqCst);
389 head >> SHIFT == tail >> SHIFT
390 }
391
392 /// Returns the number of elements in the queue.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use crossbeam_queue::SegQueue;
398 ///
399 /// let q = SegQueue::new();
400 /// assert_eq!(q.len(), 0);
401 ///
402 /// q.push(10);
403 /// assert_eq!(q.len(), 1);
404 ///
405 /// q.push(20);
406 /// assert_eq!(q.len(), 2);
407 /// ```
408 pub fn len(&self) -> usize {
409 loop {
410 // Load the tail index, then load the head index.
411 let mut tail = self.tail.index.load(Ordering::SeqCst);
412 let mut head = self.head.index.load(Ordering::SeqCst);
413
414 // If the tail index didn't change, we've got consistent indices to work with.
415 if self.tail.index.load(Ordering::SeqCst) == tail {
416 // Erase the lower bits.
417 tail &= !((1 << SHIFT) - 1);
418 head &= !((1 << SHIFT) - 1);
419
420 // Fix up indices if they fall onto block ends.
421 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
422 tail = tail.wrapping_add(1 << SHIFT);
423 }
424 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
425 head = head.wrapping_add(1 << SHIFT);
426 }
427
428 // Rotate indices so that head falls into the first block.
429 let lap = (head >> SHIFT) / LAP;
430 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
431 head = head.wrapping_sub((lap * LAP) << SHIFT);
432
433 // Remove the lower bits.
434 tail >>= SHIFT;
435 head >>= SHIFT;
436
437 // Return the difference minus the number of blocks between tail and head.
438 return tail - head - tail / LAP;
439 }
440 }
441 }
442}
443
444impl<T> Drop for SegQueue<T> {
445 fn drop(&mut self) {
446 let mut head = *self.head.index.get_mut();
447 let mut tail = *self.tail.index.get_mut();
448 let mut block = *self.head.block.get_mut();
449
450 // Erase the lower bits.
451 head &= !((1 << SHIFT) - 1);
452 tail &= !((1 << SHIFT) - 1);
453
454 unsafe {
455 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
456 while head != tail {
457 let offset = (head >> SHIFT) % LAP;
458
459 if offset < BLOCK_CAP {
460 // Drop the value in the slot.
461 let slot = (*block).slots.get_unchecked(offset);
462 (*slot.value.get()).assume_init_drop();
463 } else {
464 // Deallocate the block and move to the next one.
465 let next = *(*block).next.get_mut();
466 drop(Box::from_raw(block));
467 block = next;
468 }
469
470 head = head.wrapping_add(1 << SHIFT);
471 }
472
473 // Deallocate the last remaining block.
474 if !block.is_null() {
475 drop(Box::from_raw(block));
476 }
477 }
478 }
479}
480
481impl<T> fmt::Debug for SegQueue<T> {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483 f.pad("SegQueue { .. }")
484 }
485}
486
487impl<T> Default for SegQueue<T> {
488 fn default() -> SegQueue<T> {
489 SegQueue::new()
490 }
491}
492
493impl<T> IntoIterator for SegQueue<T> {
494 type Item = T;
495
496 type IntoIter = IntoIter<T>;
497
498 fn into_iter(self) -> Self::IntoIter {
499 IntoIter { value: self }
500 }
501}
502
503#[derive(Debug)]
504pub struct IntoIter<T> {
505 value: SegQueue<T>,
506}
507
508impl<T> Iterator for IntoIter<T> {
509 type Item = T;
510
511 fn next(&mut self) -> Option<Self::Item> {
512 let value = &mut self.value;
513 let head = *value.head.index.get_mut();
514 let tail = *value.tail.index.get_mut();
515 if head >> SHIFT == tail >> SHIFT {
516 None
517 } else {
518 let block = *value.head.block.get_mut();
519 let offset = (head >> SHIFT) % LAP;
520
521 // SAFETY: We have mutable access to this, so we can read without
522 // worrying about concurrency. Furthermore, we know this is
523 // initialized because it is the value pointed at by `value.head`
524 // and this is a non-empty queue.
525 let item = unsafe {
526 let slot = (*block).slots.get_unchecked(offset);
527 slot.value.get().read().assume_init()
528 };
529 if offset + 1 == BLOCK_CAP {
530 // Deallocate the block and move to the next one.
531 // SAFETY: The block is initialized because we've been reading
532 // from it this entire time. We can drop it b/c everything has
533 // been read out of it, so nothing is pointing to it anymore.
534 unsafe {
535 let next = *(*block).next.get_mut();
536 drop(Box::from_raw(block));
537 *value.head.block.get_mut() = next;
538 }
539 // The last value in a block is empty, so skip it
540 *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
541 // Double-check that we're pointing to the first item in a block.
542 debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
543 } else {
544 *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
545 }
546 Some(item)
547 }
548 }
549}