1//! Channel that delivers a message at a certain moment in time.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
45use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::Instant;
89use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
1314/// Result of a receive operation.
15pub(crate) type AtToken = Option<Instant>;
1617/// Channel that delivers a message at a certain moment in time
18pub(crate) struct Channel {
19/// The instant at which the message will be delivered.
20delivery_time: Instant,
2122/// `true` if the message has been received.
23received: AtomicBool,
24}
2526impl Channel {
27/// Creates a channel that delivers a message at a certain instant in time.
28#[inline]
29pub(crate) fn new_deadline(when: Instant) -> Self {
30 Channel {
31 delivery_time: when,
32 received: AtomicBool::new(false),
33 }
34 }
3536/// Attempts to receive a message without blocking.
37#[inline]
38pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39// We use relaxed ordering because this is just an optional optimistic check.
40if self.received.load(Ordering::Relaxed) {
41// The message has already been received.
42return Err(TryRecvError::Empty);
43 }
4445if Instant::now() < self.delivery_time {
46// The message was not delivered yet.
47return Err(TryRecvError::Empty);
48 }
4950// Try receiving the message if it is still available.
51if !self.received.swap(true, Ordering::SeqCst) {
52// Success! Return delivery time as the message.
53Ok(self.delivery_time)
54 } else {
55// The message was already received.
56Err(TryRecvError::Empty)
57 }
58 }
5960/// Receives a message from the channel.
61#[inline]
62pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63// We use relaxed ordering because this is just an optional optimistic check.
64if self.received.load(Ordering::Relaxed) {
65// The message has already been received.
66utils::sleep_until(deadline);
67return Err(RecvTimeoutError::Timeout);
68 }
6970// Wait until the message is received or the deadline is reached.
71loop {
72let now = Instant::now();
7374let deadline = match deadline {
75// Check if we can receive the next message.
76_ if now >= self.delivery_time => break,
77// Check if the timeout deadline has been reached.
78Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
7980// Sleep until one of the above happens
81Some(d) if d < self.delivery_time => d,
82_ => self.delivery_time,
83 };
8485 thread::sleep(deadline - now);
86 }
8788// Try receiving the message if it is still available.
89if !self.received.swap(true, Ordering::SeqCst) {
90// Success! Return the message, which is the instant at which it was delivered.
91Ok(self.delivery_time)
92 } else {
93// The message was already received. Block forever.
94utils::sleep_until(None);
95unreachable!()
96 }
97 }
9899/// Reads a message from the channel.
100#[inline]
101pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
102 token.at.ok_or(())
103 }
104105/// Returns `true` if the channel is empty.
106#[inline]
107pub(crate) fn is_empty(&self) -> bool {
108// We use relaxed ordering because this is just an optional optimistic check.
109if self.received.load(Ordering::Relaxed) {
110return true;
111 }
112113// If the delivery time hasn't been reached yet, the channel is empty.
114if Instant::now() < self.delivery_time {
115return true;
116 }
117118// The delivery time has been reached. The channel is empty only if the message has already
119 // been received.
120self.received.load(Ordering::SeqCst)
121 }
122123/// Returns `true` if the channel is full.
124#[inline]
125pub(crate) fn is_full(&self) -> bool {
126 !self.is_empty()
127 }
128129/// Returns the number of messages in the channel.
130#[inline]
131pub(crate) fn len(&self) -> usize {
132if self.is_empty() {
1330
134} else {
1351
136}
137 }
138139/// Returns the capacity of the channel.
140#[inline]
141pub(crate) fn capacity(&self) -> Option<usize> {
142Some(1)
143 }
144}
145146impl SelectHandle for Channel {
147#[inline]
148fn try_select(&self, token: &mut Token) -> bool {
149match self.try_recv() {
150Ok(msg) => {
151 token.at = Some(msg);
152true
153}
154Err(TryRecvError::Disconnected) => {
155 token.at = None;
156true
157}
158Err(TryRecvError::Empty) => false,
159 }
160 }
161162#[inline]
163fn deadline(&self) -> Option<Instant> {
164// We use relaxed ordering because this is just an optional optimistic check.
165if self.received.load(Ordering::Relaxed) {
166None
167} else {
168Some(self.delivery_time)
169 }
170 }
171172#[inline]
173fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174self.is_ready()
175 }
176177#[inline]
178fn unregister(&self, _oper: Operation) {}
179180#[inline]
181fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
182self.try_select(token)
183 }
184185#[inline]
186fn is_ready(&self) -> bool {
187 !self.is_empty()
188 }
189190#[inline]
191fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
192self.is_ready()
193 }
194195#[inline]
196fn unwatch(&self, _oper: Operation) {}
197}