1//! Bounded channel based on a preallocated array.
2//!
3//! This flavor has a fixed, positive capacity.
4//!
5//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6//!
7//! Source:
8//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
1011use std::boxed::Box;
12use std::cell::UnsafeCell;
13use std::mem::{self, MaybeUninit};
14use std::ptr;
15use std::sync::atomic::{self, AtomicUsize, Ordering};
16use std::time::Instant;
1718use crossbeam_utils::{Backoff, CachePadded};
1920use crate::context::Context;
21use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22use crate::select::{Operation, SelectHandle, Selected, Token};
23use crate::waker::SyncWaker;
/// A slot in a channel.
///
/// Pairs a message cell with a stamp. Senders and receivers compare the stamp against the
/// channel's tail or head to decide whether the cell may be written or read.
struct Slot<T> {
    /// The current stamp.
    ///
    /// Initialized to the slot's index by `Channel::with_capacity`, and overwritten by
    /// `Channel::write` and `Channel::read` after each access.
    stamp: AtomicUsize,

    /// The message in this slot.
    ///
    /// Starts out uninitialized; it is filled by `Channel::write` and moved out by
    /// `Channel::read`.
    msg: UnsafeCell<MaybeUninit<T>>,
}
/// The token type for the array flavor.
///
/// Carries the outcome of `start_send` / `start_recv` over to the follow-up call to
/// `write` / `read`.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Slot to read from or write to.
    ///
    /// This is a type-erased pointer to a `Slot<T>`. A null pointer means that no slot was
    /// reserved because the channel is disconnected.
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}
4344impl Default for ArrayToken {
45#[inline]
46fn default() -> Self {
47 ArrayToken {
48 slot: ptr::null(),
49 stamp: 0,
50 }
51 }
52}
/// Bounded channel based on a preallocated array.
///
/// The head and tail are "stamps": a slot index in the low bits, a single mark bit above the
/// index, and a lap counter in the remaining high bits.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Allocated once by `with_capacity` with exactly `cap` slots.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    ///
    /// Adding this to a stamp advances it by one lap without touching the index or mark bits.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    ///
    /// `mark_bit - 1` doubles as the mask that extracts the index from a stamp.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}
