//! An event/listener framework to allow listeners to subscribe to event sources.
//!
//! To prevent recursive events (events which trigger new events) from leading to two listeners
//! attempting to mutate the same state simultaneously, an event queue is used to defer new events
//! until the current event has finished running.
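//!
//! A minimal sketch of the intended flow (not compiled as a doctest; the types used here are
//! defined below in this module):
//!
//! ```ignore
//! let mut source = EventSource::<u32>::new();
//!
//! // the listener stays subscribed only while `handle` is alive
//! let handle = source.add_listener(|msg, _cb_queue| log::debug!("received {}", msg));
//!
//! // defers the listener callbacks onto a queue, then runs them all before returning
//! CallbackQueue::queue_and_run_with_legacy(|cb_queue| source.notify_listeners(1, cb_queue));
//!
//! handle.stop_listening();
//! ```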

use std::collections::VecDeque;
use std::num::Wrapping;
use std::sync::{Arc, Weak};

use atomic_refcell::AtomicRefCell;

/// A queue of events (functions/closures) which when run can add their own events to the queue.
/// This allows events to be deferred and run later.
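///
/// A minimal sketch (not compiled as a doctest): callbacks added while the queue is running are
/// deferred and run in FIFO order.
///
/// ```ignore
/// let mut queue = CallbackQueue::new();
/// queue.add(|queue| {
///     // a callback can defer follow-up work by adding more callbacks
///     queue.add(|_queue| println!("runs second"));
///     println!("runs first");
/// });
/// queue.run();
/// ```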
#[allow(clippy::type_complexity)]
pub struct CallbackQueue(VecDeque<Box<dyn FnOnce(&mut Self)>>);

impl CallbackQueue {
    /// Create an empty event queue.
    pub fn new() -> Self {
        Self(VecDeque::new())
    }

    /// The number of events in the queue.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Whether the queue contains no events.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Add an event to the queue.
    pub fn add(&mut self, f: impl FnOnce(&mut Self) + 'static) {
        self.0.push_back(Box::new(f));
    }

    /// Process all of the events in the queue (and any new events that are generated).
    pub fn run(&mut self) {
        // loop until there are no more events
        let mut count = 0;
        while let Some(f) = self.0.pop_front() {
            // run the event and allow it to add new events
            (f)(self);

            count += 1;
            if count == 10_000 {
                log::trace!("Possible infinite loop of event callbacks.");
            } else if count == 10_000_000 {
                log::warn!("Very likely an infinite loop of event callbacks.");
            }
        }
    }

    /// A convenience function to create a [CallbackQueue], allow the caller to add events, and
    /// process them all before returning.
    fn queue_and_run<F, U>(f: F) -> U
    where
        F: FnOnce(&mut Self) -> U,
    {
        let mut cb_queue = Self::new();
        let rv = (f)(&mut cb_queue);
        cb_queue.run();
        rv
    }

    /// A convenience function to create a [CallbackQueue], allow the caller to add events, and
    /// process them all before returning.
    ///
    /// This also has the side effect of ensuring that a global thread-local queue is configured
    /// for C code using
    /// [`with_global_cb_queue`](crate::utility::legacy_callback_queue::with_global_cb_queue). We
    /// do this for convenience and to help prevent bugs where we forget to call
    /// `with_global_cb_queue`. Ideally we'd like to remove this side effect as we remove more C
    /// code from Shadow.
    ///
    /// TODO: Once we have removed C file objects, remove this function and make
    /// `Self::queue_and_run` public.
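    ///
    /// For example (a sketch; assumes an `EventSource<u32>` named `source`):
    ///
    /// ```ignore
    /// let rv = CallbackQueue::queue_and_run_with_legacy(|cb_queue| {
    ///     // the listener callbacks are deferred onto `cb_queue` and all run before this returns
    ///     source.notify_listeners(42, cb_queue);
    ///     "some return value"
    /// });
    /// assert_eq!(rv, "some return value");
    /// ```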
    pub fn queue_and_run_with_legacy<F, U>(f: F) -> U
    where
        F: FnOnce(&mut Self) -> U,
    {
        crate::utility::legacy_callback_queue::with_global_cb_queue(|| Self::queue_and_run(f))
    }
}

impl Default for CallbackQueue {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for CallbackQueue {
    fn drop(&mut self) {
        // don't show the following warning message if panicking
        if std::thread::panicking() {
            return;
        }

        if !self.is_empty() {
            // panic in debug builds since the backtrace will be helpful for debugging
            debug_panic!("Dropping CallbackQueue while it still has events pending.");
        }
    }
}

#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct HandleId(u32);

#[must_use = "Stops listening when the handle is dropped"]
/// A handle is used to stop listening for events. The listener will receive events until the
/// handle is dropped, or [`stop_listening()`](Self::stop_listening) is called.
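///
/// For example (a sketch; assumes an `EventSource<u32>` named `source`):
///
/// ```ignore
/// let handle = source.add_listener(|msg, _cb_queue| log::debug!("got {}", msg));
/// // ...the listener receives events while `handle` is alive...
/// handle.stop_listening(); // equivalent to dropping the handle
/// ```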
pub struct Handle<T> {
    id: HandleId,
    source: Weak<AtomicRefCell<EventSourceInner<T>>>,
}

impl<T> Handle<T> {
    fn new(id: HandleId, source: Weak<AtomicRefCell<EventSourceInner<T>>>) -> Self {
        Self { id, source }
    }

    /// Stop listening for new events. Equivalent to dropping the handle.
    pub fn stop_listening(self) {}
}

impl<T> Drop for Handle<T> {
    fn drop(&mut self) {
        if let Some(x) = self.source.upgrade() {
            x.borrow_mut().remove_listener(self.id);
        }
    }
}

/// Emits events to subscribed listeners.
pub struct EventSource<T> {
    inner: Arc<AtomicRefCell<EventSourceInner<T>>>,
}

impl<T: Clone + Copy + 'static> EventSource<T> {
    /// Create a new event source with no listeners.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(AtomicRefCell::new(EventSourceInner::new())),
        }
    }

    /// Add a listener.
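    ///
    /// For example (a sketch mirroring the test at the bottom of this module; assumes an
    /// `EventSource<u32>` named `source`):
    ///
    /// ```ignore
    /// let counter = Arc::new(AtomicRefCell::new(0u32));
    /// let counter_clone = Arc::clone(&counter);
    /// // the listener is removed when the returned handle is dropped
    /// let handle = source.add_listener(move |inc, _cb_queue| {
    ///     *counter_clone.borrow_mut() += inc;
    /// });
    /// ```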
    pub fn add_listener(
        &mut self,
        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
    ) -> Handle<T> {
        let inner_ref = Arc::downgrade(&self.inner);
        self.inner.borrow_mut().add_listener(inner_ref, notify_fn)
    }

    /// Notify all listeners. The listener callbacks are not run inline; each is deferred onto
    /// `cb_queue` and runs when the queue is processed.
    pub fn notify_listeners(&mut self, message: T, cb_queue: &mut CallbackQueue) {
        for (_, l) in &self.inner.borrow().listeners {
            let l_clone = l.clone();
            cb_queue.add(move |cb_queue| (l_clone)(message, cb_queue));
        }
    }
}

impl<T: Clone + Copy + 'static> Default for EventSource<T> {
    fn default() -> Self {
        Self::new()
    }
}

type Listener<T> = Arc<dyn Fn(T, &mut CallbackQueue) + Send + Sync>;

struct EventSourceInner<T> {
    listeners: Vec<(HandleId, Listener<T>)>,
    next_id: Wrapping<u32>,
}

impl<T> EventSourceInner<T> {
    pub fn new() -> Self {
        Self {
            listeners: Vec::new(),
            next_id: Wrapping(0),
        }
    }

    fn get_unused_id(&mut self) -> HandleId {
        // it's very unlikely that there will be collisions, but we loop anyway since we don't
        // care about worst-case performance here
        loop {
            let id = HandleId(self.next_id.0);
            self.next_id += Wrapping(1);

            if !self.listeners.iter().any(|x| x.0 == id) {
                break id;
            }
        }
    }

    pub fn add_listener(
        &mut self,
        inner: std::sync::Weak<AtomicRefCell<Self>>,
        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
    ) -> Handle<T> {
        let handle_id = self.get_unused_id();

        self.listeners.push((handle_id, Arc::new(notify_fn)));

        Handle::new(handle_id, inner)
    }

    pub fn remove_listener(&mut self, id: HandleId) {
        self.listeners
            .remove(self.listeners.iter().position(|x| x.0 == id).unwrap());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_eventqueue() {
        let counter = Arc::new(AtomicRefCell::new(0u32));
        let counter_clone = Arc::clone(&counter);

        let mut source = EventSource::new();

        let handle = source.add_listener(move |inc, _| {
            *counter_clone.borrow_mut() += inc;
        });

        CallbackQueue::queue_and_run(|queue| source.notify_listeners(1, queue));
        CallbackQueue::queue_and_run(|queue| source.notify_listeners(3, queue));

        handle.stop_listening();

        // the handle was dropped, so these notifications should not reach the listener
        CallbackQueue::queue_and_run(|queue| source.notify_listeners(5, queue));
        CallbackQueue::queue_and_run(|queue| source.notify_listeners(7, queue));

        assert_eq!(*counter.borrow(), 4);
    }
}