crossbeam_channel/
select.rs

1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7use std::vec::Vec;
8
9use crossbeam_utils::Backoff;
10
11use crate::channel::{self, Receiver, Sender};
12use crate::context::Context;
13use crate::err::{ReadyTimeoutError, TryReadyError};
14use crate::err::{RecvError, SendError};
15use crate::err::{SelectTimeoutError, TrySelectError};
16use crate::flavors;
17use crate::utils;
18
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor. The struct derives
/// `Default`, which is how `run_select` creates a fresh token before trying any operation.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
    /// Token data for the `at` flavor.
    pub(crate) at: flavors::at::AtToken,
    /// Token data for the `array` flavor.
    pub(crate) array: flavors::array::ArrayToken,
    /// Token data for the `list` flavor.
    pub(crate) list: flavors::list::ListToken,
    /// Token data for the `never` flavor.
    // NOTE(review): the lint is silenced presumably because nothing reads this field; confirm
    // against the `never` flavor before removing the attribute.
    #[allow(dead_code)]
    pub(crate) never: flavors::never::NeverToken,
    /// Token data for the `tick` flavor.
    pub(crate) tick: flavors::tick::TickToken,
    /// Token data for the `zero` flavor.
    pub(crate) zero: flavors::zero::ZeroToken,
}
34
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The wrapped number is the address of the variable that was passed to [`Operation::hook`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Builds an operation identifier out of a mutable reference.
    ///
    /// The identifier is nothing more than the numeric address the reference points to. The
    /// referent should be a variable that belongs to this thread and this operation only, and it
    /// has to stay alive for the whole duration of the select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is not greater than `2`, because the values `0`, `1` and `2` are
    /// reserved for the other `Selected` states.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let ptr: *mut T = r;
        let val = ptr as usize;
        // An address must never collide with the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(val > 2);
        Self(val)
    }
}
54
55/// Current state of a select or a blocking operation.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum Selected {
58    /// Still waiting for an operation.
59    Waiting,
60
61    /// The attempt to block the current thread has been aborted.
62    Aborted,
63
64    /// An operation became ready because a channel is disconnected.
65    Disconnected,
66
67    /// An operation became ready because a message can be sent or received.
68    Operation(Operation),
69}
70
71impl From<usize> for Selected {
72    #[inline]
73    fn from(val: usize) -> Selected {
74        match val {
75            0 => Selected::Waiting,
76            1 => Selected::Aborted,
77            2 => Selected::Disconnected,
78            oper => Selected::Operation(Operation(oper)),
79        }
80    }
81}
82
83impl Into<usize> for Selected {
84    #[inline]
85    fn into(self) -> usize {
86        match self {
87            Selected::Waiting => 0,
88            Selected::Aborted => 1,
89            Selected::Disconnected => 2,
90            Selected::Operation(Operation(val)) => val,
91        }
92    }
93}
94
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods fall into two groups, matching the two drivers in this module:
///
/// * `run_select` uses `try_select`, `register`, `unregister` and `accept`.
/// * `run_ready` uses `is_ready`, `watch` and `unwatch`.
///
/// Both drivers consult `deadline` when computing how long they are allowed to block.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
125
126impl<T: SelectHandle> SelectHandle for &T {
127    fn try_select(&self, token: &mut Token) -> bool {
128        (**self).try_select(token)
129    }
130
131    fn deadline(&self) -> Option<Instant> {
132        (**self).deadline()
133    }
134
135    fn register(&self, oper: Operation, cx: &Context) -> bool {
136        (**self).register(oper, cx)
137    }
138
139    fn unregister(&self, oper: Operation) {
140        (**self).unregister(oper);
141    }
142
143    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144        (**self).accept(token, cx)
145    }
146
147    fn is_ready(&self) -> bool {
148        (**self).is_ready()
149    }
150
151    fn watch(&self, oper: Operation, cx: &Context) -> bool {
152        (**self).watch(oper, cx)
153    }
154
155    fn unwatch(&self, oper: Operation) {
156        (**self).unwatch(oper)
157    }
158}
159
/// Determines when a select operation should time out.
///
/// `run_select` and `run_ready` take one of these to decide whether, and for how long, they are
/// allowed to block.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    ///
    /// Passed by the `try_select` function.
    Now,

    /// Block forever.
    ///
    /// Passed by the `select` function.
    Never,

    /// Time out after the time instant.
    ///
    /// Passed by the `select_deadline` function.
    At(Instant),
}
172
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
///
/// `handles` holds `(handle, index, ptr)` entries; unless `is_biased` is set, the slice is
/// shuffled in place first. `timeout` controls whether and for how long the call may block.
///
/// Returns `Some((token, index, ptr))`, where `index` and `ptr` are copied from the entry of the
/// selected handle, or `None` if the call gave up because of `timeout`. With an empty `handles`
/// the function only waits out the timeout; for `Timeout::Never` it is not expected to return.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles `register` has been called on; only those get unregistered below.
            let mut registered_count = 0;
            // Index of the handle whose `register` call reported readiness, if any.
            let mut index_ready = None;

            // A non-blocking select marks the context as aborted up front, so `sel` can never
            // stay `Waiting` after the registration loop below.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or `x` if there is no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Nothing could be selected this round: give up if the timeout says so, otherwise loop.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
