// crossbeam_channel/flavors/list.rs

1//! Unbounded channel implemented as a linked list.
2
3use std::boxed::Box;
4use std::cell::UnsafeCell;
5use std::marker::PhantomData;
6use std::mem::MaybeUninit;
7use std::ptr;
8use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
9use std::time::Instant;
10
11use crossbeam_utils::{Backoff, CachePadded};
12
13use crate::context::Context;
14use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
15use crate::select::{Operation, SelectHandle, Selected, Token};
16use crate::waker::SyncWaker;
17
18// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
19// following changes by @kleimkuhler:
20//
21// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
22// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
23
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
// NOTE(review): offsets are computed as `(index >> SHIFT) % LAP` and indices wrap
// on `usize` overflow, so `LAP` presumably must stay a power of two for offsets
// to remain consistent across wraparound — confirm before changing.
const LAP: usize = 32;
// The maximum number of messages a block can hold. The final offset of each lap
// (`offset == BLOCK_CAP`) is never used for a message; it signals "end of block,
// wait for / move to the next one".
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of an index are reserved for metadata (`MARK_BIT`).
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
42
/// A slot in a block.
struct Slot<T> {
    /// The message. `MaybeUninit` because a slot starts out empty; whether it
    /// currently holds a valid message is tracked by the bits in `state`.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of the `WRITE`, `READ`, and
    /// `DESTROY` bits.
    state: AtomicUsize,
}
51
impl<T> Slot<T> {
    /// A fresh, empty slot, used to initialize a block's slot array.
    const UNINIT: Self = Self {
        msg: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a message is written into the slot.
    ///
    /// Spins with exponential backoff until the `WRITE` bit appears in `state`.
    /// The `Acquire` load pairs with the `Release` `fetch_or` in
    /// `Channel::write`, making the written message visible to this thread.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
66
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list (null while this is the last block).
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}
77
impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    ///
    /// Spins until another thread installs the successor block; the `Acquire`
    /// load pairs with the `Release` store of `next` in `start_send`.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid heap-allocated block obtained from
    /// `Box::into_raw`. Whichever thread observes that every remaining slot has
    /// been read (or already carried `DESTROY`) frees the block, so the caller
    /// must not touch `this` after calling.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
119
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel, shifted left by `SHIFT`; the low bit may carry
    /// `MARK_BIT` (disconnected when in the tail, "not the last block" when in
    /// the head).
    index: AtomicUsize,

    /// The block in the linked list that this index currently falls into.
    block: AtomicPtr<Block<T>>,
}
129
/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots reserved by `start_send`/`start_recv`; null signals
    /// that the channel is disconnected.
    block: *const u8,

    /// The offset of the reserved slot within the block.
    offset: usize,
}
139
140impl Default for ListToken {
141    #[inline]
142    fn default() -> Self {
143        ListToken {
144            block: ptr::null(),
145            offset: 0,
146        }
147    }
148}
149
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel (the position receivers read from).
    head: CachePadded<Position<T>>,

    /// The tail of the channel (the position senders write to).
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
170
171impl<T> Channel<T> {
172    /// Creates a new unbounded channel.
173    pub(crate) fn new() -> Self {
174        Channel {
175            head: CachePadded::new(Position {
176                block: AtomicPtr::new(ptr::null_mut()),
177                index: AtomicUsize::new(0),
178            }),
179            tail: CachePadded::new(Position {
180                block: AtomicPtr::new(ptr::null_mut()),
181                index: AtomicUsize::new(0),
182            }),
183            receivers: SyncWaker::new(),
184            _marker: PhantomData,
185        }
186    }
187
    /// Returns a receiver handle borrowing this channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }
192
    /// Returns a sender handle borrowing this channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }
197
    /// Attempts to reserve a slot for sending a message.
    ///
    /// Always returns `true`: on success `token.list` names the reserved slot,
    /// and on disconnection `token.list.block` is set to null so the following
    /// `write` call reports the error.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected (`MARK_BIT` set in the tail).
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender won the race to install the first block;
                    // reclaim our allocation as the spare `next_block` and retry.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    // The extra `fetch_add` skips the reserved end-of-block offset.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    // CAS failed; `t` is the current tail. Reload the block and retry.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
280
    /// Writes a message into the channel.
    ///
    /// Returns `Err(msg)` if the channel was disconnected (the token holds a
    /// null block pointer).
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by `start_send` on this channel, so
    /// that `block`/`offset` name a slot reserved for this caller.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        // `Release` publishes the message to the reader spinning in `wait_write`.
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
299
    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns `false` when the channel is empty (and not disconnected).
    /// Returns `true` either with `token.list` naming the reserved slot, or
    /// with a null `token.list.block` when the channel is disconnected and
    /// drained (the following `read` then reports the error).
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                // Order the head load before the tail load so the emptiness
                // check below sees a consistent view.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Skip the reserved end-of-block offset, and carry
                        // `MARK_BIT` forward if the next block already has a successor.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    // CAS failed; `h` is the current head. Reload the block and retry.
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
384
    /// Reads a message from the channel.
    ///
    /// Returns `Err(())` if the channel is disconnected (the token holds a
    /// null block pointer).
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by `start_recv` on this channel, so
    /// that `block`/`offset` name a slot reserved for this caller.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        // The sender may not have finished writing yet; wait for the `WRITE` bit.
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }
409
410    /// Attempts to send a message into the channel.
411    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
412        self.send(msg, None).map_err(|err| match err {
413            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
414            SendTimeoutError::Timeout(_) => unreachable!(),
415        })
416    }
417
    /// Sends a message into the channel.
    ///
    /// The channel is unbounded, so sending never blocks and `_deadline` is
    /// ignored; the only possible failure is disconnection.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // `start_send` always returns `true` for this flavor, so the assertion
        // cannot fire; disconnection is reported by `write` instead.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }
