crossbeam_channel/select.rs
1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7use std::vec::Vec;
8
9use crossbeam_utils::Backoff;
10
11use crate::channel::{self, Receiver, Sender};
12use crate::context::Context;
13use crate::err::{ReadyTimeoutError, TryReadyError};
14use crate::err::{RecvError, SendError};
15use crate::err::{SelectTimeoutError, TrySelectError};
16use crate::flavors;
17use crate::utils;
18
19/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
20/// `read` or `write`.
21///
22/// Each field contains data associated with a specific channel flavor.
23// This is a private API that is used by the select macro.
24#[derive(Debug, Default)]
25pub struct Token {
26 pub(crate) at: flavors::at::AtToken,
27 pub(crate) array: flavors::array::ArrayToken,
28 pub(crate) list: flavors::list::ListToken,
29 #[allow(dead_code)]
30 pub(crate) never: flavors::never::NeverToken,
31 pub(crate) tick: flavors::tick::TickToken,
32 pub(crate) zero: flavors::zero::ZeroToken,
33}
34
35/// Identifier associated with an operation by a specific thread on a specific channel.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct Operation(usize);
38
39impl Operation {
40 /// Creates an operation identifier from a mutable reference.
41 ///
42 /// This function essentially just turns the address of the reference into a number. The
43 /// reference should point to a variable that is specific to the thread and the operation,
44 /// and is alive for the entire duration of select or blocking operation.
45 #[inline]
46 pub fn hook<T>(r: &mut T) -> Operation {
47 let val = r as *mut T as usize;
48 // Make sure that the pointer address doesn't equal the numerical representation of
49 // `Selected::{Waiting, Aborted, Disconnected}`.
50 assert!(val > 2);
51 Operation(val)
52 }
53}
54
55/// Current state of a select or a blocking operation.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum Selected {
58 /// Still waiting for an operation.
59 Waiting,
60
61 /// The attempt to block the current thread has been aborted.
62 Aborted,
63
64 /// An operation became ready because a channel is disconnected.
65 Disconnected,
66
67 /// An operation became ready because a message can be sent or received.
68 Operation(Operation),
69}
70
71impl From<usize> for Selected {
72 #[inline]
73 fn from(val: usize) -> Selected {
74 match val {
75 0 => Selected::Waiting,
76 1 => Selected::Aborted,
77 2 => Selected::Disconnected,
78 oper => Selected::Operation(Operation(oper)),
79 }
80 }
81}
82
83impl Into<usize> for Selected {
84 #[inline]
85 fn into(self) -> usize {
86 match self {
87 Selected::Waiting => 0,
88 Selected::Aborted => 1,
89 Selected::Disconnected => 2,
90 Selected::Operation(Operation(val)) => val,
91 }
92 }
93}
94
95/// A receiver or a sender that can participate in select.
96///
97/// This is a handle that assists select in executing an operation, registration, deciding on the
98/// appropriate deadline for blocking, etc.
99// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
100pub trait SelectHandle {
101 /// Attempts to select an operation and returns `true` on success.
102 fn try_select(&self, token: &mut Token) -> bool;
103
104 /// Returns a deadline for an operation, if there is one.
105 fn deadline(&self) -> Option<Instant>;
106
107 /// Registers an operation for execution and returns `true` if it is now ready.
108 fn register(&self, oper: Operation, cx: &Context) -> bool;
109
110 /// Unregisters an operation for execution.
111 fn unregister(&self, oper: Operation);
112
113 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
114 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
115
116 /// Returns `true` if an operation can be executed without blocking.
117 fn is_ready(&self) -> bool;
118
119 /// Registers an operation for readiness notification and returns `true` if it is now ready.
120 fn watch(&self, oper: Operation, cx: &Context) -> bool;
121
122 /// Unregisters an operation for readiness notification.
123 fn unwatch(&self, oper: Operation);
124}
125
126impl<T: SelectHandle> SelectHandle for &T {
127 fn try_select(&self, token: &mut Token) -> bool {
128 (**self).try_select(token)
129 }
130
131 fn deadline(&self) -> Option<Instant> {
132 (**self).deadline()
133 }
134
135 fn register(&self, oper: Operation, cx: &Context) -> bool {
136 (**self).register(oper, cx)
137 }
138
139 fn unregister(&self, oper: Operation) {
140 (**self).unregister(oper);
141 }
142
143 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144 (**self).accept(token, cx)
145 }
146
147 fn is_ready(&self) -> bool {
148 (**self).is_ready()
149 }
150
151 fn watch(&self, oper: Operation, cx: &Context) -> bool {
152 (**self).watch(oper, cx)
153 }
154
155 fn unwatch(&self, oper: Operation) {
156 (**self).unwatch(oper)
157 }
158}
159
160/// Determines when a select operation should time out.
161#[derive(Clone, Copy, Eq, PartialEq)]
162enum Timeout {
163 /// No blocking.
164 Now,
165
166 /// Block forever.
167 Never,
168
169 /// Time out after the time instant.
170 At(Instant),
171}
172
173/// Runs until one of the operations is selected, potentially blocking the current thread.
174///
175/// Successful receive operations will have to be followed up by `channel::read()` and successful
176/// send operations by `channel::write()`.
177fn run_select(
178 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
179 timeout: Timeout,
180 is_biased: bool,
181) -> Option<(Token, usize, *const u8)> {
182 if handles.is_empty() {
183 // Wait until the timeout and return.
184 match timeout {
185 Timeout::Now => return None,
186 Timeout::Never => {
187 utils::sleep_until(None);
188 unreachable!();
189 }
190 Timeout::At(when) => {
191 utils::sleep_until(Some(when));
192 return None;
193 }
194 }
195 }
196
197 if !is_biased {
198 // Shuffle the operations for fairness.
199 utils::shuffle(handles);
200 }
201
202 // Create a token, which serves as a temporary variable that gets initialized in this function
203 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
204 // selected operation.
205 let mut token = Token::default();
206
207 // Try selecting one of the operations without blocking.
208 for &(handle, i, ptr) in handles.iter() {
209 if handle.try_select(&mut token) {
210 return Some((token, i, ptr));
211 }
212 }
213
214 loop {
215 // Prepare for blocking.
216 let res = Context::with(|cx| {
217 let mut sel = Selected::Waiting;
218 let mut registered_count = 0;
219 let mut index_ready = None;
220
221 if let Timeout::Now = timeout {
222 cx.try_select(Selected::Aborted).unwrap();
223 }
224
225 // Register all operations.
226 for (handle, i, _) in handles.iter_mut() {
227 registered_count += 1;
228
229 // If registration returns `false`, that means the operation has just become ready.
230 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
231 // Try aborting select.
232 sel = match cx.try_select(Selected::Aborted) {
233 Ok(()) => {
234 index_ready = Some(*i);
235 Selected::Aborted
236 }
237 Err(s) => s,
238 };
239 break;
240 }
241
242 // If another thread has already selected one of the operations, stop registration.
243 sel = cx.selected();
244 if sel != Selected::Waiting {
245 break;
246 }
247 }
248
249 if sel == Selected::Waiting {
250 // Check with each operation for how long we're allowed to block, and compute the
251 // earliest deadline.
252 let mut deadline: Option<Instant> = match timeout {
253 Timeout::Now => return None,
254 Timeout::Never => None,
255 Timeout::At(when) => Some(when),
256 };
257 for &(handle, _, _) in handles.iter() {
258 if let Some(x) = handle.deadline() {
259 deadline = deadline.map(|y| x.min(y)).or(Some(x));
260 }
261 }
262
263 // Block the current thread.
264 sel = cx.wait_until(deadline);
265 }
266
267 // Unregister all registered operations.
268 for (handle, _, _) in handles.iter_mut().take(registered_count) {
269 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
270 }
271
272 match sel {
273 Selected::Waiting => unreachable!(),
274 Selected::Aborted => {
275 // If an operation became ready during registration, try selecting it.
276 if let Some(index_ready) = index_ready {
277 for &(handle, i, ptr) in handles.iter() {
278 if i == index_ready && handle.try_select(&mut token) {
279 return Some((i, ptr));
280 }
281 }
282 }
283 }
284 Selected::Disconnected => {}
285 Selected::Operation(_) => {
286 // Find the selected operation.
287 for (handle, i, ptr) in handles.iter_mut() {
288 // Is this the selected operation?
289 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
290 {
291 // Try selecting this operation.
292 if handle.accept(&mut token, cx) {
293 return Some((*i, *ptr));
294 }
295 }
296 }
297 }
298 }
299
300 None
301 });
302
303 // Return if an operation was selected.
304 if let Some((i, ptr)) = res {
305 return Some((token, i, ptr));
306 }
307
308 // Try selecting one of the operations without blocking.
309 for &(handle, i, ptr) in handles.iter() {
310 if handle.try_select(&mut token) {
311 return Some((token, i, ptr));
312 }
313 }
314
315 match timeout {
316 Timeout::Now => return None,
317 Timeout::Never => {}
318 Timeout::At(when) => {
319 if Instant::now() >= when {
320 return None;
321 }
322 }
323 }
324 }
325}
326
327/// Runs until one of the operations becomes ready, potentially blocking the current thread.
328fn run_ready(
329 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
330 timeout: Timeout,
331 is_biased: bool,
332) -> Option<usize> {
333 if handles.is_empty() {
334 // Wait until the timeout and return.
335 match timeout {
336 Timeout::Now => return None,
337 Timeout::Never => {
338 utils::sleep_until(None);
339 unreachable!();
340 }
341 Timeout::At(when) => {
342 utils::sleep_until(Some(when));
343 return None;
344 }
345 }
346 }
347
348 if !is_biased {
349 // Shuffle the operations for fairness.
350 utils::shuffle(handles);
351 }
352
353 loop {
354 let backoff = Backoff::new();
355 loop {
356 // Check operations for readiness.
357 for &(handle, i, _) in handles.iter() {
358 if handle.is_ready() {
359 return Some(i);
360 }
361 }
362
363 if backoff.is_completed() {
364 break;
365 } else {
366 backoff.snooze();
367 }
368 }
369
370 // Check for timeout.
371 match timeout {
372 Timeout::Now => return None,
373 Timeout::Never => {}
374 Timeout::At(when) => {
375 if Instant::now() >= when {
376 return None;
377 }
378 }
379 }
380
381 // Prepare for blocking.
382 let res = Context::with(|cx| {
383 let mut sel = Selected::Waiting;
384 let mut registered_count = 0;
385
386 // Begin watching all operations.
387 for (handle, _, _) in handles.iter_mut() {
388 registered_count += 1;
389 let oper = Operation::hook::<&dyn SelectHandle>(handle);
390
391 // If registration returns `false`, that means the operation has just become ready.
392 if handle.watch(oper, cx) {
393 sel = match cx.try_select(Selected::Operation(oper)) {
394 Ok(()) => Selected::Operation(oper),
395 Err(s) => s,
396 };
397 break;
398 }
399
400 // If another thread has already chosen one of the operations, stop registration.
401 sel = cx.selected();
402 if sel != Selected::Waiting {
403 break;
404 }
405 }
406
407 if sel == Selected::Waiting {
408 // Check with each operation for how long we're allowed to block, and compute the
409 // earliest deadline.
410 let mut deadline: Option<Instant> = match timeout {
411 Timeout::Now => unreachable!(),
412 Timeout::Never => None,
413 Timeout::At(when) => Some(when),
414 };
415 for &(handle, _, _) in handles.iter() {
416 if let Some(x) = handle.deadline() {
417 deadline = deadline.map(|y| x.min(y)).or(Some(x));
418 }
419 }
420
421 // Block the current thread.
422 sel = cx.wait_until(deadline);
423 }
424
425 // Unwatch all operations.
426 for (handle, _, _) in handles.iter_mut().take(registered_count) {
427 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
428 }
429
430 match sel {
431 Selected::Waiting => unreachable!(),
432 Selected::Aborted => {}
433 Selected::Disconnected => {}
434 Selected::Operation(_) => {
435 for (handle, i, _) in handles.iter_mut() {
436 let oper = Operation::hook::<&dyn SelectHandle>(handle);
437 if sel == Selected::Operation(oper) {
438 return Some(*i);
439 }
440 }
441 }
442 }
443
444 None
445 });
446
447 // Return if an operation became ready.
448 if res.is_some() {
449 return res;
450 }
451 }
452}
453
454/// Attempts to select one of the operations without blocking.
455// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
456#[inline]
457pub fn try_select<'a>(
458 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459 is_biased: bool,
460) -> Result<SelectedOperation<'a>, TrySelectError> {
461 match run_select(handles, Timeout::Now, is_biased) {
462 None => Err(TrySelectError),
463 Some((token, index, ptr)) => Ok(SelectedOperation {
464 token,
465 index,
466 ptr,
467 _marker: PhantomData,
468 }),
469 }
470}
471
472/// Blocks until one of the operations becomes ready and selects it.
473// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
474#[inline]
475pub fn select<'a>(
476 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477 is_biased: bool,
478) -> SelectedOperation<'a> {
479 if handles.is_empty() {
480 panic!("no operations have been added to `Select`");
481 }
482
483 let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
484 SelectedOperation {
485 token,
486 index,
487 ptr,
488 _marker: PhantomData,
489 }
490}
491
492/// Blocks for a limited time until one of the operations becomes ready and selects it.
493// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
494#[inline]
495pub fn select_timeout<'a>(
496 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497 timeout: Duration,
498 is_biased: bool,
499) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
500 match Instant::now().checked_add(timeout) {
501 Some(deadline) => select_deadline(handles, deadline, is_biased),
502 None => Ok(select(handles, is_biased)),
503 }
504}
505
506/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
507#[inline]
508pub(crate) fn select_deadline<'a>(
509 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
510 deadline: Instant,
511 is_biased: bool,
512) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
513 match run_select(handles, Timeout::At(deadline), is_biased) {
514 None => Err(SelectTimeoutError),
515 Some((token, index, ptr)) => Ok(SelectedOperation {
516 token,
517 index,
518 ptr,
519 _marker: PhantomData,
520 }),
521 }
522}
523
524/// Selects from a set of channel operations.
525///
526/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
527/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
528/// among them is selected.
529///
530/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
531/// when it will simply return an error because the channel is disconnected.
532///
533/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
534/// dynamically created list of channel operations.
535///
536/// [`select!`]: crate::select!
537///
538/// Once a list of operations has been built with `Select`, there are two different ways of
539/// proceeding:
540///
541/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
542/// the returned selected operation has already begun and **must** be completed. If we don't
543/// complete it, a panic will occur.
544///
545/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
546/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
547/// possible for another thread to make the operation not ready just before we try executing it,
548/// so it's wise to use a retry loop. However, note that these methods might return with success
549/// spuriously, so it's a good idea to always double check if the operation is really ready.
550///
551/// # Examples
552///
553/// Use [`select`] to receive a message from a list of receivers:
554///
555/// ```
556/// use crossbeam_channel::{Receiver, RecvError, Select};
557///
558/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
559/// // Build a list of operations.
560/// let mut sel = Select::new();
561/// for r in rs {
562/// sel.recv(r);
563/// }
564///
565/// // Complete the selected operation.
566/// let oper = sel.select();
567/// let index = oper.index();
568/// oper.recv(&rs[index])
569/// }
570/// ```
571///
572/// Use [`ready`] to receive a message from a list of receivers:
573///
574/// ```
575/// use crossbeam_channel::{Receiver, RecvError, Select};
576///
577/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
578/// // Build a list of operations.
579/// let mut sel = Select::new();
580/// for r in rs {
581/// sel.recv(r);
582/// }
583///
584/// loop {
585/// // Wait until a receive operation becomes ready and try executing it.
586/// let index = sel.ready();
587/// let res = rs[index].try_recv();
588///
589/// // If the operation turns out not to be ready, retry.
590/// if let Err(e) = res {
591/// if e.is_empty() {
592/// continue;
593/// }
594/// }
595///
596/// // Success!
597/// return res.map_err(|_| RecvError);
598/// }
599/// }
600/// ```
601///
602/// [`try_select`]: Select::try_select
603/// [`select`]: Select::select
604/// [`select_timeout`]: Select::select_timeout
605/// [`try_ready`]: Select::try_ready
606/// [`ready`]: Select::ready
607/// [`ready_timeout`]: Select::ready_timeout
608pub struct Select<'a> {
609 /// A list of senders and receivers participating in selection.
610 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
611
612 /// The next index to assign to an operation.
613 next_index: usize,
614}
615
616unsafe impl Send for Select<'_> {}
617unsafe impl Sync for Select<'_> {}
618
619impl<'a> Select<'a> {
620 /// Creates an empty list of channel operations for selection.
621 ///
622 /// # Examples
623 ///
624 /// ```
625 /// use crossbeam_channel::Select;
626 ///
627 /// let mut sel = Select::new();
628 ///
629 /// // The list of operations is empty, which means no operation can be selected.
630 /// assert!(sel.try_select().is_err());
631 /// ```
632 pub fn new() -> Select<'a> {
633 Select {
634 handles: Vec::with_capacity(4),
635 next_index: 0,
636 }
637 }
638
639 /// Adds a send operation.
640 ///
641 /// Returns the index of the added operation.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use crossbeam_channel::{unbounded, Select};
647 ///
648 /// let (s, r) = unbounded::<i32>();
649 ///
650 /// let mut sel = Select::new();
651 /// let index = sel.send(&s);
652 /// ```
653 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
654 let i = self.next_index;
655 let ptr = s as *const Sender<_> as *const u8;
656 self.handles.push((s, i, ptr));
657 self.next_index += 1;
658 i
659 }
660
661 /// Adds a receive operation.
662 ///
663 /// Returns the index of the added operation.
664 ///
665 /// # Examples
666 ///
667 /// ```
668 /// use crossbeam_channel::{unbounded, Select};
669 ///
670 /// let (s, r) = unbounded::<i32>();
671 ///
672 /// let mut sel = Select::new();
673 /// let index = sel.recv(&r);
674 /// ```
675 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
676 let i = self.next_index;
677 let ptr = r as *const Receiver<_> as *const u8;
678 self.handles.push((r, i, ptr));
679 self.next_index += 1;
680 i
681 }
682
683 /// Removes a previously added operation.
684 ///
685 /// This is useful when an operation is selected because the channel got disconnected and we
686 /// want to try again to select a different operation instead.
687 ///
688 /// If new operations are added after removing some, the indices of removed operations will not
689 /// be reused.
690 ///
691 /// # Panics
692 ///
693 /// An attempt to remove a non-existing or already removed operation will panic.
694 ///
695 /// # Examples
696 ///
697 /// ```
698 /// use crossbeam_channel::{unbounded, Select};
699 ///
700 /// let (s1, r1) = unbounded::<i32>();
701 /// let (_, r2) = unbounded::<i32>();
702 ///
703 /// let mut sel = Select::new();
704 /// let oper1 = sel.recv(&r1);
705 /// let oper2 = sel.recv(&r2);
706 ///
707 /// // Both operations are initially ready, so a random one will be executed.
708 /// let oper = sel.select();
709 /// assert_eq!(oper.index(), oper2);
710 /// assert!(oper.recv(&r2).is_err());
711 /// sel.remove(oper2);
712 ///
713 /// s1.send(10).unwrap();
714 ///
715 /// let oper = sel.select();
716 /// assert_eq!(oper.index(), oper1);
717 /// assert_eq!(oper.recv(&r1), Ok(10));
718 /// ```
719 pub fn remove(&mut self, index: usize) {
720 assert!(
721 index < self.next_index,
722 "index out of bounds; {} >= {}",
723 index,
724 self.next_index,
725 );
726
727 let i = self
728 .handles
729 .iter()
730 .enumerate()
731 .find(|(_, (_, i, _))| *i == index)
732 .expect("no operation with this index")
733 .0;
734
735 self.handles.swap_remove(i);
736 }
737
738 /// Attempts to select one of the operations without blocking.
739 ///
740 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
741 /// the same time, a random one among them is selected. If none of the operations are ready, an
742 /// error is returned.
743 ///
744 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
745 /// even when it will simply return an error because the channel is disconnected.
746 ///
747 /// The selected operation must be completed with [`SelectedOperation::send`]
748 /// or [`SelectedOperation::recv`].
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// use crossbeam_channel::{unbounded, Select};
754 ///
755 /// let (s1, r1) = unbounded();
756 /// let (s2, r2) = unbounded();
757 ///
758 /// s1.send(10).unwrap();
759 /// s2.send(20).unwrap();
760 ///
761 /// let mut sel = Select::new();
762 /// let oper1 = sel.recv(&r1);
763 /// let oper2 = sel.recv(&r2);
764 ///
765 /// // Both operations are initially ready, so a random one will be executed.
766 /// let oper = sel.try_select();
767 /// match oper {
768 /// Err(_) => panic!("both operations should be ready"),
769 /// Ok(oper) => match oper.index() {
770 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
771 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
772 /// _ => unreachable!(),
773 /// }
774 /// }
775 /// ```
776 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
777 try_select(&mut self.handles, false)
778 }
779
780 /// Blocks until one of the operations becomes ready and selects it.
781 ///
782 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
783 /// ready at the same time, a random one among them is selected.
784 ///
785 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
786 /// even when it will simply return an error because the channel is disconnected.
787 ///
788 /// The selected operation must be completed with [`SelectedOperation::send`]
789 /// or [`SelectedOperation::recv`].
790 ///
791 /// # Panics
792 ///
793 /// Panics if no operations have been added to `Select`.
794 ///
795 /// # Examples
796 ///
797 /// ```
798 /// use std::thread;
799 /// use std::time::Duration;
800 /// use crossbeam_channel::{unbounded, Select};
801 ///
802 /// let (s1, r1) = unbounded();
803 /// let (s2, r2) = unbounded();
804 ///
805 /// thread::spawn(move || {
806 /// thread::sleep(Duration::from_secs(1));
807 /// s1.send(10).unwrap();
808 /// });
809 /// thread::spawn(move || s2.send(20).unwrap());
810 ///
811 /// let mut sel = Select::new();
812 /// let oper1 = sel.recv(&r1);
813 /// let oper2 = sel.recv(&r2);
814 ///
815 /// // The second operation will be selected because it becomes ready first.
816 /// let oper = sel.select();
817 /// match oper.index() {
818 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
819 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
820 /// _ => unreachable!(),
821 /// }
822 /// ```
823 pub fn select(&mut self) -> SelectedOperation<'a> {
824 select(&mut self.handles, false)
825 }
826
827 /// Blocks for a limited time until one of the operations becomes ready and selects it.
828 ///
829 /// If an operation becomes ready, it is selected and returned. If multiple operations are
830 /// ready at the same time, a random one among them is selected. If none of the operations
831 /// become ready for the specified duration, an error is returned.
832 ///
833 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
834 /// even when it will simply return an error because the channel is disconnected.
835 ///
836 /// The selected operation must be completed with [`SelectedOperation::send`]
837 /// or [`SelectedOperation::recv`].
838 ///
839 /// # Examples
840 ///
841 /// ```
842 /// use std::thread;
843 /// use std::time::Duration;
844 /// use crossbeam_channel::{unbounded, Select};
845 ///
846 /// let (s1, r1) = unbounded();
847 /// let (s2, r2) = unbounded();
848 ///
849 /// thread::spawn(move || {
850 /// thread::sleep(Duration::from_secs(1));
851 /// s1.send(10).unwrap();
852 /// });
853 /// thread::spawn(move || s2.send(20).unwrap());
854 ///
855 /// let mut sel = Select::new();
856 /// let oper1 = sel.recv(&r1);
857 /// let oper2 = sel.recv(&r2);
858 ///
859 /// // The second operation will be selected because it becomes ready first.
860 /// let oper = sel.select_timeout(Duration::from_millis(500));
861 /// match oper {
862 /// Err(_) => panic!("should not have timed out"),
863 /// Ok(oper) => match oper.index() {
864 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
865 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
866 /// _ => unreachable!(),
867 /// }
868 /// }
869 /// ```
870 pub fn select_timeout(
871 &mut self,
872 timeout: Duration,
873 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
874 select_timeout(&mut self.handles, timeout, false)
875 }
876
877 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
878 ///
879 /// If an operation becomes ready, it is selected and returned. If multiple operations are
880 /// ready at the same time, a random one among them is selected. If none of the operations
881 /// become ready before the given deadline, an error is returned.
882 ///
883 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
884 /// even when it will simply return an error because the channel is disconnected.
885 ///
886 /// The selected operation must be completed with [`SelectedOperation::send`]
887 /// or [`SelectedOperation::recv`].
888 ///
889 /// # Examples
890 ///
891 /// ```
892 /// use std::thread;
893 /// use std::time::{Instant, Duration};
894 /// use crossbeam_channel::{unbounded, Select};
895 ///
896 /// let (s1, r1) = unbounded();
897 /// let (s2, r2) = unbounded();
898 ///
899 /// thread::spawn(move || {
900 /// thread::sleep(Duration::from_secs(1));
901 /// s1.send(10).unwrap();
902 /// });
903 /// thread::spawn(move || s2.send(20).unwrap());
904 ///
905 /// let mut sel = Select::new();
906 /// let oper1 = sel.recv(&r1);
907 /// let oper2 = sel.recv(&r2);
908 ///
909 /// let deadline = Instant::now() + Duration::from_millis(500);
910 ///
911 /// // The second operation will be selected because it becomes ready first.
912 /// let oper = sel.select_deadline(deadline);
913 /// match oper {
914 /// Err(_) => panic!("should not have timed out"),
915 /// Ok(oper) => match oper.index() {
916 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
917 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
918 /// _ => unreachable!(),
919 /// }
920 /// }
921 /// ```
922 pub fn select_deadline(
923 &mut self,
924 deadline: Instant,
925 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
926 select_deadline(&mut self.handles, deadline, false)
927 }
928
929 /// Attempts to find a ready operation without blocking.
930 ///
931 /// If an operation is ready, its index is returned. If multiple operations are ready at the
932 /// same time, a random one among them is chosen. If none of the operations are ready, an error
933 /// is returned.
934 ///
935 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
936 /// even when it will simply return an error because the channel is disconnected.
937 ///
938 /// Note that this method might return with success spuriously, so it's a good idea to always
939 /// double check if the operation is really ready.
940 ///
941 /// # Examples
942 ///
943 /// ```
944 /// use crossbeam_channel::{unbounded, Select};
945 ///
946 /// let (s1, r1) = unbounded();
947 /// let (s2, r2) = unbounded();
948 ///
949 /// s1.send(10).unwrap();
950 /// s2.send(20).unwrap();
951 ///
952 /// let mut sel = Select::new();
953 /// let oper1 = sel.recv(&r1);
954 /// let oper2 = sel.recv(&r2);
955 ///
956 /// // Both operations are initially ready, so a random one will be chosen.
957 /// match sel.try_ready() {
958 /// Err(_) => panic!("both operations should be ready"),
959 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
960 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
961 /// Ok(_) => unreachable!(),
962 /// }
963 /// ```
964 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
965 match run_ready(&mut self.handles, Timeout::Now, false) {
966 None => Err(TryReadyError),
967 Some(index) => Ok(index),
968 }
969 }
970
971 /// Blocks until one of the operations becomes ready.
972 ///
973 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
974 /// the same time, a random one among them is chosen.
975 ///
976 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
977 /// even when it will simply return an error because the channel is disconnected.
978 ///
979 /// Note that this method might return with success spuriously, so it's a good idea to always
980 /// double check if the operation is really ready.
981 ///
982 /// # Panics
983 ///
984 /// Panics if no operations have been added to `Select`.
985 ///
986 /// # Examples
987 ///
988 /// ```
989 /// use std::thread;
990 /// use std::time::Duration;
991 /// use crossbeam_channel::{unbounded, Select};
992 ///
993 /// let (s1, r1) = unbounded();
994 /// let (s2, r2) = unbounded();
995 ///
996 /// thread::spawn(move || {
997 /// thread::sleep(Duration::from_secs(1));
998 /// s1.send(10).unwrap();
999 /// });
1000 /// thread::spawn(move || s2.send(20).unwrap());
1001 ///
1002 /// let mut sel = Select::new();
1003 /// let oper1 = sel.recv(&r1);
1004 /// let oper2 = sel.recv(&r2);
1005 ///
1006 /// // The second operation will be selected because it becomes ready first.
1007 /// match sel.ready() {
1008 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1009 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1010 /// _ => unreachable!(),
1011 /// }
1012 /// ```
1013 pub fn ready(&mut self) -> usize {
1014 if self.handles.is_empty() {
1015 panic!("no operations have been added to `Select`");
1016 }
1017
1018 run_ready(&mut self.handles, Timeout::Never, false).unwrap()
1019 }
1020
1021 /// Blocks for a limited time until one of the operations becomes ready.
1022 ///
1023 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1024 /// the same time, a random one among them is chosen. If none of the operations become ready
1025 /// for the specified duration, an error is returned.
1026 ///
1027 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1028 /// even when it will simply return an error because the channel is disconnected.
1029 ///
1030 /// Note that this method might return with success spuriously, so it's a good idea to double
1031 /// check if the operation is really ready.
1032 ///
1033 /// # Examples
1034 ///
1035 /// ```
1036 /// use std::thread;
1037 /// use std::time::Duration;
1038 /// use crossbeam_channel::{unbounded, Select};
1039 ///
1040 /// let (s1, r1) = unbounded();
1041 /// let (s2, r2) = unbounded();
1042 ///
1043 /// thread::spawn(move || {
1044 /// thread::sleep(Duration::from_secs(1));
1045 /// s1.send(10).unwrap();
1046 /// });
1047 /// thread::spawn(move || s2.send(20).unwrap());
1048 ///
1049 /// let mut sel = Select::new();
1050 /// let oper1 = sel.recv(&r1);
1051 /// let oper2 = sel.recv(&r2);
1052 ///
1053 /// // The second operation will be selected because it becomes ready first.
1054 /// match sel.ready_timeout(Duration::from_millis(500)) {
1055 /// Err(_) => panic!("should not have timed out"),
1056 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1057 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1058 /// Ok(_) => unreachable!(),
1059 /// }
1060 /// ```
1061 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1062 match Instant::now().checked_add(timeout) {
1063 Some(deadline) => self.ready_deadline(deadline),
1064 None => Ok(self.ready()),
1065 }
1066 }
1067
1068 /// Blocks until a given deadline, or until one of the operations becomes ready.
1069 ///
1070 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1071 /// the same time, a random one among them is chosen. If none of the operations become ready
1072 /// before the deadline, an error is returned.
1073 ///
1074 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1075 /// even when it will simply return an error because the channel is disconnected.
1076 ///
1077 /// Note that this method might return with success spuriously, so it's a good idea to double
1078 /// check if the operation is really ready.
1079 ///
1080 /// # Examples
1081 ///
1082 /// ```
1083 /// use std::thread;
1084 /// use std::time::{Duration, Instant};
1085 /// use crossbeam_channel::{unbounded, Select};
1086 ///
1087 /// let deadline = Instant::now() + Duration::from_millis(500);
1088 ///
1089 /// let (s1, r1) = unbounded();
1090 /// let (s2, r2) = unbounded();
1091 ///
1092 /// thread::spawn(move || {
1093 /// thread::sleep(Duration::from_secs(1));
1094 /// s1.send(10).unwrap();
1095 /// });
1096 /// thread::spawn(move || s2.send(20).unwrap());
1097 ///
1098 /// let mut sel = Select::new();
1099 /// let oper1 = sel.recv(&r1);
1100 /// let oper2 = sel.recv(&r2);
1101 ///
1102 /// // The second operation will be selected because it becomes ready first.
1103 /// match sel.ready_deadline(deadline) {
1104 /// Err(_) => panic!("should not have timed out"),
1105 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1106 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1107 /// Ok(_) => unreachable!(),
1108 /// }
1109 /// ```
1110 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1111 match run_ready(&mut self.handles, Timeout::At(deadline), false) {
1112 None => Err(ReadyTimeoutError),
1113 Some(index) => Ok(index),
1114 }
1115 }
1116}
1117
1118impl<'a> Clone for Select<'a> {
1119 fn clone(&self) -> Select<'a> {
1120 Select {
1121 handles: self.handles.clone(),
1122 next_index: self.next_index,
1123 }
1124 }
1125}
1126
1127impl<'a> Default for Select<'a> {
1128 fn default() -> Select<'a> {
1129 Select::new()
1130 }
1131}
1132
1133impl fmt::Debug for Select<'_> {
1134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135 f.pad("Select { .. }")
1136 }
1137}
1138
1139/// A selected operation that needs to be completed.
1140///
1141/// To complete the operation, call [`send`] or [`recv`].
1142///
1143/// # Panics
1144///
1145/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1146/// `SelectedOperation` is dropped without completion, a panic occurs.
1147///
1148/// [`send`]: SelectedOperation::send
1149/// [`recv`]: SelectedOperation::recv
1150#[must_use]
1151pub struct SelectedOperation<'a> {
1152 /// Token needed to complete the operation.
1153 token: Token,
1154
1155 /// The index of the selected operation.
1156 index: usize,
1157
1158 /// The address of the selected `Sender` or `Receiver`.
1159 ptr: *const u8,
1160
1161 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1162 _marker: PhantomData<&'a ()>,
1163}
1164
1165impl SelectedOperation<'_> {
1166 /// Returns the index of the selected operation.
1167 ///
1168 /// # Examples
1169 ///
1170 /// ```
1171 /// use crossbeam_channel::{bounded, Select};
1172 ///
1173 /// let (s1, r1) = bounded::<()>(0);
1174 /// let (s2, r2) = bounded::<()>(0);
1175 /// let (s3, r3) = bounded::<()>(1);
1176 ///
1177 /// let mut sel = Select::new();
1178 /// let oper1 = sel.send(&s1);
1179 /// let oper2 = sel.recv(&r2);
1180 /// let oper3 = sel.send(&s3);
1181 ///
1182 /// // Only the last operation is ready.
1183 /// let oper = sel.select();
1184 /// assert_eq!(oper.index(), 2);
1185 /// assert_eq!(oper.index(), oper3);
1186 ///
1187 /// // Complete the operation.
1188 /// oper.send(&s3, ()).unwrap();
1189 /// ```
1190 pub fn index(&self) -> usize {
1191 self.index
1192 }
1193
1194 /// Completes the send operation.
1195 ///
1196 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1197 /// when the operation was added.
1198 ///
1199 /// # Panics
1200 ///
1201 /// Panics if an incorrect [`Sender`] reference is passed.
1202 ///
1203 /// # Examples
1204 ///
1205 /// ```
1206 /// use crossbeam_channel::{bounded, Select, SendError};
1207 ///
1208 /// let (s, r) = bounded::<i32>(0);
1209 /// drop(r);
1210 ///
1211 /// let mut sel = Select::new();
1212 /// let oper1 = sel.send(&s);
1213 ///
1214 /// let oper = sel.select();
1215 /// assert_eq!(oper.index(), oper1);
1216 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1217 /// ```
1218 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1219 assert!(
1220 s as *const Sender<T> as *const u8 == self.ptr,
1221 "passed a sender that wasn't selected",
1222 );
1223 let res = unsafe { channel::write(s, &mut self.token, msg) };
1224 mem::forget(self);
1225 res.map_err(SendError)
1226 }
1227
1228 /// Completes the receive operation.
1229 ///
1230 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1231 /// when the operation was added.
1232 ///
1233 /// # Panics
1234 ///
1235 /// Panics if an incorrect [`Receiver`] reference is passed.
1236 ///
1237 /// # Examples
1238 ///
1239 /// ```
1240 /// use crossbeam_channel::{bounded, Select, RecvError};
1241 ///
1242 /// let (s, r) = bounded::<i32>(0);
1243 /// drop(s);
1244 ///
1245 /// let mut sel = Select::new();
1246 /// let oper1 = sel.recv(&r);
1247 ///
1248 /// let oper = sel.select();
1249 /// assert_eq!(oper.index(), oper1);
1250 /// assert_eq!(oper.recv(&r), Err(RecvError));
1251 /// ```
1252 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1253 assert!(
1254 r as *const Receiver<T> as *const u8 == self.ptr,
1255 "passed a receiver that wasn't selected",
1256 );
1257 let res = unsafe { channel::read(r, &mut self.token) };
1258 mem::forget(self);
1259 res.map_err(|_| RecvError)
1260 }
1261}
1262
1263impl fmt::Debug for SelectedOperation<'_> {
1264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1265 f.pad("SelectedOperation { .. }")
1266 }
1267}
1268
1269impl Drop for SelectedOperation<'_> {
1270 fn drop(&mut self) {
1271 panic!("dropped `SelectedOperation` without completing the operation");
1272 }
1273}