326
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// `handles` holds `(handle, index, ptr)` entries; unless `is_biased` is set, the slice is
/// shuffled in place first. `timeout` controls whether and for how long the call may block.
///
/// Returns `Some(index)` with the index stored next to the ready handle, or `None` if the call
/// gave up because of `timeout`. With an empty `handles` the function only waits out the
/// timeout; for `Timeout::Never` it is not expected to return.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    loop {
        // Poll for readiness, backing off between rounds, before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles `watch` has been called on; only those get unwatched below.
            let mut registered_count = 0;

            // Begin watching all operations.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If `watch` returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned from the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or `x` if there is no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation id back to the index of its handle.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
453
454/// Attempts to select one of the operations without blocking.
455// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
456#[inline]
457pub fn try_select<'a>(
458    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459    is_biased: bool,
460) -> Result<SelectedOperation<'a>, TrySelectError> {
461    match run_select(handles, Timeout::Now, is_biased) {
462        None => Err(TrySelectError),
463        Some((token, index, ptr)) => Ok(SelectedOperation {
464            token,
465            index,
466            ptr,
467            _marker: PhantomData,
468        }),
469    }
470}
471
472/// Blocks until one of the operations becomes ready and selects it.
473// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
474#[inline]
475pub fn select<'a>(
476    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477    is_biased: bool,
478) -> SelectedOperation<'a> {
479    if handles.is_empty() {
480        panic!("no operations have been added to `Select`");
481    }
482
483    let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
484    SelectedOperation {
485        token,
486        index,
487        ptr,
488        _marker: PhantomData,
489    }
490}
491
492/// Blocks for a limited time until one of the operations becomes ready and selects it.
493// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
494#[inline]
495pub fn select_timeout<'a>(
496    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497    timeout: Duration,
498    is_biased: bool,
499) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
500    match Instant::now().checked_add(timeout) {
501        Some(deadline) => select_deadline(handles, deadline, is_biased),
502        None => Ok(select(handles, is_biased)),
503    }
504}
505
506/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
507#[inline]
508pub(crate) fn select_deadline<'a>(
509    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
510    deadline: Instant,
511    is_biased: bool,
512) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
513    match run_select(handles, Timeout::At(deadline), is_biased) {
514        None => Err(SelectTimeoutError),
515        Some((token, index, ptr)) => Ok(SelectedOperation {
516            token,
517            index,
518            ptr,
519            _marker: PhantomData,
520        }),
521    }
522}
523
524/// Selects from a set of channel operations.
525///
526/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
527/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
528/// among them is selected.
529///
530/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
531/// when it will simply return an error because the channel is disconnected.
532///
533/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
534/// dynamically created list of channel operations.
535///
536/// [`select!`]: crate::select!
537///
538/// Once a list of operations has been built with `Select`, there are two different ways of
539/// proceeding:
540///
541/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
542///   the returned selected operation has already begun and **must** be completed. If we don't
543///   complete it, a panic will occur.
544///
545/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
546///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
547///   possible for another thread to make the operation not ready just before we try executing it,
548///   so it's wise to use a retry loop. However, note that these methods might return with success
549///   spuriously, so it's a good idea to always double check if the operation is really ready.
550///
551/// # Examples
552///
553/// Use [`select`] to receive a message from a list of receivers:
554///
555/// ```
556/// use crossbeam_channel::{Receiver, RecvError, Select};
557///
558/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
559///     // Build a list of operations.
560///     let mut sel = Select::new();
561///     for r in rs {
562///         sel.recv(r);
563///     }
564///
565///     // Complete the selected operation.
566///     let oper = sel.select();
567///     let index = oper.index();
568///     oper.recv(&rs[index])
569/// }
570/// ```
571///
572/// Use [`ready`] to receive a message from a list of receivers:
573///
574/// ```
575/// use crossbeam_channel::{Receiver, RecvError, Select};
576///
577/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
578///     // Build a list of operations.
579///     let mut sel = Select::new();
580///     for r in rs {
581///         sel.recv(r);
582///     }
583///
584///     loop {
585///         // Wait until a receive operation becomes ready and try executing it.
586///         let index = sel.ready();
587///         let res = rs[index].try_recv();
588///
589///         // If the operation turns out not to be ready, retry.
590///         if let Err(e) = res {
591///             if e.is_empty() {
592///                 continue;
593///             }
594///         }
595///
596///         // Success!
597///         return res.map_err(|_| RecvError);
598///     }
599/// }
600/// ```
601///
602/// [`try_select`]: Select::try_select
603/// [`select`]: Select::select
604/// [`select_timeout`]: Select::select_timeout
605/// [`try_ready`]: Select::try_ready
606/// [`ready`]: Select::ready
607/// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the handle itself, the index that was returned when
    /// the operation was added, and the address of the sender or receiver as a type-erased
    /// pointer.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// `send` and `recv` only ever increment it, which is why indices of removed operations are
    /// not handed out again.
    next_index: usize,
}
615
// `Select` does not get `Send`/`Sync` automatically: `handles` stores raw `*const u8` pointers
// next to `&dyn SelectHandle` trait objects that carry no `Send`/`Sync` bounds.
// NOTE(review): these impls presumably rely on the stored handles only ever being
// `&Sender<T>` / `&Receiver<T>` (what `send` and `recv` push) and on the raw pointer being kept
// purely as an address; confirm before relying on it.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
618
619impl<'a> Select<'a> {
620    /// Creates an empty list of channel operations for selection.
621    ///
622    /// # Examples
623    ///
624    /// ```
625    /// use crossbeam_channel::Select;
626    ///
627    /// let mut sel = Select::new();
628    ///
629    /// // The list of operations is empty, which means no operation can be selected.
630    /// assert!(sel.try_select().is_err());
631    /// ```
632    pub fn new() -> Select<'a> {
633        Select {
634            handles: Vec::with_capacity(4),
635            next_index: 0,
636        }
637    }
638
639    /// Adds a send operation.
640    ///
641    /// Returns the index of the added operation.
642    ///
643    /// # Examples
644    ///
645    /// ```
646    /// use crossbeam_channel::{unbounded, Select};
647    ///
648    /// let (s, r) = unbounded::<i32>();
649    ///
650    /// let mut sel = Select::new();
651    /// let index = sel.send(&s);
652    /// ```
653    pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
654        let i = self.next_index;
655        let ptr = s as *const Sender<_> as *const u8;
656        self.handles.push((s, i, ptr));
657        self.next_index += 1;
658        i
659    }
660
661    /// Adds a receive operation.
662    ///
663    /// Returns the index of the added operation.
664    ///
665    /// # Examples
666    ///
667    /// ```
668    /// use crossbeam_channel::{unbounded, Select};
669    ///
670    /// let (s, r) = unbounded::<i32>();
671    ///
672    /// let mut sel = Select::new();
673    /// let index = sel.recv(&r);
674    /// ```
675    pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
676        let i = self.next_index;
677        let ptr = r as *const Receiver<_> as *const u8;
678        self.handles.push((r, i, ptr));
679        self.next_index += 1;
680        i
681    }
682
683    /// Removes a previously added operation.
684    ///
685    /// This is useful when an operation is selected because the channel got disconnected and we
686    /// want to try again to select a different operation instead.
687    ///
688    /// If new operations are added after removing some, the indices of removed operations will not
689    /// be reused.
690    ///
691    /// # Panics
692    ///
693    /// An attempt to remove a non-existing or already removed operation will panic.
694    ///
695    /// # Examples
696    ///
697    /// ```
698    /// use crossbeam_channel::{unbounded, Select};
699    ///
700    /// let (s1, r1) = unbounded::<i32>();
701    /// let (_, r2) = unbounded::<i32>();
702    ///
703    /// let mut sel = Select::new();
704    /// let oper1 = sel.recv(&r1);
705    /// let oper2 = sel.recv(&r2);
706    ///
    /// // Only the second operation is ready (its channel is disconnected), so it is selected.
