crossbeam_deque/
deque.rs

1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem::{self, MaybeUninit};
6use std::ptr;
7use std::slice;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crossbeam_epoch::{self as epoch, Atomic, Owned};
12use crossbeam_utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27    /// Pointer to the allocated memory.
28    ptr: *mut T,
29
30    /// Capacity of the buffer. Always a power of two.
31    cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37    /// Allocates a new buffer with the specified capacity.
38    fn alloc(cap: usize) -> Buffer<T> {
39        debug_assert_eq!(cap, cap.next_power_of_two());
40
41        let ptr = Box::into_raw(
42            (0..cap)
43                .map(|_| MaybeUninit::<T>::uninit())
44                .collect::<Box<[_]>>(),
45        )
46        .cast::<T>();
47
48        Buffer { ptr, cap }
49    }
50
51    /// Deallocates the buffer.
52    unsafe fn dealloc(self) {
53        drop(Box::from_raw(slice::from_raw_parts_mut(
54            self.ptr.cast::<MaybeUninit<T>>(),
55            self.cap,
56        )));
57    }
58
59    /// Returns a pointer to the task at the specified `index`.
60    unsafe fn at(&self, index: isize) -> *mut T {
61        // `self.cap` is always a power of two.
62        // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
63        // don't actually have the right to access this memory.
64        self.ptr.offset(index & (self.cap - 1) as isize)
65    }
66
67    /// Writes `task` into the specified `index`.
68    ///
69    /// This method might be concurrently called with another `read` at the same index, which is
70    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
71    /// that would be more expensive and difficult to implement generically for all types `T`.
72    /// Hence, as a hack, we use a volatile write instead.
73    unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
74        ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
75    }
76
77    /// Reads a task from the specified `index`.
78    ///
79    /// This method might be concurrently called with another `write` at the same index, which is
80    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
81    /// that would be more expensive and difficult to implement generically for all types `T`.
82    /// Hence, as a hack, we use a volatile load instead.
83    unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
84        ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
85    }
86}
87
88impl<T> Clone for Buffer<T> {
89    fn clone(&self) -> Buffer<T> {
90        *self
91    }
92}
93
94impl<T> Copy for Buffer<T> {}
95
96/// Internal queue data shared between the worker and stealers.
97///
98/// The implementation is based on the following work:
99///
100/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
101/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
102///    PPoPP 2013.][weak-mem]
103/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
104///    atomics. OOPSLA 2013.][checker]
105///
106/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
107/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
108/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
109struct Inner<T> {
110    /// The front index.
111    front: AtomicIsize,
112
113    /// The back index.
114    back: AtomicIsize,
115
116    /// The underlying buffer.
117    buffer: CachePadded<Atomic<Buffer<T>>>,
118}
119
120impl<T> Drop for Inner<T> {
121    fn drop(&mut self) {
122        // Load the back index, front index, and buffer.
123        let b = *self.back.get_mut();
124        let f = *self.front.get_mut();
125
126        unsafe {
127            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
128
129            // Go through the buffer from front to back and drop all tasks in the queue.
130            let mut i = f;
131            while i != b {
132                buffer.deref().at(i).drop_in_place();
133                i = i.wrapping_add(1);
134            }
135
136            // Free the memory allocated by the buffer.
137            buffer.into_owned().into_box().dealloc();
138        }
139    }
140}
141
142/// Worker queue flavor: FIFO or LIFO.
143#[derive(Clone, Copy, Debug, Eq, PartialEq)]
144enum Flavor {
145    /// The first-in first-out flavor.
146    Fifo,
147
148    /// The last-in first-out flavor.
149    Lifo,
150}
151
152/// A worker queue.
153///
154/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
155/// tasks from it. Task schedulers typically create a single worker queue per thread.
156///
157/// # Examples
158///
159/// A FIFO worker:
160///
161/// ```
162/// use crossbeam_deque::{Steal, Worker};
163///
164/// let w = Worker::new_fifo();
165/// let s = w.stealer();
166///
167/// w.push(1);
168/// w.push(2);
169/// w.push(3);
170///
171/// assert_eq!(s.steal(), Steal::Success(1));
172/// assert_eq!(w.pop(), Some(2));
173/// assert_eq!(w.pop(), Some(3));
174/// ```
175///
176/// A LIFO worker:
177///
178/// ```
179/// use crossbeam_deque::{Steal, Worker};
180///
181/// let w = Worker::new_lifo();
182/// let s = w.stealer();
183///
184/// w.push(1);
185/// w.push(2);
186/// w.push(3);
187///
188/// assert_eq!(s.steal(), Steal::Success(1));
189/// assert_eq!(w.pop(), Some(3));
190/// assert_eq!(w.pop(), Some(2));
191/// ```
192pub struct Worker<T> {
193    /// A reference to the inner representation of the queue.
194    inner: Arc<CachePadded<Inner<T>>>,
195
196    /// A copy of `inner.buffer` for quick access.
197    buffer: Cell<Buffer<T>>,
198
199    /// The flavor of the queue.
200    flavor: Flavor,
201
202    /// Indicates that the worker cannot be shared among threads.
203    _marker: PhantomData<*mut ()>, // !Send + !Sync
204}
205
206unsafe impl<T: Send> Send for Worker<T> {}
207
208impl<T> Worker<T> {
209    /// Creates a FIFO worker queue.
210    ///
211    /// Tasks are pushed and popped from opposite ends.
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// use crossbeam_deque::Worker;
217    ///
218    /// let w = Worker::<i32>::new_fifo();
219    /// ```
220    pub fn new_fifo() -> Worker<T> {
221        let buffer = Buffer::alloc(MIN_CAP);
222
223        let inner = Arc::new(CachePadded::new(Inner {
224            front: AtomicIsize::new(0),
225            back: AtomicIsize::new(0),
226            buffer: CachePadded::new(Atomic::new(buffer)),
227        }));
228
229        Worker {
230            inner,
231            buffer: Cell::new(buffer),
232            flavor: Flavor::Fifo,
233            _marker: PhantomData,
234        }
235    }
236
237    /// Creates a LIFO worker queue.
238    ///
239    /// Tasks are pushed and popped from the same end.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// use crossbeam_deque::Worker;
245    ///
246    /// let w = Worker::<i32>::new_lifo();
247    /// ```
248    pub fn new_lifo() -> Worker<T> {
249        let buffer = Buffer::alloc(MIN_CAP);
250
251        let inner = Arc::new(CachePadded::new(Inner {
252            front: AtomicIsize::new(0),
253            back: AtomicIsize::new(0),
254            buffer: CachePadded::new(Atomic::new(buffer)),
255        }));
256
257        Worker {
258            inner,
259            buffer: Cell::new(buffer),
260            flavor: Flavor::Lifo,
261            _marker: PhantomData,
262        }
263    }
264
265    /// Creates a stealer for this queue.
266    ///
267    /// The returned stealer can be shared among threads and cloned.
268    ///
269    /// # Examples
270    ///
271    /// ```
272    /// use crossbeam_deque::Worker;
273    ///
274    /// let w = Worker::<i32>::new_lifo();
275    /// let s = w.stealer();
276    /// ```
277    pub fn stealer(&self) -> Stealer<T> {
278        Stealer {
279            inner: self.inner.clone(),
280            flavor: self.flavor,
281        }
282    }
283
284    /// Resizes the internal buffer to the new capacity of `new_cap`.
285    #[cold]
286    unsafe fn resize(&self, new_cap: usize) {
287        // Load the back index, front index, and buffer.
288        let b = self.inner.back.load(Ordering::Relaxed);
289        let f = self.inner.front.load(Ordering::Relaxed);
290        let buffer = self.buffer.get();
291
292        // Allocate a new buffer and copy data from the old buffer to the new one.
293        let new = Buffer::alloc(new_cap);
294        let mut i = f;
295        while i != b {
296            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
297            i = i.wrapping_add(1);
298        }
299
300        let guard = &epoch::pin();
301
302        // Replace the old buffer with the new one.
303        self.buffer.replace(new);
304        let old =
305            self.inner
306                .buffer
307                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
308
309        // Destroy the old buffer later.
310        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
311
312        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
313        // it as soon as possible.
314        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
315            guard.flush();
316        }
317    }
318
319    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
320    /// buffer.
321    fn reserve(&self, reserve_cap: usize) {
322        if reserve_cap > 0 {
323            // Compute the current length.
324            let b = self.inner.back.load(Ordering::Relaxed);
325            let f = self.inner.front.load(Ordering::SeqCst);
326            let len = b.wrapping_sub(f) as usize;
327
328            // The current capacity.
329            let cap = self.buffer.get().cap;
330
331            // Is there enough capacity to push `reserve_cap` tasks?
332            if cap - len < reserve_cap {
333                // Keep doubling the capacity as much as is needed.
334                let mut new_cap = cap * 2;
335                while new_cap - len < reserve_cap {
336                    new_cap *= 2;
337                }
338
339                // Resize the buffer.
340                unsafe {
341                    self.resize(new_cap);
342                }
343            }
344        }
345    }
346
347    /// Returns `true` if the queue is empty.
348    ///
349    /// ```
350    /// use crossbeam_deque::Worker;
351    ///
352    /// let w = Worker::new_lifo();
353    ///
354    /// assert!(w.is_empty());
355    /// w.push(1);
356    /// assert!(!w.is_empty());
357    /// ```
358    pub fn is_empty(&self) -> bool {
359        let b = self.inner.back.load(Ordering::Relaxed);
360        let f = self.inner.front.load(Ordering::SeqCst);
361        b.wrapping_sub(f) <= 0
362    }
363
364    /// Returns the number of tasks in the deque.
365    ///
366    /// ```
367    /// use crossbeam_deque::Worker;
368    ///
369    /// let w = Worker::new_lifo();
370    ///
371    /// assert_eq!(w.len(), 0);
372    /// w.push(1);
373    /// assert_eq!(w.len(), 1);
374    /// w.push(1);
375    /// assert_eq!(w.len(), 2);
376    /// ```
377    pub fn len(&self) -> usize {
378        let b = self.inner.back.load(Ordering::Relaxed);
379        let f = self.inner.front.load(Ordering::SeqCst);
380        b.wrapping_sub(f).max(0) as usize
381    }
382
383    /// Pushes a task into the queue.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use crossbeam_deque::Worker;
389    ///
390    /// let w = Worker::new_lifo();
391    /// w.push(1);
392    /// w.push(2);
393    /// ```
394    pub fn push(&self, task: T) {
395        // Load the back index, front index, and buffer.
396        let b = self.inner.back.load(Ordering::Relaxed);
397        let f = self.inner.front.load(Ordering::Acquire);
398        let mut buffer = self.buffer.get();
399
400        // Calculate the length of the queue.
401        let len = b.wrapping_sub(f);
402
403        // Is the queue full?
404        if len >= buffer.cap as isize {
405            // Yes. Grow the underlying buffer.
406            unsafe {
407                self.resize(2 * buffer.cap);
408            }
409            buffer = self.buffer.get();
410        }
411
412        // Write `task` into the slot.
413        unsafe {
414            buffer.write(b, MaybeUninit::new(task));
415        }
416
417        atomic::fence(Ordering::Release);
418
419        // Increment the back index.
420        //
421        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
422        // races because it doesn't understand fences.
423        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
424    }
425
426    /// Pops a task from the queue.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// use crossbeam_deque::Worker;
432    ///
433    /// let w = Worker::new_fifo();
434    /// w.push(1);
435    /// w.push(2);
436    ///
437    /// assert_eq!(w.pop(), Some(1));
438    /// assert_eq!(w.pop(), Some(2));
439    /// assert_eq!(w.pop(), None);
440    /// ```
441    pub fn pop(&self) -> Option<T> {
442        // Load the back and front index.
443        let b = self.inner.back.load(Ordering::Relaxed);
444        let f = self.inner.front.load(Ordering::Relaxed);
445
446        // Calculate the length of the queue.
447        let len = b.wrapping_sub(f);
448
449        // Is the queue empty?
450        if len <= 0 {
451            return None;
452        }
453
454        match self.flavor {
455            // Pop from the front of the queue.
456            Flavor::Fifo => {
457                // Try incrementing the front index to pop the task.
458                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
459                let new_f = f.wrapping_add(1);
460
461                if b.wrapping_sub(new_f) < 0 {
462                    self.inner.front.store(f, Ordering::Relaxed);
463                    return None;
464                }
465
466                unsafe {
467                    // Read the popped task.
468                    let buffer = self.buffer.get();
469                    let task = buffer.read(f).assume_init();
470
471                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
472                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
473                        self.resize(buffer.cap / 2);
474                    }
475
476                    Some(task)
477                }
478            }
479
480            // Pop from the back of the queue.
481            Flavor::Lifo => {
482                // Decrement the back index.
483                let b = b.wrapping_sub(1);
484                self.inner.back.store(b, Ordering::Relaxed);
485
486                atomic::fence(Ordering::SeqCst);
487
488                // Load the front index.
489                let f = self.inner.front.load(Ordering::Relaxed);
490
491                // Compute the length after the back index was decremented.
492                let len = b.wrapping_sub(f);
493
494                if len < 0 {
495                    // The queue is empty. Restore the back index to the original task.
496                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
497                    None
498                } else {
499                    // Read the task to be popped.
500                    let buffer = self.buffer.get();
501                    let mut task = unsafe { Some(buffer.read(b)) };
502
503                    // Are we popping the last task from the queue?
504                    if len == 0 {
505                        // Try incrementing the front index.
506                        if self
507                            .inner
508                            .front
509                            .compare_exchange(
510                                f,
511                                f.wrapping_add(1),
512                                Ordering::SeqCst,
513                                Ordering::Relaxed,
514                            )
515                            .is_err()
516                        {
517                            // Failed. We didn't pop anything. Reset to `None`.
518                            task.take();
519                        }
520
521                        // Restore the back index to the original task.
522                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
523                    } else {
524                        // Shrink the buffer if `len` is less than one fourth of the capacity.
525                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
526                            unsafe {
527                                self.resize(buffer.cap / 2);
528                            }
529                        }
530                    }
531
532                    task.map(|t| unsafe { t.assume_init() })
533                }
534            }
535        }
536    }
537}
538
539impl<T> fmt::Debug for Worker<T> {
540    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541        f.pad("Worker { .. }")
542    }
543}
544
545/// A stealer handle of a worker queue.
546///
547/// Stealers can be shared among threads.
548///
549/// Task schedulers typically have a single worker queue per worker thread.
550///
551/// # Examples
552///
553/// ```
554/// use crossbeam_deque::{Steal, Worker};
555///
556/// let w = Worker::new_lifo();
557/// w.push(1);
558/// w.push(2);
559///
560/// let s = w.stealer();
561/// assert_eq!(s.steal(), Steal::Success(1));
562/// assert_eq!(s.steal(), Steal::Success(2));
563/// assert_eq!(s.steal(), Steal::Empty);
564/// ```
565pub struct Stealer<T> {
566    /// A reference to the inner representation of the queue.
567    inner: Arc<CachePadded<Inner<T>>>,
568
569    /// The flavor of the queue.
570    flavor: Flavor,
571}
572
573unsafe impl<T: Send> Send for Stealer<T> {}
574unsafe impl<T: Send> Sync for Stealer<T> {}
575
576impl<T> Stealer<T> {
577    /// Returns `true` if the queue is empty.
578    ///
579    /// ```
580    /// use crossbeam_deque::Worker;
581    ///
582    /// let w = Worker::new_lifo();
583    /// let s = w.stealer();
584    ///
585    /// assert!(s.is_empty());
586    /// w.push(1);
587    /// assert!(!s.is_empty());
588    /// ```
589    pub fn is_empty(&self) -> bool {
590        let f = self.inner.front.load(Ordering::Acquire);
591        atomic::fence(Ordering::SeqCst);
592        let b = self.inner.back.load(Ordering::Acquire);
593        b.wrapping_sub(f) <= 0
594    }
595
596    /// Returns the number of tasks in the deque.
597    ///
598    /// ```
599    /// use crossbeam_deque::Worker;
600    ///
601    /// let w = Worker::new_lifo();
602    /// let s = w.stealer();
603    ///
604    /// assert_eq!(s.len(), 0);
605    /// w.push(1);
606    /// assert_eq!(s.len(), 1);
607    /// w.push(2);
608    /// assert_eq!(s.len(), 2);
609    /// ```
610    pub fn len(&self) -> usize {
611        let f = self.inner.front.load(Ordering::Acquire);
612        atomic::fence(Ordering::SeqCst);
613        let b = self.inner.back.load(Ordering::Acquire);
614        b.wrapping_sub(f).max(0) as usize
615    }
616
617    /// Steals a task from the queue.
618    ///
619    /// # Examples
620    ///
621    /// ```
622    /// use crossbeam_deque::{Steal, Worker};
623    ///
624    /// let w = Worker::new_lifo();
625    /// w.push(1);
626    /// w.push(2);
627    ///
628    /// let s = w.stealer();
629    /// assert_eq!(s.steal(), Steal::Success(1));
630    /// assert_eq!(s.steal(), Steal::Success(2));
631    /// ```
632    pub fn steal(&self) -> Steal<T> {
633        // Load the front index.
634        let f = self.inner.front.load(Ordering::Acquire);
635
636        // A SeqCst fence is needed here.
637        //
638        // If the current thread is already pinned (reentrantly), we must manually issue the
639        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
640        // have to.
641        if epoch::is_pinned() {
642            atomic::fence(Ordering::SeqCst);
643        }
644
645        let guard = &epoch::pin();
646
647        // Load the back index.
648        let b = self.inner.back.load(Ordering::Acquire);
649
650        // Is the queue empty?
651        if b.wrapping_sub(f) <= 0 {
652            return Steal::Empty;
653        }
654
655        // Load the buffer and read the task at the front.
656        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
657        let task = unsafe { buffer.deref().read(f) };
658
659        // Try incrementing the front index to steal the task.
660        // If the buffer has been swapped or the increment fails, we retry.
661        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
662            || self
663                .inner
664                .front
665                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
666                .is_err()
667        {
668            // We didn't steal this task, forget it.
669            return Steal::Retry;
670        }
671
672        // Return the stolen task.
673        Steal::Success(unsafe { task.assume_init() })
674    }
675
676    /// Steals a batch of tasks and pushes them into another worker.
677    ///
678    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
679    /// steal around half of the tasks in the queue, but also not more than some constant limit.
680    ///
681    /// # Examples
682    ///
683    /// ```
684    /// use crossbeam_deque::Worker;
685    ///
686    /// let w1 = Worker::new_fifo();
687    /// w1.push(1);
688    /// w1.push(2);
689    /// w1.push(3);
690    /// w1.push(4);
691    ///
692    /// let s = w1.stealer();
693    /// let w2 = Worker::new_fifo();
694    ///
695    /// let _ = s.steal_batch(&w2);
696    /// assert_eq!(w2.pop(), Some(1));
697    /// assert_eq!(w2.pop(), Some(2));
698    /// ```
699    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
700        self.steal_batch_with_limit(dest, MAX_BATCH)
701    }
702
703    /// Steals no more than `limit` of tasks and pushes them into another worker.
704    ///
705    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
706    /// steal around half of the tasks in the queue, but also not more than the given limit.
707    ///
708    /// # Examples
709    ///
710    /// ```
711    /// use crossbeam_deque::Worker;
712    ///
713    /// let w1 = Worker::new_fifo();
714    /// w1.push(1);
715    /// w1.push(2);
716    /// w1.push(3);
717    /// w1.push(4);
718    /// w1.push(5);
719    /// w1.push(6);
720    ///
721    /// let s = w1.stealer();
722    /// let w2 = Worker::new_fifo();
723    ///
724    /// let _ = s.steal_batch_with_limit(&w2, 2);
725    /// assert_eq!(w2.pop(), Some(1));
726    /// assert_eq!(w2.pop(), Some(2));
727    /// assert_eq!(w2.pop(), None);
728    ///
729    /// w1.push(7);
730    /// w1.push(8);
731    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
732    /// // half of the elements are currently popped, but the number of popped elements is considered
733    /// // an implementation detail that may be changed in the future.
734    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
735    /// assert_eq!(w2.len(), 3);
736    /// ```
737    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
738        assert!(limit > 0);
739        if Arc::ptr_eq(&self.inner, &dest.inner) {
740            if dest.is_empty() {
741                return Steal::Empty;
742            } else {
743                return Steal::Success(());
744            }
745        }
746
747        // Load the front index.
748        let mut f = self.inner.front.load(Ordering::Acquire);
749
750        // A SeqCst fence is needed here.
751        //
752        // If the current thread is already pinned (reentrantly), we must manually issue the
753        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
754        // have to.
755        if epoch::is_pinned() {
756            atomic::fence(Ordering::SeqCst);
757        }
758
759        let guard = &epoch::pin();
760
761        // Load the back index.
762        let b = self.inner.back.load(Ordering::Acquire);
763
764        // Is the queue empty?
765        let len = b.wrapping_sub(f);
766        if len <= 0 {
767            return Steal::Empty;
768        }
769
770        // Reserve capacity for the stolen batch.
771        let batch_size = cmp::min((len as usize + 1) / 2, limit);
772        dest.reserve(batch_size);
773        let mut batch_size = batch_size as isize;
774
775        // Get the destination buffer and back index.
776        let dest_buffer = dest.buffer.get();
777        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
778
779        // Load the buffer.
780        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
781
782        match self.flavor {
783            // Steal a batch of tasks from the front at once.
784            Flavor::Fifo => {
785                // Copy the batch from the source to the destination buffer.
786                match dest.flavor {
787                    Flavor::Fifo => {
788                        for i in 0..batch_size {
789                            unsafe {
790                                let task = buffer.deref().read(f.wrapping_add(i));
791                                dest_buffer.write(dest_b.wrapping_add(i), task);
792                            }
793                        }
794                    }
795                    Flavor::Lifo => {
796                        for i in 0..batch_size {
797                            unsafe {
798                                let task = buffer.deref().read(f.wrapping_add(i));
799                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
800                            }
801                        }
802                    }
803                }
804
805                // Try incrementing the front index to steal the batch.
806                // If the buffer has been swapped or the increment fails, we retry.
807                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
808                    || self
809                        .inner
810                        .front
811                        .compare_exchange(
812                            f,
813                            f.wrapping_add(batch_size),
814                            Ordering::SeqCst,
815                            Ordering::Relaxed,
816                        )
817                        .is_err()
818                {
819                    return Steal::Retry;
820                }
821
822                dest_b = dest_b.wrapping_add(batch_size);
823            }
824
825            // Steal a batch of tasks from the front one by one.
826            Flavor::Lifo => {
827                // This loop may modify the batch_size, which triggers a clippy lint warning.
828                // Use a new variable to avoid the warning, and to make it clear we aren't
829                // modifying the loop exit condition during iteration.
830                let original_batch_size = batch_size;
831
832                for i in 0..original_batch_size {
833                    // If this is not the first steal, check whether the queue is empty.
834                    if i > 0 {
835                        // We've already got the current front index. Now execute the fence to
836                        // synchronize with other threads.
837                        atomic::fence(Ordering::SeqCst);
838
839                        // Load the back index.
840                        let b = self.inner.back.load(Ordering::Acquire);
841
842                        // Is the queue empty?
843                        if b.wrapping_sub(f) <= 0 {
844                            batch_size = i;
845                            break;
846                        }
847                    }
848
849                    // Read the task at the front.
850                    let task = unsafe { buffer.deref().read(f) };
851
852                    // Try incrementing the front index to steal the task.
853                    // If the buffer has been swapped or the increment fails, we retry.
854                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
855                        || self
856                            .inner
857                            .front
858                            .compare_exchange(
859                                f,
860                                f.wrapping_add(1),
861                                Ordering::SeqCst,
862                                Ordering::Relaxed,
863                            )
864                            .is_err()
865                    {
866                        // We didn't steal this task, forget it and break from the loop.
867                        batch_size = i;
868                        break;
869                    }
870
871                    // Write the stolen task into the destination buffer.
872                    unsafe {
873                        dest_buffer.write(dest_b, task);
874                    }
875
876                    // Move the source front index and the destination back index one step forward.
877                    f = f.wrapping_add(1);
878                    dest_b = dest_b.wrapping_add(1);
879                }
880
881                // If we didn't steal anything, the operation needs to be retried.
882                if batch_size == 0 {
883                    return Steal::Retry;
884                }
885
886                // If stealing into a FIFO queue, stolen tasks need to be reversed.
887                if dest.flavor == Flavor::Fifo {
888                    for i in 0..batch_size / 2 {
889                        unsafe {
890                            let i1 = dest_b.wrapping_sub(batch_size - i);
891                            let i2 = dest_b.wrapping_sub(i + 1);
892                            let t1 = dest_buffer.read(i1);
893                            let t2 = dest_buffer.read(i2);
894                            dest_buffer.write(i1, t2);
895                            dest_buffer.write(i2, t1);
896                        }
897                    }
898                }
899            }
900        }
901
902        atomic::fence(Ordering::Release);
903
904        // Update the back index in the destination queue.
905        //
906        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
907        // races because it doesn't understand fences.
908        dest.inner.back.store(dest_b, Ordering::Release);
909
910        // Return with success.
911        Steal::Success(())
912    }
913
914    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
915    ///
916    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
917    /// steal around half of the tasks in the queue, but also not more than some constant limit.
918    ///
919    /// # Examples
920    ///
921    /// ```
922    /// use crossbeam_deque::{Steal, Worker};
923    ///
924    /// let w1 = Worker::new_fifo();
925    /// w1.push(1);
926    /// w1.push(2);
927    /// w1.push(3);
928    /// w1.push(4);
929    ///
930    /// let s = w1.stealer();
931    /// let w2 = Worker::new_fifo();
932    ///
933    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
934    /// assert_eq!(w2.pop(), Some(2));
935    /// ```
936    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
937        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
938    }
939
940    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
941    /// that worker.
942    ///
943    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
944    /// steal around half of the tasks in the queue, but also not more than the given limit.
945    ///
946    /// # Examples
947    ///
948    /// ```
949    /// use crossbeam_deque::{Steal, Worker};
950    ///
951    /// let w1 = Worker::new_fifo();
952    /// w1.push(1);
953    /// w1.push(2);
954    /// w1.push(3);
955    /// w1.push(4);
956    /// w1.push(5);
957    /// w1.push(6);
958    ///
959    /// let s = w1.stealer();
960    /// let w2 = Worker::new_fifo();
961    ///
962    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
963    /// assert_eq!(w2.pop(), Some(2));
964    /// assert_eq!(w2.pop(), None);
965    ///
966    /// w1.push(7);
967    /// w1.push(8);
968    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
969    /// // half of the elements are currently popped, but the number of popped elements is considered
970    /// // an implementation detail that may be changed in the future.
971    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
972    /// assert_eq!(w2.pop(), Some(4));
973    /// assert_eq!(w2.pop(), Some(5));
974    /// assert_eq!(w2.pop(), None);
975    /// ```
976    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
977        assert!(limit > 0);
978        if Arc::ptr_eq(&self.inner, &dest.inner) {
979            match dest.pop() {
980                None => return Steal::Empty,
981                Some(task) => return Steal::Success(task),
982            }
983        }
984
985        // Load the front index.
986        let mut f = self.inner.front.load(Ordering::Acquire);
987
988        // A SeqCst fence is needed here.
989        //
990        // If the current thread is already pinned (reentrantly), we must manually issue the
991        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
992        // have to.
993        if epoch::is_pinned() {
994            atomic::fence(Ordering::SeqCst);
995        }
996
997        let guard = &epoch::pin();
998
999        // Load the back index.
1000        let b = self.inner.back.load(Ordering::Acquire);
1001
1002        // Is the queue empty?
1003        let len = b.wrapping_sub(f);
1004        if len <= 0 {
1005            return Steal::Empty;
1006        }
1007
1008        // Reserve capacity for the stolen batch.
1009        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1010        dest.reserve(batch_size);
1011        let mut batch_size = batch_size as isize;
1012
1013        // Get the destination buffer and back index.
1014        let dest_buffer = dest.buffer.get();
1015        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1016
1017        // Load the buffer
1018        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1019
1020        // Read the task at the front.
1021        let mut task = unsafe { buffer.deref().read(f) };
1022
1023        match self.flavor {
1024            // Steal a batch of tasks from the front at once.
1025            Flavor::Fifo => {
1026                // Copy the batch from the source to the destination buffer.
1027                match dest.flavor {
1028                    Flavor::Fifo => {
1029                        for i in 0..batch_size {
1030                            unsafe {
1031                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1032                                dest_buffer.write(dest_b.wrapping_add(i), task);
1033                            }
1034                        }
1035                    }
1036                    Flavor::Lifo => {
1037                        for i in 0..batch_size {
1038                            unsafe {
1039                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1040                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1041                            }
1042                        }
1043                    }
1044                }
1045
1046                // Try incrementing the front index to steal the task.
1047                // If the buffer has been swapped or the increment fails, we retry.
1048                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1049                    || self
1050                        .inner
1051                        .front
1052                        .compare_exchange(
1053                            f,
1054                            f.wrapping_add(batch_size + 1),
1055                            Ordering::SeqCst,
1056                            Ordering::Relaxed,
1057                        )
1058                        .is_err()
1059                {
1060                    // We didn't steal this task, forget it.
1061                    return Steal::Retry;
1062                }
1063
1064                dest_b = dest_b.wrapping_add(batch_size);
1065            }
1066
1067            // Steal a batch of tasks from the front one by one.
1068            Flavor::Lifo => {
1069                // Try incrementing the front index to steal the task.
1070                if self
1071                    .inner
1072                    .front
1073                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1074                    .is_err()
1075                {
1076                    // We didn't steal this task, forget it.
1077                    return Steal::Retry;
1078                }
1079
1080                // Move the front index one step forward.
1081                f = f.wrapping_add(1);
1082
1083                // Repeat the same procedure for the batch steals.
1084                //
1085                // This loop may modify the batch_size, which triggers a clippy lint warning.
1086                // Use a new variable to avoid the warning, and to make it clear we aren't
1087                // modifying the loop exit condition during iteration.
1088                let original_batch_size = batch_size;
1089                for i in 0..original_batch_size {
1090                    // We've already got the current front index. Now execute the fence to
1091                    // synchronize with other threads.
1092                    atomic::fence(Ordering::SeqCst);
1093
1094                    // Load the back index.
1095                    let b = self.inner.back.load(Ordering::Acquire);
1096
1097                    // Is the queue empty?
1098                    if b.wrapping_sub(f) <= 0 {
1099                        batch_size = i;
1100                        break;
1101                    }
1102
1103                    // Read the task at the front.
1104                    let tmp = unsafe { buffer.deref().read(f) };
1105
1106                    // Try incrementing the front index to steal the task.
1107                    // If the buffer has been swapped or the increment fails, we retry.
1108                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1109                        || self
1110                            .inner
1111                            .front
1112                            .compare_exchange(
1113                                f,
1114                                f.wrapping_add(1),
1115                                Ordering::SeqCst,
1116                                Ordering::Relaxed,
1117                            )
1118                            .is_err()
1119                    {
1120                        // We didn't steal this task, forget it and break from the loop.
1121                        batch_size = i;
1122                        break;
1123                    }
1124
1125                    // Write the previously stolen task into the destination buffer.
1126                    unsafe {
1127                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1128                    }
1129
1130                    // Move the source front index and the destination back index one step forward.
1131                    f = f.wrapping_add(1);
1132                    dest_b = dest_b.wrapping_add(1);
1133                }
1134
1135                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1136                if dest.flavor == Flavor::Fifo {
1137                    for i in 0..batch_size / 2 {
1138                        unsafe {
1139                            let i1 = dest_b.wrapping_sub(batch_size - i);
1140                            let i2 = dest_b.wrapping_sub(i + 1);
1141                            let t1 = dest_buffer.read(i1);
1142                            let t2 = dest_buffer.read(i2);
1143                            dest_buffer.write(i1, t2);
1144                            dest_buffer.write(i2, t1);
1145                        }
1146                    }
1147                }
1148            }
1149        }
1150
1151        atomic::fence(Ordering::Release);
1152
1153        // Update the back index in the destination queue.
1154        //
1155        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1156        // races because it doesn't understand fences.
1157        dest.inner.back.store(dest_b, Ordering::Release);
1158
1159        // Return with success.
1160        Steal::Success(unsafe { task.assume_init() })
1161    }
1162}
1163
1164impl<T> Clone for Stealer<T> {
1165    fn clone(&self) -> Stealer<T> {
1166        Stealer {
1167            inner: self.inner.clone(),
1168            flavor: self.flavor,
1169        }
1170    }
1171}
1172
1173impl<T> fmt::Debug for Stealer<T> {
1174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175        f.pad("Stealer { .. }")
1176    }
1177}
1178
1179// Bits indicating the state of a slot:
1180// * If a task has been written into the slot, `WRITE` is set.
1181// * If a task has been read from the slot, `READ` is set.
1182// * If the block is being destroyed, `DESTROY` is set.
1183const WRITE: usize = 1;
1184const READ: usize = 2;
1185const DESTROY: usize = 4;
1186
1187// Each block covers one "lap" of indices.
1188const LAP: usize = 64;
1189// The maximum number of values a block can hold.
1190const BLOCK_CAP: usize = LAP - 1;
1191// How many lower bits are reserved for metadata.
1192const SHIFT: usize = 1;
1193// Indicates that the block is not the last one.
1194const HAS_NEXT: usize = 1;
1195
1196/// A slot in a block.
1197struct Slot<T> {
1198    /// The task.
1199    task: UnsafeCell<MaybeUninit<T>>,
1200
1201    /// The state of the slot.
1202    state: AtomicUsize,
1203}
1204
1205impl<T> Slot<T> {
1206    const UNINIT: Self = Self {
1207        task: UnsafeCell::new(MaybeUninit::uninit()),
1208        state: AtomicUsize::new(0),
1209    };
1210
1211    /// Waits until a task is written into the slot.
1212    fn wait_write(&self) {
1213        let backoff = Backoff::new();
1214        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1215            backoff.snooze();
1216        }
1217    }
1218}
1219
1220/// A block in a linked list.
1221///
1222/// Each block in the list can hold up to `BLOCK_CAP` values.
1223struct Block<T> {
1224    /// The next block in the linked list.
1225    next: AtomicPtr<Block<T>>,
1226
1227    /// Slots for values.
1228    slots: [Slot<T>; BLOCK_CAP],
1229}
1230
1231impl<T> Block<T> {
1232    /// Creates an empty block that starts at `start_index`.
1233    fn new() -> Block<T> {
1234        Self {
1235            next: AtomicPtr::new(ptr::null_mut()),
1236            slots: [Slot::UNINIT; BLOCK_CAP],
1237        }
1238    }
1239
1240    /// Waits until the next pointer is set.
1241    fn wait_next(&self) -> *mut Block<T> {
1242        let backoff = Backoff::new();
1243        loop {
1244            let next = self.next.load(Ordering::Acquire);
1245            if !next.is_null() {
1246                return next;
1247            }
1248            backoff.snooze();
1249        }
1250    }
1251
1252    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1253    unsafe fn destroy(this: *mut Block<T>, count: usize) {
1254        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1255        // begun destruction of the block.
1256        for i in (0..count).rev() {
1257            let slot = (*this).slots.get_unchecked(i);
1258
1259            // Mark the `DESTROY` bit if a thread is still using the slot.
1260            if slot.state.load(Ordering::Acquire) & READ == 0
1261                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1262            {
1263                // If a thread is still using the slot, it will continue destruction of the block.
1264                return;
1265            }
1266        }
1267
1268        // No thread is using the block, now it is safe to destroy it.
1269        drop(Box::from_raw(this));
1270    }
1271}
1272
1273/// A position in a queue.
1274struct Position<T> {
1275    /// The index in the queue.
1276    index: AtomicUsize,
1277
1278    /// The block in the linked list.
1279    block: AtomicPtr<Block<T>>,
1280}
1281
1282/// An injector queue.
1283///
1284/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1285/// a single injector queue, which is the entry point for new tasks.
1286///
1287/// # Examples
1288///
1289/// ```
1290/// use crossbeam_deque::{Injector, Steal};
1291///
1292/// let q = Injector::new();
1293/// q.push(1);
1294/// q.push(2);
1295///
1296/// assert_eq!(q.steal(), Steal::Success(1));
1297/// assert_eq!(q.steal(), Steal::Success(2));
1298/// assert_eq!(q.steal(), Steal::Empty);
1299/// ```
1300pub struct Injector<T> {
1301    /// The head of the queue.
1302    head: CachePadded<Position<T>>,
1303
1304    /// The tail of the queue.
1305    tail: CachePadded<Position<T>>,
1306
1307    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1308    _marker: PhantomData<T>,
1309}
1310
1311unsafe impl<T: Send> Send for Injector<T> {}
1312unsafe impl<T: Send> Sync for Injector<T> {}
1313
1314impl<T> Default for Injector<T> {
1315    fn default() -> Self {
1316        let block = Box::into_raw(Box::new(Block::<T>::new()));
1317        Self {
1318            head: CachePadded::new(Position {
1319                block: AtomicPtr::new(block),
1320                index: AtomicUsize::new(0),
1321            }),
1322            tail: CachePadded::new(Position {
1323                block: AtomicPtr::new(block),
1324                index: AtomicUsize::new(0),
1325            }),
1326            _marker: PhantomData,
1327        }
1328    }
1329}
1330
1331impl<T> Injector<T> {
1332    /// Creates a new injector queue.
1333    ///
1334    /// # Examples
1335    ///
1336    /// ```
1337    /// use crossbeam_deque::Injector;
1338    ///
1339    /// let q = Injector::<i32>::new();
1340    /// ```
1341    pub fn new() -> Injector<T> {
1342        Self::default()
1343    }
1344
1345    /// Pushes a task into the queue.
1346    ///
1347    /// # Examples
1348    ///
1349    /// ```
1350    /// use crossbeam_deque::Injector;
1351    ///
1352    /// let w = Injector::new();
1353    /// w.push(1);
1354    /// w.push(2);
1355    /// ```
1356    pub fn push(&self, task: T) {
1357        let backoff = Backoff::new();
1358        let mut tail = self.tail.index.load(Ordering::Acquire);
1359        let mut block = self.tail.block.load(Ordering::Acquire);
1360        let mut next_block = None;
1361
1362        loop {
1363            // Calculate the offset of the index into the block.
1364            let offset = (tail >> SHIFT) % LAP;
1365
1366            // If we reached the end of the block, wait until the next one is installed.
1367            if offset == BLOCK_CAP {
1368                backoff.snooze();
1369                tail = self.tail.index.load(Ordering::Acquire);
1370                block = self.tail.block.load(Ordering::Acquire);
1371                continue;
1372            }
1373
1374            // If we're going to have to install the next block, allocate it in advance in order to
1375            // make the wait for other threads as short as possible.
1376            if offset + 1 == BLOCK_CAP && next_block.is_none() {
1377                next_block = Some(Box::new(Block::<T>::new()));
1378            }
1379
1380            let new_tail = tail + (1 << SHIFT);
1381
1382            // Try advancing the tail forward.
1383            match self.tail.index.compare_exchange_weak(
1384                tail,
1385                new_tail,
1386                Ordering::SeqCst,
1387                Ordering::Acquire,
1388            ) {
1389                Ok(_) => unsafe {
1390                    // If we've reached the end of the block, install the next one.
1391                    if offset + 1 == BLOCK_CAP {
1392                        let next_block = Box::into_raw(next_block.unwrap());
1393                        let next_index = new_tail.wrapping_add(1 << SHIFT);
1394
1395                        self.tail.block.store(next_block, Ordering::Release);
1396                        self.tail.index.store(next_index, Ordering::Release);
1397                        (*block).next.store(next_block, Ordering::Release);
1398                    }
1399
1400                    // Write the task into the slot.
1401                    let slot = (*block).slots.get_unchecked(offset);
1402                    slot.task.get().write(MaybeUninit::new(task));
1403                    slot.state.fetch_or(WRITE, Ordering::Release);
1404
1405                    return;
1406                },
1407                Err(t) => {
1408                    tail = t;
1409                    block = self.tail.block.load(Ordering::Acquire);
1410                    backoff.spin();
1411                }
1412            }
1413        }
1414    }
1415
1416    /// Steals a task from the queue.
1417    ///
1418    /// # Examples
1419    ///
1420    /// ```
1421    /// use crossbeam_deque::{Injector, Steal};
1422    ///
1423    /// let q = Injector::new();
1424    /// q.push(1);
1425    /// q.push(2);
1426    ///
1427    /// assert_eq!(q.steal(), Steal::Success(1));
1428    /// assert_eq!(q.steal(), Steal::Success(2));
1429    /// assert_eq!(q.steal(), Steal::Empty);
1430    /// ```
1431    pub fn steal(&self) -> Steal<T> {
1432        let mut head;
1433        let mut block;
1434        let mut offset;
1435
1436        let backoff = Backoff::new();
1437        loop {
1438            head = self.head.index.load(Ordering::Acquire);
1439            block = self.head.block.load(Ordering::Acquire);
1440
1441            // Calculate the offset of the index into the block.
1442            offset = (head >> SHIFT) % LAP;
1443
1444            // If we reached the end of the block, wait until the next one is installed.
1445            if offset == BLOCK_CAP {
1446                backoff.snooze();
1447            } else {
1448                break;
1449            }
1450        }
1451
1452        let mut new_head = head + (1 << SHIFT);
1453
1454        if new_head & HAS_NEXT == 0 {
1455            atomic::fence(Ordering::SeqCst);
1456            let tail = self.tail.index.load(Ordering::Relaxed);
1457
1458            // If the tail equals the head, that means the queue is empty.
1459            if head >> SHIFT == tail >> SHIFT {
1460                return Steal::Empty;
1461            }
1462
1463            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1464            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1465                new_head |= HAS_NEXT;
1466            }
1467        }
1468
1469        // Try moving the head index forward.
1470        if self
1471            .head
1472            .index
1473            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1474            .is_err()
1475        {
1476            return Steal::Retry;
1477        }
1478
1479        unsafe {
1480            // If we've reached the end of the block, move to the next one.
1481            if offset + 1 == BLOCK_CAP {
1482                let next = (*block).wait_next();
1483                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1484                if !(*next).next.load(Ordering::Relaxed).is_null() {
1485                    next_index |= HAS_NEXT;
1486                }
1487
1488                self.head.block.store(next, Ordering::Release);
1489                self.head.index.store(next_index, Ordering::Release);
1490            }
1491
1492            // Read the task.
1493            let slot = (*block).slots.get_unchecked(offset);
1494            slot.wait_write();
1495            let task = slot.task.get().read().assume_init();
1496
1497            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1498            // but couldn't because we were busy reading from the slot.
1499            if (offset + 1 == BLOCK_CAP)
1500                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1501            {
1502                Block::destroy(block, offset);
1503            }
1504
1505            Steal::Success(task)
1506        }
1507    }
1508
1509    /// Steals a batch of tasks and pushes them into a worker.
1510    ///
1511    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1512    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1513    ///
1514    /// # Examples
1515    ///
1516    /// ```
1517    /// use crossbeam_deque::{Injector, Worker};
1518    ///
1519    /// let q = Injector::new();
1520    /// q.push(1);
1521    /// q.push(2);
1522    /// q.push(3);
1523    /// q.push(4);
1524    ///
1525    /// let w = Worker::new_fifo();
1526    /// let _ = q.steal_batch(&w);
1527    /// assert_eq!(w.pop(), Some(1));
1528    /// assert_eq!(w.pop(), Some(2));
1529    /// ```
1530    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1531        self.steal_batch_with_limit(dest, MAX_BATCH)
1532    }
1533
1534    /// Steals no more than of tasks and pushes them into a worker.
1535    ///
1536    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1537    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1538    ///
1539    /// # Examples
1540    ///
1541    /// ```
1542    /// use crossbeam_deque::{Injector, Worker};
1543    ///
1544    /// let q = Injector::new();
1545    /// q.push(1);
1546    /// q.push(2);
1547    /// q.push(3);
1548    /// q.push(4);
1549    /// q.push(5);
1550    /// q.push(6);
1551    ///
1552    /// let w = Worker::new_fifo();
1553    /// let _ = q.steal_batch_with_limit(&w, 2);
1554    /// assert_eq!(w.pop(), Some(1));
1555    /// assert_eq!(w.pop(), Some(2));
1556    /// assert_eq!(w.pop(), None);
1557    ///
1558    /// q.push(7);
1559    /// q.push(8);
1560    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1561    /// // half of the elements are currently popped, but the number of popped elements is considered
1562    /// // an implementation detail that may be changed in the future.
1563    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1564    /// assert_eq!(w.len(), 3);
1565    /// ```
1566    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1567        assert!(limit > 0);
1568        let mut head;
1569        let mut block;
1570        let mut offset;
1571
1572        let backoff = Backoff::new();
1573        loop {
1574            head = self.head.index.load(Ordering::Acquire);
1575            block = self.head.block.load(Ordering::Acquire);
1576
1577            // Calculate the offset of the index into the block.
1578            offset = (head >> SHIFT) % LAP;
1579
1580            // If we reached the end of the block, wait until the next one is installed.
1581            if offset == BLOCK_CAP {
1582                backoff.snooze();
1583            } else {
1584                break;
1585            }
1586        }
1587
1588        let mut new_head = head;
1589        let advance;
1590
1591        if new_head & HAS_NEXT == 0 {
1592            atomic::fence(Ordering::SeqCst);
1593            let tail = self.tail.index.load(Ordering::Relaxed);
1594
1595            // If the tail equals the head, that means the queue is empty.
1596            if head >> SHIFT == tail >> SHIFT {
1597                return Steal::Empty;
1598            }
1599
1600            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1601            // the right batch size to steal.
1602            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1603                new_head |= HAS_NEXT;
1604                // We can steal all tasks till the end of the block.
1605                advance = (BLOCK_CAP - offset).min(limit);
1606            } else {
1607                let len = (tail - head) >> SHIFT;
1608                // Steal half of the available tasks.
1609                advance = ((len + 1) / 2).min(limit);
1610            }
1611        } else {
1612            // We can steal all tasks till the end of the block.
1613            advance = (BLOCK_CAP - offset).min(limit);
1614        }
1615
1616        new_head += advance << SHIFT;
1617        let new_offset = offset + advance;
1618
1619        // Try moving the head index forward.
1620        if self
1621            .head
1622            .index
1623            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1624            .is_err()
1625        {
1626            return Steal::Retry;
1627        }
1628
1629        // Reserve capacity for the stolen batch.
1630        let batch_size = new_offset - offset;
1631        dest.reserve(batch_size);
1632
1633        // Get the destination buffer and back index.
1634        let dest_buffer = dest.buffer.get();
1635        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1636
1637        unsafe {
1638            // If we've reached the end of the block, move to the next one.
1639            if new_offset == BLOCK_CAP {
1640                let next = (*block).wait_next();
1641                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1642                if !(*next).next.load(Ordering::Relaxed).is_null() {
1643                    next_index |= HAS_NEXT;
1644                }
1645
1646                self.head.block.store(next, Ordering::Release);
1647                self.head.index.store(next_index, Ordering::Release);
1648            }
1649
1650            // Copy values from the injector into the destination queue.
1651            match dest.flavor {
1652                Flavor::Fifo => {
1653                    for i in 0..batch_size {
1654                        // Read the task.
1655                        let slot = (*block).slots.get_unchecked(offset + i);
1656                        slot.wait_write();
1657                        let task = slot.task.get().read();
1658
1659                        // Write it into the destination queue.
1660                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1661                    }
1662                }
1663
1664                Flavor::Lifo => {
1665                    for i in 0..batch_size {
1666                        // Read the task.
1667                        let slot = (*block).slots.get_unchecked(offset + i);
1668                        slot.wait_write();
1669                        let task = slot.task.get().read();
1670
1671                        // Write it into the destination queue.
1672                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1673                    }
1674                }
1675            }
1676
1677            atomic::fence(Ordering::Release);
1678
1679            // Update the back index in the destination queue.
1680            //
1681            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1682            // data races because it doesn't understand fences.
1683            dest.inner
1684                .back
1685                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1686
1687            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1688            // but couldn't because we were busy reading from the slot.
1689            if new_offset == BLOCK_CAP {
1690                Block::destroy(block, offset);
1691            } else {
1692                for i in offset..new_offset {
1693                    let slot = (*block).slots.get_unchecked(i);
1694
1695                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1696                        Block::destroy(block, offset);
1697                        break;
1698                    }
1699                }
1700            }
1701
1702            Steal::Success(())
1703        }
1704    }
1705
1706    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1707    ///
1708    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1709    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1710    ///
1711    /// # Examples
1712    ///
1713    /// ```
1714    /// use crossbeam_deque::{Injector, Steal, Worker};
1715    ///
1716    /// let q = Injector::new();
1717    /// q.push(1);
1718    /// q.push(2);
1719    /// q.push(3);
1720    /// q.push(4);
1721    ///
1722    /// let w = Worker::new_fifo();
1723    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1724    /// assert_eq!(w.pop(), Some(2));
1725    /// ```
1726    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1727        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1728        // better, but we may change it in the future to be compatible with the same method in Stealer.
1729        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1730    }
1731
1732    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1733    ///
1734    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1735    /// steal around half of the tasks in the queue, but also not more than the given limit.
1736    ///
1737    /// # Examples
1738    ///
1739    /// ```
1740    /// use crossbeam_deque::{Injector, Steal, Worker};
1741    ///
1742    /// let q = Injector::new();
1743    /// q.push(1);
1744    /// q.push(2);
1745    /// q.push(3);
1746    /// q.push(4);
1747    /// q.push(5);
1748    /// q.push(6);
1749    ///
1750    /// let w = Worker::new_fifo();
1751    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1752    /// assert_eq!(w.pop(), Some(2));
1753    /// assert_eq!(w.pop(), None);
1754    ///
1755    /// q.push(7);
1756    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1757    /// // half of the elements are currently popped, but the number of popped elements is considered
1758    /// // an implementation detail that may be changed in the future.
1759    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1760    /// assert_eq!(w.pop(), Some(4));
1761    /// assert_eq!(w.pop(), Some(5));
1762    /// assert_eq!(w.pop(), None);
1763    /// ```
1764    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1765        assert!(limit > 0);
1766        let mut head;
1767        let mut block;
1768        let mut offset;
1769
1770        let backoff = Backoff::new();
1771        loop {
1772            head = self.head.index.load(Ordering::Acquire);
1773            block = self.head.block.load(Ordering::Acquire);
1774
1775            // Calculate the offset of the index into the block.
1776            offset = (head >> SHIFT) % LAP;
1777
1778            // If we reached the end of the block, wait until the next one is installed.
1779            if offset == BLOCK_CAP {
1780                backoff.snooze();
1781            } else {
1782                break;
1783            }
1784        }
1785
1786        let mut new_head = head;
1787        let advance;
1788
1789        if new_head & HAS_NEXT == 0 {
1790            atomic::fence(Ordering::SeqCst);
1791            let tail = self.tail.index.load(Ordering::Relaxed);
1792
1793            // If the tail equals the head, that means the queue is empty.
1794            if head >> SHIFT == tail >> SHIFT {
1795                return Steal::Empty;
1796            }
1797
1798            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1799            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1800                new_head |= HAS_NEXT;
1801                // We can steal all tasks till the end of the block.
1802                advance = (BLOCK_CAP - offset).min(limit);
1803            } else {
1804                let len = (tail - head) >> SHIFT;
1805                // Steal half of the available tasks.
1806                advance = ((len + 1) / 2).min(limit);
1807            }
1808        } else {
1809            // We can steal all tasks till the end of the block.
1810            advance = (BLOCK_CAP - offset).min(limit);
1811        }
1812
1813        new_head += advance << SHIFT;
1814        let new_offset = offset + advance;
1815
1816        // Try moving the head index forward.
1817        if self
1818            .head
1819            .index
1820            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1821            .is_err()
1822        {
1823            return Steal::Retry;
1824        }
1825
1826        // Reserve capacity for the stolen batch.
1827        let batch_size = new_offset - offset - 1;
1828        dest.reserve(batch_size);
1829
1830        // Get the destination buffer and back index.
1831        let dest_buffer = dest.buffer.get();
1832        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1833
1834        unsafe {
1835            // If we've reached the end of the block, move to the next one.
1836            if new_offset == BLOCK_CAP {
1837                let next = (*block).wait_next();
1838                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1839                if !(*next).next.load(Ordering::Relaxed).is_null() {
1840                    next_index |= HAS_NEXT;
1841                }
1842
1843                self.head.block.store(next, Ordering::Release);
1844                self.head.index.store(next_index, Ordering::Release);
1845            }
1846
1847            // Read the task.
1848            let slot = (*block).slots.get_unchecked(offset);
1849            slot.wait_write();
1850            let task = slot.task.get().read();
1851
1852            match dest.flavor {
1853                Flavor::Fifo => {
1854                    // Copy values from the injector into the destination queue.
1855                    for i in 0..batch_size {
1856                        // Read the task.
1857                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1858                        slot.wait_write();
1859                        let task = slot.task.get().read();
1860
1861                        // Write it into the destination queue.
1862                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1863                    }
1864                }
1865
1866                Flavor::Lifo => {
1867                    // Copy values from the injector into the destination queue.
1868                    for i in 0..batch_size {
1869                        // Read the task.
1870                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1871                        slot.wait_write();
1872                        let task = slot.task.get().read();
1873
1874                        // Write it into the destination queue.
1875                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1876                    }
1877                }
1878            }
1879
1880            atomic::fence(Ordering::Release);
1881
1882            // Update the back index in the destination queue.
1883            //
1884            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1885            // data races because it doesn't understand fences.
1886            dest.inner
1887                .back
1888                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1889
1890            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1891            // but couldn't because we were busy reading from the slot.
1892            if new_offset == BLOCK_CAP {
1893                Block::destroy(block, offset);
1894            } else {
1895                for i in offset..new_offset {
1896                    let slot = (*block).slots.get_unchecked(i);
1897
1898                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1899                        Block::destroy(block, offset);
1900                        break;
1901                    }
1902                }
1903            }
1904
1905            Steal::Success(task.assume_init())
1906        }
1907    }
1908
1909    /// Returns `true` if the queue is empty.
1910    ///
1911    /// # Examples
1912    ///
1913    /// ```
1914    /// use crossbeam_deque::Injector;
1915    ///
1916    /// let q = Injector::new();
1917    ///
1918    /// assert!(q.is_empty());
1919    /// q.push(1);
1920    /// assert!(!q.is_empty());
1921    /// ```
1922    pub fn is_empty(&self) -> bool {
1923        let head = self.head.index.load(Ordering::SeqCst);
1924        let tail = self.tail.index.load(Ordering::SeqCst);
1925        head >> SHIFT == tail >> SHIFT
1926    }
1927
1928    /// Returns the number of tasks in the queue.
1929    ///
1930    /// # Examples
1931    ///
1932    /// ```
1933    /// use crossbeam_deque::Injector;
1934    ///
1935    /// let q = Injector::new();
1936    ///
1937    /// assert_eq!(q.len(), 0);
1938    /// q.push(1);
1939    /// assert_eq!(q.len(), 1);
1940    /// q.push(1);
1941    /// assert_eq!(q.len(), 2);
1942    /// ```
1943    pub fn len(&self) -> usize {
1944        loop {
1945            // Load the tail index, then load the head index.
1946            let mut tail = self.tail.index.load(Ordering::SeqCst);
1947            let mut head = self.head.index.load(Ordering::SeqCst);
1948
1949            // If the tail index didn't change, we've got consistent indices to work with.
1950            if self.tail.index.load(Ordering::SeqCst) == tail {
1951                // Erase the lower bits.
1952                tail &= !((1 << SHIFT) - 1);
1953                head &= !((1 << SHIFT) - 1);
1954
1955                // Fix up indices if they fall onto block ends.
1956                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1957                    tail = tail.wrapping_add(1 << SHIFT);
1958                }
1959                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1960                    head = head.wrapping_add(1 << SHIFT);
1961                }
1962
1963                // Rotate indices so that head falls into the first block.
1964                let lap = (head >> SHIFT) / LAP;
1965                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1966                head = head.wrapping_sub((lap * LAP) << SHIFT);
1967
1968                // Remove the lower bits.
1969                tail >>= SHIFT;
1970                head >>= SHIFT;
1971
1972                // Return the difference minus the number of blocks between tail and head.
1973                return tail - head - tail / LAP;
1974            }
1975        }
1976    }
1977}
1978
1979impl<T> Drop for Injector<T> {
1980    fn drop(&mut self) {
1981        let mut head = *self.head.index.get_mut();
1982        let mut tail = *self.tail.index.get_mut();
1983        let mut block = *self.head.block.get_mut();
1984
1985        // Erase the lower bits.
1986        head &= !((1 << SHIFT) - 1);
1987        tail &= !((1 << SHIFT) - 1);
1988
1989        unsafe {
1990            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1991            while head != tail {
1992                let offset = (head >> SHIFT) % LAP;
1993
1994                if offset < BLOCK_CAP {
1995                    // Drop the task in the slot.
1996                    let slot = (*block).slots.get_unchecked(offset);
1997                    (*slot.task.get()).assume_init_drop();
1998                } else {
1999                    // Deallocate the block and move to the next one.
2000                    let next = *(*block).next.get_mut();
2001                    drop(Box::from_raw(block));
2002                    block = next;
2003                }
2004
2005                head = head.wrapping_add(1 << SHIFT);
2006            }
2007
2008            // Deallocate the last remaining block.
2009            drop(Box::from_raw(block));
2010        }
2011    }
2012}
2013
2014impl<T> fmt::Debug for Injector<T> {
2015    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2016        f.pad("Worker { .. }")
2017    }
2018}
2019
2020/// Possible outcomes of a steal operation.
2021///
2022/// # Examples
2023///
2024/// There are lots of ways to chain results of steal operations together:
2025///
2026/// ```
2027/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2028///
2029/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2030///
2031/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2032/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2033/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2034///
2035/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2036/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2037/// ```
2038#[must_use]
2039#[derive(PartialEq, Eq, Copy, Clone)]
2040pub enum Steal<T> {
2041    /// The queue was empty at the time of stealing.
2042    Empty,
2043
2044    /// At least one task was successfully stolen.
2045    Success(T),
2046
2047    /// The steal operation needs to be retried.
2048    Retry,
2049}
2050
2051impl<T> Steal<T> {
2052    /// Returns `true` if the queue was empty at the time of stealing.
2053    ///
2054    /// # Examples
2055    ///
2056    /// ```
2057    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2058    ///
2059    /// assert!(!Success(7).is_empty());
2060    /// assert!(!Retry::<i32>.is_empty());
2061    ///
2062    /// assert!(Empty::<i32>.is_empty());
2063    /// ```
2064    pub fn is_empty(&self) -> bool {
2065        match self {
2066            Steal::Empty => true,
2067            _ => false,
2068        }
2069    }
2070
2071    /// Returns `true` if at least one task was stolen.
2072    ///
2073    /// # Examples
2074    ///
2075    /// ```
2076    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2077    ///
2078    /// assert!(!Empty::<i32>.is_success());
2079    /// assert!(!Retry::<i32>.is_success());
2080    ///
2081    /// assert!(Success(7).is_success());
2082    /// ```
2083    pub fn is_success(&self) -> bool {
2084        match self {
2085            Steal::Success(_) => true,
2086            _ => false,
2087        }
2088    }
2089
2090    /// Returns `true` if the steal operation needs to be retried.
2091    ///
2092    /// # Examples
2093    ///
2094    /// ```
2095    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2096    ///
2097    /// assert!(!Empty::<i32>.is_retry());
2098    /// assert!(!Success(7).is_retry());
2099    ///
2100    /// assert!(Retry::<i32>.is_retry());
2101    /// ```
2102    pub fn is_retry(&self) -> bool {
2103        match self {
2104            Steal::Retry => true,
2105            _ => false,
2106        }
2107    }
2108
2109    /// Returns the result of the operation, if successful.
2110    ///
2111    /// # Examples
2112    ///
2113    /// ```
2114    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2115    ///
2116    /// assert_eq!(Empty::<i32>.success(), None);
2117    /// assert_eq!(Retry::<i32>.success(), None);
2118    ///
2119    /// assert_eq!(Success(7).success(), Some(7));
2120    /// ```
2121    pub fn success(self) -> Option<T> {
2122        match self {
2123            Steal::Success(res) => Some(res),
2124            _ => None,
2125        }
2126    }
2127
2128    /// If no task was stolen, attempts another steal operation.
2129    ///
2130    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2131    ///
2132    /// * If the second steal resulted in `Success`, it is returned.
2133    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2134    /// * If both resulted in `None`, then `None` is returned.
2135    ///
2136    /// # Examples
2137    ///
2138    /// ```
2139    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2140    ///
2141    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2142    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2143    ///
2144    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2145    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2146    ///
2147    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2148    /// ```
2149    pub fn or_else<F>(self, f: F) -> Steal<T>
2150    where
2151        F: FnOnce() -> Steal<T>,
2152    {
2153        match self {
2154            Steal::Empty => f(),
2155            Steal::Success(_) => self,
2156            Steal::Retry => {
2157                if let Steal::Success(res) = f() {
2158                    Steal::Success(res)
2159                } else {
2160                    Steal::Retry
2161                }
2162            }
2163        }
2164    }
2165}
2166
2167impl<T> fmt::Debug for Steal<T> {
2168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2169        match self {
2170            Steal::Empty => f.pad("Empty"),
2171            Steal::Success(_) => f.pad("Success(..)"),
2172            Steal::Retry => f.pad("Retry"),
2173        }
2174    }
2175}
2176
2177impl<T> FromIterator<Steal<T>> for Steal<T> {
2178    /// Consumes items until a `Success` is found and returns it.
2179    ///
2180    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2181    /// Otherwise, `Empty` is returned.
2182    fn from_iter<I>(iter: I) -> Steal<T>
2183    where
2184        I: IntoIterator<Item = Steal<T>>,
2185    {
2186        let mut retry = false;
2187        for s in iter {
2188            match &s {
2189                Steal::Empty => {}
2190                Steal::Success(_) => return s,
2191                Steal::Retry => retry = true,
2192            }
2193        }
2194
2195        if retry {
2196            Steal::Retry
2197        } else {
2198            Steal::Empty
2199        }
2200    }
2201}