shadow_rs/utility/
callback_queue.rs1use std::collections::VecDeque;
8use std::num::Wrapping;
9use std::sync::{Arc, Weak};
10
11use atomic_refcell::AtomicRefCell;
12
13#[allow(clippy::type_complexity)]
16pub struct CallbackQueue(VecDeque<Box<dyn FnOnce(&mut Self)>>);
17
18impl CallbackQueue {
19 pub fn new() -> Self {
21 Self(VecDeque::new())
22 }
23
24 pub fn len(&self) -> usize {
25 self.0.len()
26 }
27
28 pub fn is_empty(&self) -> bool {
29 self.len() == 0
30 }
31
32 pub fn add(&mut self, f: impl FnOnce(&mut Self) + 'static) {
34 self.0.push_back(Box::new(f));
35 }
36
37 pub fn run(&mut self) {
39 let mut count = 0;
41 while let Some(f) = self.0.pop_front() {
42 (f)(self);
44
45 count += 1;
46 if count == 10_000 {
47 log::trace!("Possible infinite loop of event callbacks.");
48 } else if count == 10_000_000 {
49 log::warn!("Very likely an infinite loop of event callbacks.");
50 }
51 }
52 }
53
54 fn queue_and_run<F, U>(f: F) -> U
57 where
58 F: FnOnce(&mut Self) -> U,
59 {
60 let mut cb_queue = Self::new();
61 let rv = (f)(&mut cb_queue);
62 cb_queue.run();
63 rv
64 }
65
66 pub fn queue_and_run_with_legacy<F, U>(f: F) -> U
79 where
80 F: FnOnce(&mut Self) -> U,
81 {
82 crate::utility::legacy_callback_queue::with_global_cb_queue(|| Self::queue_and_run(f))
83 }
84}
85
86impl Default for CallbackQueue {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl Drop for CallbackQueue {
93 fn drop(&mut self) {
94 if std::thread::panicking() {
96 return;
97 }
98
99 if !self.is_empty() {
100 debug_panic!("Dropping EventQueue while it still has events pending.");
102 }
103 }
104}
105
106#[derive(Clone, Copy, PartialEq, PartialOrd)]
107struct HandleId(u32);
108
109#[must_use = "Stops listening when the handle is dropped"]
110pub struct Handle<T> {
113 id: HandleId,
114 source: Weak<AtomicRefCell<EventSourceInner<T>>>,
115}
116
117impl<T> Handle<T> {
118 fn new(id: HandleId, source: Weak<AtomicRefCell<EventSourceInner<T>>>) -> Self {
119 Self { id, source }
120 }
121
122 pub fn stop_listening(self) {}
124}
125
126impl<T> Drop for Handle<T> {
127 fn drop(&mut self) {
128 if let Some(x) = self.source.upgrade() {
129 x.borrow_mut().remove_listener(self.id);
130 }
131 }
132}
133
134pub struct EventSource<T> {
136 inner: Arc<AtomicRefCell<EventSourceInner<T>>>,
137}
138
139impl<T: Clone + Copy + 'static> EventSource<T> {
140 pub fn new() -> Self {
141 Self {
142 inner: Arc::new(AtomicRefCell::new(EventSourceInner::new())),
143 }
144 }
145
146 pub fn add_listener(
148 &mut self,
149 notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
150 ) -> Handle<T> {
151 let inner_ref = Arc::downgrade(&Arc::clone(&self.inner));
152 self.inner.borrow_mut().add_listener(inner_ref, notify_fn)
153 }
154
155 pub fn notify_listeners(&mut self, message: T, cb_queue: &mut CallbackQueue) {
157 for (_, l) in &self.inner.borrow().listeners {
158 let l_clone = l.clone();
159 cb_queue.add(move |cb_queue| (l_clone)(message, cb_queue));
160 }
161 }
162}
163
164impl<T: Clone + Copy + 'static> Default for EventSource<T> {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170type Listener<T> = Arc<dyn Fn(T, &mut CallbackQueue) + Send + Sync>;
171
172struct EventSourceInner<T> {
173 listeners: Vec<(HandleId, Listener<T>)>,
174 next_id: Wrapping<u32>,
175}
176
177impl<T> EventSourceInner<T> {
178 pub fn new() -> Self {
179 Self {
180 listeners: Vec::new(),
181 next_id: Wrapping(0),
182 }
183 }
184
185 fn get_unused_id(&mut self) -> HandleId {
186 loop {
189 let id = HandleId(self.next_id.0);
190 self.next_id += Wrapping(1);
191
192 if !self.listeners.iter().any(|x| x.0 == id) {
193 break id;
194 }
195 }
196 }
197
198 pub fn add_listener(
199 &mut self,
200 inner: std::sync::Weak<AtomicRefCell<Self>>,
201 notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
202 ) -> Handle<T> {
203 let handle_id = self.get_unused_id();
204
205 self.listeners.push((handle_id, Arc::new(notify_fn)));
206
207 Handle::new(handle_id, inner)
208 }
209
210 pub fn remove_listener(&mut self, id: HandleId) {
211 self.listeners
212 .remove(self.listeners.iter().position(|x| x.0 == id).unwrap());
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn test_eventqueue() {
222 let counter = Arc::new(AtomicRefCell::new(0u32));
223 let counter_clone = Arc::clone(&counter);
224
225 let mut source = EventSource::new();
226
227 let handle = source.add_listener(move |inc, _| {
228 *counter_clone.borrow_mut() += inc;
229 });
230
231 CallbackQueue::queue_and_run(|queue| source.notify_listeners(1, queue));
232 CallbackQueue::queue_and_run(|queue| source.notify_listeners(3, queue));
233
234 handle.stop_listening();
235
236 CallbackQueue::queue_and_run(|queue| source.notify_listeners(5, queue));
237 CallbackQueue::queue_and_run(|queue| source.notify_listeners(7, queue));
238
239 assert_eq!(*counter.borrow(), 4);
240 }
241}