431
432    /// Attempts to receive a message without blocking.
433    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
434        let token = &mut Token::default();
435
436        if self.start_recv(token) {
437            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
438        } else {
439            Err(TryRecvError::Empty)
440        }
441    }
442
    /// Receives a message from the channel, blocking until one arrives, the
    /// channel is disconnected, or the optional `deadline` passes.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times before parking the thread.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
            // Loop around and retry the receive.
        }
    }
494
    /// Returns the current number of messages inside the channel.
    ///
    /// Retries until it observes a pair of head/tail indices with no tail
    /// movement in between, then derives the count from the index difference,
    /// accounting for the one reserved (unused) offset per block.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits (metadata such as `MARK_BIT`).
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
530
    /// Returns the capacity of the channel (`None`, since it is unbounded).
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }
535
536    /// Disconnects senders and wakes up all blocked receivers.
537    ///
538    /// Returns `true` if this call disconnected the channel.
539    pub(crate) fn disconnect_senders(&self) -> bool {
540        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
541
542        if tail & MARK_BIT == 0 {
543            self.receivers.disconnect();
544            true
545        } else {
546            false
547        }
548    }
549
550    /// Disconnects receivers.
551    ///
552    /// Returns `true` if this call disconnected the channel.
553    pub(crate) fn disconnect_receivers(&self) -> bool {
554        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
555
556        if tail & MARK_BIT == 0 {
557            // If receivers are dropped first, discard all messages to free
558            // memory eagerly.
559            self.discard_all_messages();
560            true
561        } else {
562            false
563        }
564    }
565
    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
            // at boundary. We need to wait for the updates take affect otherwise there
            // can be memory leaks.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts
        // to initialize the first block before noticing that the receivers disconnected. Late allocations
        // will be deallocated by the sender in Drop
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages we need to synchronize with initialization
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot, waiting for the sender to
                    // finish writing it first.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // End-of-block offset: wait for the successor to be installed,
                    // then deallocate this block and move to the next one.
                    (*block).wait_next();
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        // Advance the head to the tail's position, clearing `MARK_BIT`.
        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }
632
    /// Returns `true` if the channel is disconnected (i.e. `MARK_BIT` is set in the tail).
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
637
    /// Returns `true` if the channel is empty.
    ///
    /// Shifting out the low `SHIFT` bits discards `MARK_BIT`, so only the
    /// sequence numbers are compared.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }
644
    /// Returns `true` if the channel is full (never, since it is unbounded).
    pub(crate) fn is_full(&self) -> bool {
        false
    }
649}
650
impl<T> Drop for Channel<T> {
    /// Drops all remaining messages and deallocates all blocks.
    ///
    /// `&mut self` guarantees exclusive access, so the non-atomic `get_mut`
    /// accessors are used instead of atomic loads.
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits (metadata such as `MARK_BIT`).
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // End-of-block offset: deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
687
/// Receiver handle to a channel; borrows the channel, so it cannot outlive it.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
690
/// Sender handle to a channel; borrows the channel, so it cannot outlive it.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
693
impl<T> SelectHandle for Receiver<'_, T> {
    /// Attempts to reserve a message for receiving.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    /// Receive operations impose no deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Registers this operation with the channel's waker, then reports
    /// readiness so a just-arrived message is not missed.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    /// Accepting is the same as trying to select: reserve a message.
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// A receive is ready when there is a message or the channel is disconnected.
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
729
impl<T> SelectHandle for Sender<'_, T> {
    /// Attempts to reserve a slot for sending.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    /// Send operations impose no deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    // Sends into an unbounded channel are always ready, so there is no waker
    // to register with; these hooks only report readiness.
    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// Sending never blocks on an unbounded channel.
    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}