// shadow_rs/utility/callback_queue.rs
use std::collections::VecDeque;
8use std::num::Wrapping;
9use std::sync::{Arc, Weak};
10
11use atomic_refcell::AtomicRefCell;
12
/// A FIFO queue of deferred callbacks. Each callback receives the queue
/// itself, so a running callback may enqueue further callbacks.
#[allow(clippy::type_complexity)]
pub struct CallbackQueue(VecDeque<Box<dyn FnOnce(&mut Self)>>);
17
18impl CallbackQueue {
19    pub fn new() -> Self {
21        Self(VecDeque::new())
22    }
23
24    pub fn len(&self) -> usize {
25        self.0.len()
26    }
27
28    pub fn is_empty(&self) -> bool {
29        self.len() == 0
30    }
31
32    pub fn add(&mut self, f: impl FnOnce(&mut Self) + 'static) {
34        self.0.push_back(Box::new(f));
35    }
36
37    pub fn run(&mut self) {
39        let mut count = 0;
41        while let Some(f) = self.0.pop_front() {
42            (f)(self);
44
45            count += 1;
46            if count == 10_000 {
47                log::trace!("Possible infinite loop of event callbacks.");
48            } else if count == 10_000_000 {
49                log::warn!("Very likely an infinite loop of event callbacks.");
50            }
51        }
52    }
53
54    fn queue_and_run<F, U>(f: F) -> U
57    where
58        F: FnOnce(&mut Self) -> U,
59    {
60        let mut cb_queue = Self::new();
61        let rv = (f)(&mut cb_queue);
62        cb_queue.run();
63        rv
64    }
65
66    pub fn queue_and_run_with_legacy<F, U>(f: F) -> U
79    where
80        F: FnOnce(&mut Self) -> U,
81    {
82        crate::utility::legacy_callback_queue::with_global_cb_queue(|| Self::queue_and_run(f))
83    }
84}
85
86impl Default for CallbackQueue {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl Drop for CallbackQueue {
93    fn drop(&mut self) {
94        if std::thread::panicking() {
96            return;
97        }
98
99        if !self.is_empty() {
100            debug_panic!("Dropping EventQueue while it still has events pending.");
102        }
103    }
104}
105
/// Opaque identifier distinguishing listeners registered on one event source.
// `u32` has a total order, so derive the full `Eq`/`Ord`/`Hash` set rather
// than only the partial traits (clippy::derive_partial_eq_without_eq), plus
// `Debug` for diagnostics. All additions are backward-compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct HandleId(u32);
108
/// RAII handle for a registered listener: the listener stays registered only
/// while this handle is alive (deregistration happens in the `Drop` impl).
#[must_use = "Stops listening when the handle is dropped"]
pub struct Handle<T> {
    // Id assigned by the event source when the listener was added.
    id: HandleId,
    // Weak back-reference so dropping the handle can deregister the listener
    // without keeping the event source alive.
    source: Weak<AtomicRefCell<EventSourceInner<T>>>,
}
116
117impl<T> Handle<T> {
118    fn new(id: HandleId, source: Weak<AtomicRefCell<EventSourceInner<T>>>) -> Self {
119        Self { id, source }
120    }
121
122    pub fn stop_listening(self) {}
124}
125
126impl<T> Drop for Handle<T> {
127    fn drop(&mut self) {
128        if let Some(x) = self.source.upgrade() {
129            x.borrow_mut().remove_listener(self.id);
130        }
131    }
132}
133
/// A source of events of type `T` that listeners can subscribe to.
pub struct EventSource<T> {
    // Shared so that each `Handle` can hold a `Weak` back-reference for
    // deregistration on drop.
    inner: Arc<AtomicRefCell<EventSourceInner<T>>>,
}
138
139impl<T: Clone + Copy + 'static> EventSource<T> {
140    pub fn new() -> Self {
141        Self {
142            inner: Arc::new(AtomicRefCell::new(EventSourceInner::new())),
143        }
144    }
145
146    pub fn add_listener(
148        &mut self,
149        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
150    ) -> Handle<T> {
151        let inner_ref = Arc::downgrade(&Arc::clone(&self.inner));
152        self.inner.borrow_mut().add_listener(inner_ref, notify_fn)
153    }
154
155    pub fn notify_listeners(&mut self, message: T, cb_queue: &mut CallbackQueue) {
157        for (_, l) in &self.inner.borrow().listeners {
158            let l_clone = l.clone();
159            cb_queue.add(move |cb_queue| (l_clone)(message, cb_queue));
160        }
161    }
162}
163
164impl<T: Clone + Copy + 'static> Default for EventSource<T> {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
/// Shared, thread-safe listener callback; receives the event payload and the
/// callback queue so it can enqueue follow-up work.
type Listener<T> = Arc<dyn Fn(T, &mut CallbackQueue) + Send + Sync>;
171
/// Listener registry shared behind an `EventSource`.
struct EventSourceInner<T> {
    // Registered listeners, keyed by the id their `Handle` holds.
    listeners: Vec<(HandleId, Listener<T>)>,
    // Next candidate id; wraps around and is re-checked against live ids.
    next_id: Wrapping<u32>,
}
176
177impl<T> EventSourceInner<T> {
178    pub fn new() -> Self {
179        Self {
180            listeners: Vec::new(),
181            next_id: Wrapping(0),
182        }
183    }
184
185    fn get_unused_id(&mut self) -> HandleId {
186        loop {
189            let id = HandleId(self.next_id.0);
190            self.next_id += Wrapping(1);
191
192            if !self.listeners.iter().any(|x| x.0 == id) {
193                break id;
194            }
195        }
196    }
197
198    pub fn add_listener(
199        &mut self,
200        inner: std::sync::Weak<AtomicRefCell<Self>>,
201        notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
202    ) -> Handle<T> {
203        let handle_id = self.get_unused_id();
204
205        self.listeners.push((handle_id, Arc::new(notify_fn)));
206
207        Handle::new(handle_id, inner)
208    }
209
210    pub fn remove_listener(&mut self, id: HandleId) {
211        self.listeners
212            .remove(self.listeners.iter().position(|x| x.0 == id).unwrap());
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Notifications delivered while a listener handle is alive are counted;
    /// notifications after the handle is dropped are ignored.
    #[test]
    fn test_eventqueue() {
        let total = Arc::new(AtomicRefCell::new(0u32));
        let total_for_listener = Arc::clone(&total);

        let mut source = EventSource::new();
        let handle = source.add_listener(move |amount, _| {
            *total_for_listener.borrow_mut() += amount;
        });

        // Counted: 1 + 3 = 4.
        CallbackQueue::queue_and_run(|q| source.notify_listeners(1, q));
        CallbackQueue::queue_and_run(|q| source.notify_listeners(3, q));

        handle.stop_listening();

        // Ignored after deregistration.
        CallbackQueue::queue_and_run(|q| source.notify_listeners(5, q));
        CallbackQueue::queue_and_run(|q| source.notify_listeners(7, q));

        assert_eq!(*total.borrow(), 4);
    }
}