//! Unbounded channel implemented as a linked list.

use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
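
// Illustrative sketch (added commentary, not original code): with `SHIFT == 1` and
// `LAP == 32` as defined above, a head/tail index decodes like this:
//
//     let index: usize = 72;
//     let mark = index & MARK_BIT; // 0: the metadata bit (mark/disconnect flag)
//     let seq = index >> SHIFT;    // 36: the message sequence number
//     let offset = seq % LAP;      // 4: slot offset within the current block
//
// An offset equal to `BLOCK_CAP` (31) never addresses a slot; it marks a block
// boundary where threads wait until the next block is installed.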
/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        msg: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
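
// A sketch of the destruction handoff encoded in `Block::destroy` above (added
// commentary, not original code). For each slot, reader and destroyer race:
//
//     // reader, in `Channel::read`:
//     slot.state.fetch_or(READ, Ordering::AcqRel)    // may observe DESTROY
//     // destroyer, in `Block::destroy`:
//     slot.state.fetch_or(DESTROY, Ordering::AcqRel) // may observe READ
//
// Whichever thread's `fetch_or` observes the other's bit already set is the second
// to arrive and takes over: a destroyer seeing `READ` skips to the next slot, while
// a reader seeing `DESTROY` calls `Block::destroy` itself for the remaining slots.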
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }
    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
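
    // Illustrative sketch (added commentary, not original code): sending is a
    // two-phase operation. `start_send` reserves a slot and records it in the
    // token; `write` then publishes the message through that token, e.g.:
    //
    //     let token = &mut Token::default();
    //     assert!(channel.start_send(token));        // never blocks: unbounded
    //     unsafe { channel.write(token, msg) }.ok(); // Err(msg) if disconnected
    //
    // The split exists so that the select machinery can reserve a slot first and
    // commit the message later.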
    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }
    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }
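
    // Illustrative usage sketch of the non-blocking pair above (added commentary,
    // not original code):
    //
    //     let chan = Channel::<i32>::new();
    //     chan.try_send(7).unwrap(); // unbounded: `try_send` can never return Full
    //     assert_eq!(chan.try_recv(), Ok(7));
    //     assert_eq!(chan.try_recv(), Err(TryRecvError::Empty));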
    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }
    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
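
    // Worked example for the arithmetic above (added commentary, not original code):
    // after 35 sends on a fresh channel, the tail has advanced 35 times plus one
    // extra bump for crossing the first block boundary, so `tail >> SHIFT == 36`
    // while `head >> SHIFT == 0`. Neither index falls on a block end, the head's
    // lap is 0, and the result is:
    //
    //     36 - 0 - 36 / LAP  ==  36 - 0 - 1  ==  35
    //
    // i.e. the index difference minus one phantom position per completed block.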
    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }
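
    // Note (added commentary, not original code): both disconnect paths above set
    // the same `MARK_BIT` in the tail index, so whichever side disconnects first
    // wins the `fetch_or` and performs the cleanup (waking receivers or discarding
    // messages); the later call observes the bit already set and returns `false`.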
    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted unless they happen
            // at a block boundary. We need to wait for those updates to take effect, otherwise
            // there can be memory leaks.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's
        // attempt to initialize the first block before noticing that the receivers disconnected.
        // Late allocations will be deallocated by the sender in `Drop`.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages, we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }
    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}
impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}