9293impl<T> Channel<T> {
94/// Creates a bounded channel of capacity `cap`.
95pub(crate) fn with_capacity(cap: usize) -> Self {
96assert!(cap > 0, "capacity must be positive");
9798// Compute constants `mark_bit` and `one_lap`.
99let mark_bit = (cap + 1).next_power_of_two();
100let one_lap = mark_bit * 2;
101102// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
103let head = 0;
104// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
105let tail = 0;
106107// Allocate a buffer of `cap` slots initialized
108 // with stamps.
109let buffer: Box<[Slot<T>]> = (0..cap)
110 .map(|i| {
111// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
112Slot {
113 stamp: AtomicUsize::new(i),
114 msg: UnsafeCell::new(MaybeUninit::uninit()),
115 }
116 })
117 .collect();
118119 Channel {
120 buffer,
121 cap,
122 one_lap,
123 mark_bit,
124 head: CachePadded::new(AtomicUsize::new(head)),
125 tail: CachePadded::new(AtomicUsize::new(tail)),
126 senders: SyncWaker::new(),
127 receivers: SyncWaker::new(),
128 }
129 }
130131/// Returns a receiver handle to the channel.
132pub(crate) fn receiver(&self) -> Receiver<'_, T> {
133 Receiver(self)
134 }
135136/// Returns a sender handle to the channel.
137pub(crate) fn sender(&self) -> Sender<'_, T> {
138 Sender(self)
139 }
/// Attempts to reserve a slot for sending a message.
///
/// Returns `true` if the follow-up call to `write` may proceed: either a slot was reserved (its
/// pointer and the stamp to publish are stored in `token.array`), or the channel is
/// disconnected (`token.array.slot` is set to null). Returns `false` if the channel is full.
fn start_send(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut tail = self.tail.load(Ordering::Relaxed);

    loop {
        // Check if the channel is disconnected.
        if tail & self.mark_bit != 0 {
            // A null slot makes `write` hand the message back as an error.
            token.array.slot = ptr::null();
            token.array.stamp = 0;
            return true;
        }

        // Deconstruct the tail.
        let index = tail & (self.mark_bit - 1);
        let lap = tail & !(self.one_lap - 1);

        // Inspect the corresponding slot.
        debug_assert!(index < self.buffer.len());
        // SAFETY: the tail's index bits start at zero and are only ever advanced to
        // `index + 1 < cap` or reset to zero (see `new_tail` below), so `index` is in bounds.
        let slot = unsafe { self.buffer.get_unchecked(index) };
        let stamp = slot.stamp.load(Ordering::Acquire);

        // If the tail and the stamp match, we may attempt to push.
        if tail == stamp {
            let new_tail = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                tail + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Try moving the tail.
            match self.tail.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Prepare the token for the follow-up call to `write`.
                    // The stamp `tail + 1` is what `start_recv` looks for (`head + 1`) to
                    // recognize that the slot holds a message.
                    token.array.slot = slot as *const Slot<T> as *const u8;
                    token.array.stamp = tail + 1;
                    return true;
                }
                Err(t) => {
                    // Another sender moved the tail (or the weak CAS failed spuriously);
                    // retry with the value that was observed.
                    tail = t;
                    backoff.spin();
                }
            }
        } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
            // The slot still carries the stamp written by a send one lap ago, i.e. its
            // message has not been read yet. Check whether the whole channel is full.
            atomic::fence(Ordering::SeqCst);
            let head = self.head.load(Ordering::Relaxed);

            // If the head lags one lap behind the tail as well...
            if head.wrapping_add(self.one_lap) == tail {
                // ...then the channel is full.
                return false;
            }

            // Otherwise the tail we loaded is out of date; reload it and retry.
            backoff.spin();
            tail = self.tail.load(Ordering::Relaxed);
        } else {
            // Snooze because we need to wait for the stamp to get updated.
            backoff.snooze();
            tail = self.tail.load(Ordering::Relaxed);
        }
    }
}
212213/// Writes a message into the channel.
214pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
215// If there is no slot, the channel is disconnected.
216if token.array.slot.is_null() {
217return Err(msg);
218 }
219220let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
221222// Write the message into the slot and update the stamp.
223slot.msg.get().write(MaybeUninit::new(msg));
224 slot.stamp.store(token.array.stamp, Ordering::Release);
225226// Wake a sleeping receiver.
227self.receivers.notify();
228Ok(())
229 }
/// Attempts to reserve a slot for receiving a message.
///
/// Returns `true` if the follow-up call to `read` may proceed: either a slot holding a message
/// was reserved (its pointer and the stamp to publish are stored in `token.array`), or the
/// channel is empty and disconnected (`token.array.slot` is set to null). Returns `false` if
/// the channel is empty but still connected.
fn start_recv(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut head = self.head.load(Ordering::Relaxed);

    loop {
        // Deconstruct the head.
        let index = head & (self.mark_bit - 1);
        let lap = head & !(self.one_lap - 1);

        // Inspect the corresponding slot.
        debug_assert!(index < self.buffer.len());
        // SAFETY: the head's index bits start at zero and are only ever advanced to
        // `index + 1 < cap` or reset to zero (see `new` below), so `index` is in bounds.
        let slot = unsafe { self.buffer.get_unchecked(index) };
        let stamp = slot.stamp.load(Ordering::Acquire);

        // If the stamp is ahead of the head by 1, we may attempt to pop.
        if head + 1 == stamp {
            let new = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                head + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Try moving the head.
            match self.head.compare_exchange_weak(
                head,
                new,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Prepare the token for the follow-up call to `read`.
                    // The stamp `head + one_lap` equals the tail value that will reach this
                    // slot on the next lap, which marks it as writable again.
                    token.array.slot = slot as *const Slot<T> as *const u8;
                    token.array.stamp = head.wrapping_add(self.one_lap);
                    return true;
                }
                Err(h) => {
                    // Another receiver moved the head (or the weak CAS failed spuriously);
                    // retry with the value that was observed.
                    head = h;
                    backoff.spin();
                }
            }
        } else if stamp == head {
            // The slot has not been written during this lap. Check whether the whole channel
            // is empty.
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.load(Ordering::Relaxed);

            // If the tail equals the head, that means the channel is empty.
            if (tail & !self.mark_bit) == head {
                // If the channel is disconnected...
                if tail & self.mark_bit != 0 {
                    // ...then receive an error.
                    token.array.slot = ptr::null();
                    token.array.stamp = 0;
                    return true;
                } else {
                    // Otherwise, the receive operation is not ready.
                    return false;
                }
            }

            // Otherwise the head we loaded is out of date; reload it and retry.
            backoff.spin();
            head = self.head.load(Ordering::Relaxed);
        } else {
            // Snooze because we need to wait for the stamp to get updated.
            backoff.snooze();
            head = self.head.load(Ordering::Relaxed);
        }
    }
}
303304/// Reads a message from the channel.
305pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
306if token.array.slot.is_null() {
307// The channel is disconnected.
308return Err(());
309 }
310311let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
312313// Read the message from the slot and update the stamp.
314let msg = slot.msg.get().read().assume_init();
315 slot.stamp.store(token.array.stamp, Ordering::Release);
316317// Wake a sleeping sender.
318self.senders.notify();
319Ok(msg)
320 }
321322/// Attempts to send a message into the channel.
323pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
324let token = &mut Token::default();
325if self.start_send(token) {
326unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
327 } else {
328Err(TrySendError::Full(msg))
329 }
330 }
331332/// Sends a message into the channel.
333pub(crate) fn send(
334&self,
335 msg: T,
336 deadline: Option<Instant>,
337 ) -> Result<(), SendTimeoutError<T>> {
338let token = &mut Token::default();
339loop {
340// Try sending a message several times.
341let backoff = Backoff::new();
342loop {
343if self.start_send(token) {
344let res = unsafe { self.write(token, msg) };
345return res.map_err(SendTimeoutError::Disconnected);
346 }
347348if backoff.is_completed() {
349break;
350 } else {
351 backoff.snooze();
352 }
353 }
354355if let Some(d) = deadline {
356if Instant::now() >= d {
357return Err(SendTimeoutError::Timeout(msg));
358 }
359 }
360361 Context::with(|cx| {
362// Prepare for blocking until a receiver wakes us up.
363let oper = Operation::hook(token);
364self.senders.register(oper, cx);
365366// Has the channel become ready just now?
367if !self.is_full() || self.is_disconnected() {
368let _ = cx.try_select(Selected::Aborted);
369 }
370371// Block the current thread.
372let sel = cx.wait_until(deadline);
373374match sel {
375 Selected::Waiting => unreachable!(),
376 Selected::Aborted | Selected::Disconnected => {
377self.senders.unregister(oper).unwrap();
378 }
379 Selected::Operation(_) => {}
380 }
381 });
382 }
383 }
384385/// Attempts to receive a message without blocking.
386pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
387let token = &mut Token::default();
388389if self.start_recv(token) {
390unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
391 } else {
392Err(TryRecvError::Empty)
393 }
394 }
395396/// Receives a message from the channel.
397pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
398let token = &mut Token::default();
399loop {
400// Try receiving a message several times.
401let backoff = Backoff::new();
402loop {
403if self.start_recv(token) {
404let res = unsafe { self.read(token) };
405return res.map_err(|_| RecvTimeoutError::Disconnected);
406 }
407408if backoff.is_completed() {
409break;
410 } else {
411 backoff.snooze();
412 }
413 }
414415if let Some(d) = deadline {
416if Instant::now() >= d {
417return Err(RecvTimeoutError::Timeout);
418 }
419 }
420421 Context::with(|cx| {
422// Prepare for blocking until a sender wakes us up.
423let oper = Operation::hook(token);
424self.receivers.register(oper, cx);
425426// Has the channel become ready just now?
427if !self.is_empty() || self.is_disconnected() {
428let _ = cx.try_select(Selected::Aborted);
429 }
430431// Block the current thread.
432let sel = cx.wait_until(deadline);
433434match sel {
435 Selected::Waiting => unreachable!(),
436 Selected::Aborted | Selected::Disconnected => {
437self.receivers.unregister(oper).unwrap();
438// If the channel was disconnected, we still have to check for remaining
439 // messages.
440}
441 Selected::Operation(_) => {}
442 }
443 });
444 }
445 }
446447/// Returns the current number of messages inside the channel.
448pub(crate) fn len(&self) -> usize {
449loop {
450// Load the tail, then load the head.
451let tail = self.tail.load(Ordering::SeqCst);
452let head = self.head.load(Ordering::SeqCst);
453454// If the tail didn't change, we've got consistent values to work with.
455if self.tail.load(Ordering::SeqCst) == tail {
456let hix = head & (self.mark_bit - 1);
457let tix = tail & (self.mark_bit - 1);
458459return if hix < tix {
460 tix - hix
461 } else if hix > tix {
462self.cap - hix + tix
463 } else if (tail & !self.mark_bit) == head {
4640
465} else {
466self.cap
467 };
468 }
469 }
470 }
471472/// Returns the capacity of the channel.
473pub(crate) fn capacity(&self) -> Option<usize> {
474Some(self.cap)
475 }
476477/// Disconnects the channel and wakes up all blocked senders and receivers.
478 ///
479 /// Returns `true` if this call disconnected the channel.
480pub(crate) fn disconnect(&self) -> bool {
481let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
482483if tail & self.mark_bit == 0 {
484self.senders.disconnect();
485self.receivers.disconnect();
486true
487} else {
488false
489}
490 }
491492/// Returns `true` if the channel is disconnected.
493pub(crate) fn is_disconnected(&self) -> bool {
494self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
495}
496497/// Returns `true` if the channel is empty.
498pub(crate) fn is_empty(&self) -> bool {
499let head = self.head.load(Ordering::SeqCst);
500let tail = self.tail.load(Ordering::SeqCst);
501502// Is the tail equal to the head?
503 //
504 // Note: If the head changes just before we load the tail, that means there was a moment
505 // when the channel was not empty, so it is safe to just return `false`.
506(tail & !self.mark_bit) == head
507 }
508509/// Returns `true` if the channel is full.
510pub(crate) fn is_full(&self) -> bool {
511let tail = self.tail.load(Ordering::SeqCst);
512let head = self.head.load(Ordering::SeqCst);
513514// Is the head lagging one lap behind tail?
515 //
516 // Note: If the tail changes just before we load the head, that means there was a moment
517 // when the channel was not full, so it is safe to just return `false`.
518head.wrapping_add(self.one_lap) == tail & !self.mark_bit
519 }
520}
521522impl<T> Drop for Channel<T> {
523fn drop(&mut self) {
524if mem::needs_drop::<T>() {
525// Get the index of the head.
526let head = *self.head.get_mut();
527let tail = *self.tail.get_mut();
528529let hix = head & (self.mark_bit - 1);
530let tix = tail & (self.mark_bit - 1);
531532let len = if hix < tix {
533 tix - hix
534 } else if hix > tix {
535self.cap - hix + tix
536 } else if (tail & !self.mark_bit) == head {
5370
538} else {
539self.cap
540 };
541542// Loop over all slots that hold a message and drop them.
543for i in 0..len {
544// Compute the index of the next slot holding a message.
545let index = if hix + i < self.cap {
546 hix + i
547 } else {
548 hix + i - self.cap
549 };
550551unsafe {
552debug_assert!(index < self.buffer.len());
553let slot = self.buffer.get_unchecked_mut(index);
554 (*slot.msg.get()).assume_init_drop();
555 }
556 }
557 }
558 }
559}
/// Receiver handle to a channel.
///
/// Borrows the channel; its `SelectHandle` implementation forwards to the receiving side.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
///
/// Borrows the channel; its `SelectHandle` implementation forwards to the sending side.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
566567impl<T> SelectHandle for Receiver<'_, T> {
568fn try_select(&self, token: &mut Token) -> bool {
569self.0.start_recv(token)
570 }
571572fn deadline(&self) -> Option<Instant> {
573None
574}
575576fn register(&self, oper: Operation, cx: &Context) -> bool {
577self.0.receivers.register(oper, cx);
578self.is_ready()
579 }
580581fn unregister(&self, oper: Operation) {
582self.0.receivers.unregister(oper);
583 }
584585fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
586self.try_select(token)
587 }
588589fn is_ready(&self) -> bool {
590 !self.0.is_empty() || self.0.is_disconnected()
591 }
592593fn watch(&self, oper: Operation, cx: &Context) -> bool {
594self.0.receivers.watch(oper, cx);
595self.is_ready()
596 }
597598fn unwatch(&self, oper: Operation) {
599self.0.receivers.unwatch(oper);
600 }
601}
602603impl<T> SelectHandle for Sender<'_, T> {
604fn try_select(&self, token: &mut Token) -> bool {
605self.0.start_send(token)
606 }
607608fn deadline(&self) -> Option<Instant> {
609None
610}
611612fn register(&self, oper: Operation, cx: &Context) -> bool {
613self.0.senders.register(oper, cx);
614self.is_ready()
615 }
616617fn unregister(&self, oper: Operation) {
618self.0.senders.unregister(oper);
619 }
620621fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
622self.try_select(token)
623 }
624625fn is_ready(&self) -> bool {
626 !self.0.is_full() || self.0.is_disconnected()
627 }
628629fn watch(&self, oper: Operation, cx: &Context) -> bool {
630self.0.senders.watch(oper, cx);
631self.is_ready()
632 }
633634fn unwatch(&self, oper: Operation) {
635self.0.senders.unwatch(oper);
636 }
637}