// shadow_rs/utility/callback_queue.rs

//! An event/listener framework to allow listeners to subscribe to event sources.
//!
//! To prevent recursive events (events which trigger new events) from leading to two listeners
//! attempting to mutate the same state simultaneously, an event queue is used to defer new events
//! until the current event has finished running.

use std::collections::VecDeque;
use std::num::Wrapping;
use std::sync::{Arc, Weak};

use atomic_refcell::AtomicRefCell;
12
/// A queue of events (functions/closures) which when run can add their own events to the queue.
/// This allows events to be deferred and run later.
// the boxed `FnOnce(&mut Self)` element type trips clippy's type-complexity lint
#[allow(clippy::type_complexity)]
pub struct CallbackQueue(VecDeque<Box<dyn FnOnce(&mut Self)>>);
17
18impl CallbackQueue {
19    /// Create an empty event queue.
20    pub fn new() -> Self {
21        Self(VecDeque::new())
22    }
23
24    pub fn len(&self) -> usize {
25        self.0.len()
26    }
27
28    pub fn is_empty(&self) -> bool {
29        self.len() == 0
30    }
31
32    /// Add an event to the queue.
33    pub fn add(&mut self, f: impl FnOnce(&mut Self) + 'static) {
34        self.0.push_back(Box::new(f));
35    }
36
37    /// Process all of the events in the queue (and any new events that are generated).
38    pub fn run(&mut self) {
39        // loop until there are no more events
40        let mut count = 0;
41        while let Some(f) = self.0.pop_front() {
42            // run the event and allow it to add new events
43            (f)(self);
44
45            count += 1;
46            if count == 10_000 {
47                log::trace!("Possible infinite loop of event callbacks.");
48            } else if count == 10_000_000 {
49                log::warn!("Very likely an infinite loop of event callbacks.");
50            }
51        }
52    }
53
54    /// A convenience function to create a [CallbackQueue], allow the caller to add events, and
55    /// process them all before returning.
56    fn queue_and_run<F, U>(f: F) -> U
57    where
58        F: FnOnce(&mut Self) -> U,
59    {
60        let mut cb_queue = Self::new();
61        let rv = (f)(&mut cb_queue);
62        cb_queue.run();
63        rv
64    }
65
66    /// A convenience function to create a [CallbackQueue], allow the caller to add events, and
67    /// process them all before returning.
68    ///
69    /// This also has the side-effect of ensuring that a global thread-local queue is configured for
70    /// C code using
71    /// [`with_global_cb_queue`](crate::utility::legacy_callback_queue::with_global_cb_queue). We do
72    /// this for convenience and to help prevent bugs where we forget to call
73    /// `with_global_cb_queue`. Ideally we'd like to remove this side-effect as we remove more C
74    /// code from Shadow.
75    ///
76    /// TODO: Once we have removed C file objects, remove this function and make
77    /// `Self::queue_and_run` public.
78    pub fn queue_and_run_with_legacy<F, U>(f: F) -> U
79    where
80        F: FnOnce(&mut Self) -> U,
81    {
82        crate::utility::legacy_callback_queue::with_global_cb_queue(|| Self::queue_and_run(f))
83    }
84}
85
86impl Default for CallbackQueue {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl Drop for CallbackQueue {
93    fn drop(&mut self) {
94        // don't show the following warning message if panicking
95        if std::thread::panicking() {
96            return;
97        }
98
99        if !self.is_empty() {
100            // panic in debug builds since the backtrace will be helpful for debugging
101            debug_panic!("Dropping EventQueue while it still has events pending.");
102        }
103    }
104}
105
/// Opaque identifier distinguishing listeners registered with the same event source.
// `u32` has a total order and trivial hashing, so derive the full set of comparison
// traits plus `Debug`/`Hash` for diagnostics and potential map/set use
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct HandleId(u32);
108
#[must_use = "Stops listening when the handle is dropped"]
/// A handle is used to stop listening for events. The listener will receive events until the handle
/// is dropped, or [`stop_listening()`](Self::stop_listening) is called.
pub struct Handle<T> {
    // identifies this listener within the source's listener list
    id: HandleId,
    // weak reference so an outstanding handle doesn't keep the event source alive
    source: Weak<AtomicRefCell<EventSourceInner<T>>>,
}
116
117impl<T> Handle<T> {
118    fn new(id: HandleId, source: Weak<AtomicRefCell<EventSourceInner<T>>>) -> Self {
119        Self { id, source }
120    }
121
122    /// Stop listening for new events. Equivalent to dropping the handle.
123    pub fn stop_listening(self) {}
124}
125
126impl<T> Drop for Handle<T> {
127    fn drop(&mut self) {
128        if let Some(x) = self.source.upgrade() {
129            x.borrow_mut().remove_listener(self.id);
130        }
131    }
132}
133
/// Emits events to subscribed listeners.
pub struct EventSource<T> {
    // shared interior state (listener list); `Arc` so handles can hold weak references to it
    inner: Arc<AtomicRefCell<EventSourceInner<T>>>,
}
138
139impl<T: Clone + Copy + 'static> EventSource<T> {
140    pub fn new() -> Self {
141        Self {
142            inner: Arc::new(AtomicRefCell::new(EventSourceInner::new())),
143        }
144    }
145
146    /// Add a listener.
147    pub fn add_listener(
148        &mut self,
149        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
150    ) -> Handle<T> {
151        let inner_ref = Arc::downgrade(&Arc::clone(&self.inner));
152        self.inner.borrow_mut().add_listener(inner_ref, notify_fn)
153    }
154
155    /// Notify all listeners.
156    pub fn notify_listeners(&mut self, message: T, cb_queue: &mut CallbackQueue) {
157        for (_, l) in &self.inner.borrow().listeners {
158            let l_clone = l.clone();
159            cb_queue.add(move |cb_queue| (l_clone)(message, cb_queue));
160        }
161    }
162}
163
164impl<T: Clone + Copy + 'static> Default for EventSource<T> {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170type Listener<T> = Arc<dyn Fn(T, &mut CallbackQueue) + Send + Sync>;
171
/// Interior state of an [`EventSource`]: the registered listeners and an id counter.
struct EventSourceInner<T> {
    // (id, callback) pairs; listeners are notified in the order stored here
    listeners: Vec<(HandleId, Listener<T>)>,
    // next candidate id to hand out; wraps on overflow (collisions handled in get_unused_id)
    next_id: Wrapping<u32>,
}
176
177impl<T> EventSourceInner<T> {
178    pub fn new() -> Self {
179        Self {
180            listeners: Vec::new(),
181            next_id: Wrapping(0),
182        }
183    }
184
185    fn get_unused_id(&mut self) -> HandleId {
186        // it's very unlikely that there will be collisions, but we loop anyways since we
187        // don't care about worst-case performance here
188        loop {
189            let id = HandleId(self.next_id.0);
190            self.next_id += Wrapping(1);
191
192            if !self.listeners.iter().any(|x| x.0 == id) {
193                break id;
194            }
195        }
196    }
197
198    pub fn add_listener(
199        &mut self,
200        inner: std::sync::Weak<AtomicRefCell<Self>>,
201        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
202    ) -> Handle<T> {
203        let handle_id = self.get_unused_id();
204
205        self.listeners.push((handle_id, Arc::new(notify_fn)));
206
207        Handle::new(handle_id, inner)
208    }
209
210    pub fn remove_listener(&mut self, id: HandleId) {
211        self.listeners
212            .remove(self.listeners.iter().position(|x| x.0 == id).unwrap());
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_eventqueue() {
        let total = Arc::new(AtomicRefCell::new(0u32));

        let mut source = EventSource::new();

        let handle = {
            let total = Arc::clone(&total);
            source.add_listener(move |amount, _| {
                *total.borrow_mut() += amount;
            })
        };

        // events emitted while the handle is alive are delivered
        for amount in [1, 3] {
            CallbackQueue::queue_and_run(|queue| source.notify_listeners(amount, queue));
        }

        handle.stop_listening();

        // events emitted after the handle is dropped are ignored
        for amount in [5, 7] {
            CallbackQueue::queue_and_run(|queue| source.notify_listeners(amount, queue));
        }

        assert_eq!(*total.borrow(), 4);
    }
}