708    /// let oper = sel.select();
709    /// assert_eq!(oper.index(), oper2);
710    /// assert!(oper.recv(&r2).is_err());
711    /// sel.remove(oper2);
712    ///
713    /// s1.send(10).unwrap();
714    ///
715    /// let oper = sel.select();
716    /// assert_eq!(oper.index(), oper1);
717    /// assert_eq!(oper.recv(&r1), Ok(10));
718    /// ```
719    pub fn remove(&mut self, index: usize) {
720        assert!(
721            index < self.next_index,
722            "index out of bounds; {} >= {}",
723            index,
724            self.next_index,
725        );
726
727        let i = self
728            .handles
729            .iter()
730            .enumerate()
731            .find(|(_, (_, i, _))| *i == index)
732            .expect("no operation with this index")
733            .0;
734
735        self.handles.swap_remove(i);
736    }
737
738    /// Attempts to select one of the operations without blocking.
739    ///
740    /// If an operation is ready, it is selected and returned. If multiple operations are ready at
741    /// the same time, a random one among them is selected. If none of the operations are ready, an
742    /// error is returned.
743    ///
744    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
745    /// even when it will simply return an error because the channel is disconnected.
746    ///
747    /// The selected operation must be completed with [`SelectedOperation::send`]
748    /// or [`SelectedOperation::recv`].
749    ///
750    /// # Examples
751    ///
752    /// ```
753    /// use crossbeam_channel::{unbounded, Select};
754    ///
755    /// let (s1, r1) = unbounded();
756    /// let (s2, r2) = unbounded();
757    ///
758    /// s1.send(10).unwrap();
759    /// s2.send(20).unwrap();
760    ///
761    /// let mut sel = Select::new();
762    /// let oper1 = sel.recv(&r1);
763    /// let oper2 = sel.recv(&r2);
764    ///
765    /// // Both operations are initially ready, so a random one will be executed.
766    /// let oper = sel.try_select();
767    /// match oper {
768    ///     Err(_) => panic!("both operations should be ready"),
769    ///     Ok(oper) => match oper.index() {
770    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
771    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
772    ///         _ => unreachable!(),
773    ///     }
774    /// }
775    /// ```
776    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
777        try_select(&mut self.handles, false)
778    }
779
780    /// Blocks until one of the operations becomes ready and selects it.
781    ///
782    /// Once an operation becomes ready, it is selected and returned. If multiple operations are
783    /// ready at the same time, a random one among them is selected.
784    ///
785    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
786    /// even when it will simply return an error because the channel is disconnected.
787    ///
788    /// The selected operation must be completed with [`SelectedOperation::send`]
789    /// or [`SelectedOperation::recv`].
790    ///
791    /// # Panics
792    ///
793    /// Panics if no operations have been added to `Select`.
794    ///
795    /// # Examples
796    ///
797    /// ```
798    /// use std::thread;
799    /// use std::time::Duration;
800    /// use crossbeam_channel::{unbounded, Select};
801    ///
802    /// let (s1, r1) = unbounded();
803    /// let (s2, r2) = unbounded();
804    ///
805    /// thread::spawn(move || {
806    ///     thread::sleep(Duration::from_secs(1));
807    ///     s1.send(10).unwrap();
808    /// });
809    /// thread::spawn(move || s2.send(20).unwrap());
810    ///
811    /// let mut sel = Select::new();
812    /// let oper1 = sel.recv(&r1);
813    /// let oper2 = sel.recv(&r2);
814    ///
815    /// // The second operation will be selected because it becomes ready first.
816    /// let oper = sel.select();
817    /// match oper.index() {
818    ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
819    ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
820    ///     _ => unreachable!(),
821    /// }
822    /// ```
823    pub fn select(&mut self) -> SelectedOperation<'a> {
824        select(&mut self.handles, false)
825    }
826
827    /// Blocks for a limited time until one of the operations becomes ready and selects it.
828    ///
829    /// If an operation becomes ready, it is selected and returned. If multiple operations are
830    /// ready at the same time, a random one among them is selected. If none of the operations
831    /// become ready for the specified duration, an error is returned.
832    ///
833    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
834    /// even when it will simply return an error because the channel is disconnected.
835    ///
836    /// The selected operation must be completed with [`SelectedOperation::send`]
837    /// or [`SelectedOperation::recv`].
838    ///
839    /// # Examples
840    ///
841    /// ```
842    /// use std::thread;
843    /// use std::time::Duration;
844    /// use crossbeam_channel::{unbounded, Select};
845    ///
846    /// let (s1, r1) = unbounded();
847    /// let (s2, r2) = unbounded();
848    ///
849    /// thread::spawn(move || {
850    ///     thread::sleep(Duration::from_secs(1));
851    ///     s1.send(10).unwrap();
852    /// });
853    /// thread::spawn(move || s2.send(20).unwrap());
854    ///
855    /// let mut sel = Select::new();
856    /// let oper1 = sel.recv(&r1);
857    /// let oper2 = sel.recv(&r2);
858    ///
859    /// // The second operation will be selected because it becomes ready first.
860    /// let oper = sel.select_timeout(Duration::from_millis(500));
861    /// match oper {
862    ///     Err(_) => panic!("should not have timed out"),
863    ///     Ok(oper) => match oper.index() {
864    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
865    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
866    ///         _ => unreachable!(),
867    ///     }
868    /// }
869    /// ```
870    pub fn select_timeout(
871        &mut self,
872        timeout: Duration,
873    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
874        select_timeout(&mut self.handles, timeout, false)
875    }
876
877    /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
878    ///
879    /// If an operation becomes ready, it is selected and returned. If multiple operations are
880    /// ready at the same time, a random one among them is selected. If none of the operations
881    /// become ready before the given deadline, an error is returned.
882    ///
883    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
884    /// even when it will simply return an error because the channel is disconnected.
885    ///
886    /// The selected operation must be completed with [`SelectedOperation::send`]
887    /// or [`SelectedOperation::recv`].
888    ///
889    /// # Examples
890    ///
891    /// ```
892    /// use std::thread;
893    /// use std::time::{Instant, Duration};
894    /// use crossbeam_channel::{unbounded, Select};
895    ///
896    /// let (s1, r1) = unbounded();
897    /// let (s2, r2) = unbounded();
898    ///
899    /// thread::spawn(move || {
900    ///     thread::sleep(Duration::from_secs(1));
901    ///     s1.send(10).unwrap();
902    /// });
903    /// thread::spawn(move || s2.send(20).unwrap());
904    ///
905    /// let mut sel = Select::new();
906    /// let oper1 = sel.recv(&r1);
907    /// let oper2 = sel.recv(&r2);
908    ///
909    /// let deadline = Instant::now() + Duration::from_millis(500);
910    ///
911    /// // The second operation will be selected because it becomes ready first.
912    /// let oper = sel.select_deadline(deadline);
913    /// match oper {
914    ///     Err(_) => panic!("should not have timed out"),
915    ///     Ok(oper) => match oper.index() {
916    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
917    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
918    ///         _ => unreachable!(),
919    ///     }
920    /// }
921    /// ```
922    pub fn select_deadline(
923        &mut self,
924        deadline: Instant,
925    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
926        select_deadline(&mut self.handles, deadline, false)
927    }
928
929    /// Attempts to find a ready operation without blocking.
930    ///
931    /// If an operation is ready, its index is returned. If multiple operations are ready at the
932    /// same time, a random one among them is chosen. If none of the operations are ready, an error
933    /// is returned.
934    ///
935    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
936    /// even when it will simply return an error because the channel is disconnected.
937    ///
938    /// Note that this method might return with success spuriously, so it's a good idea to always
939    /// double check if the operation is really ready.
940    ///
941    /// # Examples
942    ///
943    /// ```
944    /// use crossbeam_channel::{unbounded, Select};
945    ///
946    /// let (s1, r1) = unbounded();
947    /// let (s2, r2) = unbounded();
948    ///
949    /// s1.send(10).unwrap();
950    /// s2.send(20).unwrap();
951    ///
952    /// let mut sel = Select::new();
953    /// let oper1 = sel.recv(&r1);
954    /// let oper2 = sel.recv(&r2);
955    ///
956    /// // Both operations are initially ready, so a random one will be chosen.
957    /// match sel.try_ready() {
958    ///     Err(_) => panic!("both operations should be ready"),
959    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
960    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
961    ///     Ok(_) => unreachable!(),
962    /// }
963    /// ```
964    pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
965        match run_ready(&mut self.handles, Timeout::Now, false) {
966            None => Err(TryReadyError),
967            Some(index) => Ok(index),
968        }
969    }
970
971    /// Blocks until one of the operations becomes ready.
972    ///
973    /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
974    /// the same time, a random one among them is chosen.
975    ///
976    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
977    /// even when it will simply return an error because the channel is disconnected.
978    ///
979    /// Note that this method might return with success spuriously, so it's a good idea to always
980    /// double check if the operation is really ready.
981    ///
982    /// # Panics
983    ///
984    /// Panics if no operations have been added to `Select`.
985    ///
986    /// # Examples
987    ///
988    /// ```
989    /// use std::thread;
990    /// use std::time::Duration;
991    /// use crossbeam_channel::{unbounded, Select};
992    ///
993    /// let (s1, r1) = unbounded();
994    /// let (s2, r2) = unbounded();
995    ///
996    /// thread::spawn(move || {
997    ///     thread::sleep(Duration::from_secs(1));
998    ///     s1.send(10).unwrap();
999    /// });
1000    /// thread::spawn(move || s2.send(20).unwrap());
1001    ///
1002    /// let mut sel = Select::new();
1003    /// let oper1 = sel.recv(&r1);
1004    /// let oper2 = sel.recv(&r2);
1005    ///
1006    /// // The second operation will be selected because it becomes ready first.
1007    /// match sel.ready() {
1008    ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1009    ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1010    ///     _ => unreachable!(),
1011    /// }
1012    /// ```
1013    pub fn ready(&mut self) -> usize {
1014        if self.handles.is_empty() {
1015            panic!("no operations have been added to `Select`");
1016        }
1017
1018        run_ready(&mut self.handles, Timeout::Never, false).unwrap()
1019    }
1020
1021    /// Blocks for a limited time until one of the operations becomes ready.
1022    ///
1023    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1024    /// the same time, a random one among them is chosen. If none of the operations become ready
1025    /// for the specified duration, an error is returned.
1026    ///
1027    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1028    /// even when it will simply return an error because the channel is disconnected.
1029    ///
1030    /// Note that this method might return with success spuriously, so it's a good idea to double
1031    /// check if the operation is really ready.
1032    ///
1033    /// # Examples
1034    ///
1035    /// ```
1036    /// use std::thread;
1037    /// use std::time::Duration;
1038    /// use crossbeam_channel::{unbounded, Select};
1039    ///
1040    /// let (s1, r1) = unbounded();
1041    /// let (s2, r2) = unbounded();
1042    ///
1043    /// thread::spawn(move || {
1044    ///     thread::sleep(Duration::from_secs(1));
1045    ///     s1.send(10).unwrap();
1046    /// });
1047    /// thread::spawn(move || s2.send(20).unwrap());
1048    ///
1049    /// let mut sel = Select::new();
1050    /// let oper1 = sel.recv(&r1);
1051    /// let oper2 = sel.recv(&r2);
1052    ///
1053    /// // The second operation will be selected because it becomes ready first.
1054    /// match sel.ready_timeout(Duration::from_millis(500)) {
1055    ///     Err(_) => panic!("should not have timed out"),
1056    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1057    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1058    ///     Ok(_) => unreachable!(),
1059    /// }
1060    /// ```
1061    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1062        match Instant::now().checked_add(timeout) {
1063            Some(deadline) => self.ready_deadline(deadline),
1064            None => Ok(self.ready()),
1065        }
1066    }
1067
1068    /// Blocks until a given deadline, or until one of the operations becomes ready.
1069    ///
1070    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1071    /// the same time, a random one among them is chosen. If none of the operations become ready
1072    /// before the deadline, an error is returned.
1073    ///
1074    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1075    /// even when it will simply return an error because the channel is disconnected.
1076    ///
1077    /// Note that this method might return with success spuriously, so it's a good idea to double
1078    /// check if the operation is really ready.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```
1083    /// use std::thread;
1084    /// use std::time::{Duration, Instant};
1085    /// use crossbeam_channel::{unbounded, Select};
1086    ///
1087    /// let deadline = Instant::now() + Duration::from_millis(500);
1088    ///
1089    /// let (s1, r1) = unbounded();
1090    /// let (s2, r2) = unbounded();
1091    ///
1092    /// thread::spawn(move || {
1093    ///     thread::sleep(Duration::from_secs(1));
1094    ///     s1.send(10).unwrap();
1095    /// });
1096    /// thread::spawn(move || s2.send(20).unwrap());
1097    ///
1098    /// let mut sel = Select::new();
1099    /// let oper1 = sel.recv(&r1);
1100    /// let oper2 = sel.recv(&r2);
1101    ///
1102    /// // The second operation will be selected because it becomes ready first.
1103    /// match sel.ready_deadline(deadline) {
1104    ///     Err(_) => panic!("should not have timed out"),
1105    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1106    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1107    ///     Ok(_) => unreachable!(),
1108    /// }
1109    /// ```
1110    pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1111        match run_ready(&mut self.handles, Timeout::At(deadline), false) {
1112            None => Err(ReadyTimeoutError),
1113            Some(index) => Ok(index),
1114        }
1115    }
1116}
1117
1118impl<'a> Clone for Select<'a> {
1119    fn clone(&self) -> Select<'a> {
1120        Select {
1121            handles: self.handles.clone(),
1122            next_index: self.next_index,
1123        }
1124    }
1125}
1126
1127impl<'a> Default for Select<'a> {
1128    fn default() -> Select<'a> {
1129        Select::new()
1130    }
1131}
1132
1133impl fmt::Debug for Select<'_> {
1134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135        f.pad("Select { .. }")
1136    }
1137}
1138
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`]. Both methods take the
/// `SelectedOperation` by value, so each selected operation can be completed only once.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation.
    ///
    /// It is handed to `channel::write` or `channel::read` when the operation is completed.
    token: Token,

    /// The index of the selected operation, as reported by [`SelectedOperation::index`].
    index: usize,

    /// The address of the selected `Sender` or `Receiver`.
    ///
    /// `send` and `recv` compare it with the address of the reference they are given in order
    /// to reject a handle that is not the selected one.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed.
    ///
    /// Holds no data; it only ties this value to the lifetime `'a`.
    _marker: PhantomData<&'a ()>,
}
1164
1165impl SelectedOperation<'_> {
1166    /// Returns the index of the selected operation.
1167    ///
1168    /// # Examples
1169    ///
1170    /// ```
1171    /// use crossbeam_channel::{bounded, Select};
1172    ///
1173    /// let (s1, r1) = bounded::<()>(0);
1174    /// let (s2, r2) = bounded::<()>(0);
1175    /// let (s3, r3) = bounded::<()>(1);
1176    ///
1177    /// let mut sel = Select::new();
1178    /// let oper1 = sel.send(&s1);
1179    /// let oper2 = sel.recv(&r2);
1180    /// let oper3 = sel.send(&s3);
1181    ///
1182    /// // Only the last operation is ready.
1183    /// let oper = sel.select();
1184    /// assert_eq!(oper.index(), 2);
1185    /// assert_eq!(oper.index(), oper3);
1186    ///
1187    /// // Complete the operation.
1188    /// oper.send(&s3, ()).unwrap();
1189    /// ```
1190    pub fn index(&self) -> usize {
1191        self.index
1192    }
1193
1194    /// Completes the send operation.
1195    ///
1196    /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1197    /// when the operation was added.
1198    ///
1199    /// # Panics
1200    ///
1201    /// Panics if an incorrect [`Sender`] reference is passed.
1202    ///
1203    /// # Examples
1204    ///
1205    /// ```
1206    /// use crossbeam_channel::{bounded, Select, SendError};
1207    ///
1208    /// let (s, r) = bounded::<i32>(0);
1209    /// drop(r);
1210    ///
1211    /// let mut sel = Select::new();
1212    /// let oper1 = sel.send(&s);
1213    ///
1214    /// let oper = sel.select();
1215    /// assert_eq!(oper.index(), oper1);
1216    /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1217    /// ```
1218    pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1219        assert!(
1220            s as *const Sender<T> as *const u8 == self.ptr,
1221            "passed a sender that wasn't selected",
1222        );
1223        let res = unsafe { channel::write(s, &mut self.token, msg) };
1224        mem::forget(self);
1225        res.map_err(SendError)
1226    }
1227
1228    /// Completes the receive operation.
1229    ///
1230    /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1231    /// when the operation was added.
1232    ///
1233    /// # Panics
1234    ///
1235    /// Panics if an incorrect [`Receiver`] reference is passed.
1236    ///
1237    /// # Examples
1238    ///
1239    /// ```
1240    /// use crossbeam_channel::{bounded, Select, RecvError};
1241    ///
1242    /// let (s, r) = bounded::<i32>(0);
1243    /// drop(s);
1244    ///
1245    /// let mut sel = Select::new();
1246    /// let oper1 = sel.recv(&r);
1247    ///
1248    /// let oper = sel.select();
1249    /// assert_eq!(oper.index(), oper1);
1250    /// assert_eq!(oper.recv(&r), Err(RecvError));
1251    /// ```
1252    pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1253        assert!(
1254            r as *const Receiver<T> as *const u8 == self.ptr,
1255            "passed a receiver that wasn't selected",
1256        );
1257        let res = unsafe { channel::read(r, &mut self.token) };
1258        mem::forget(self);
1259        res.map_err(|_| RecvError)
1260    }
1261}
1262
1263impl fmt::Debug for SelectedOperation<'_> {
1264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1265        f.pad("SelectedOperation { .. }")
1266    }
1267}
1268
1269impl Drop for SelectedOperation<'_> {
1270    fn drop(&mut self) {
1271        panic!("dropped `SelectedOperation` without completing the operation");
1272    }
1273}