shadow_rs/utility/
callback_queue.rsuse std::collections::VecDeque;
use std::num::Wrapping;
use std::sync::{Arc, Weak};
use atomic_refcell::AtomicRefCell;
#[allow(clippy::type_complexity)]
pub struct CallbackQueue(VecDeque<Box<dyn FnOnce(&mut Self)>>);
impl CallbackQueue {
pub fn new() -> Self {
Self(VecDeque::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn add(&mut self, f: impl FnOnce(&mut Self) + 'static) {
self.0.push_back(Box::new(f));
}
pub fn run(&mut self) {
let mut count = 0;
while let Some(f) = self.0.pop_front() {
(f)(self);
count += 1;
if count == 10_000 {
log::trace!("Possible infinite loop of event callbacks.");
} else if count == 10_000_000 {
log::warn!("Very likely an infinite loop of event callbacks.");
}
}
}
fn queue_and_run<F, U>(f: F) -> U
where
F: FnOnce(&mut Self) -> U,
{
let mut cb_queue = Self::new();
let rv = (f)(&mut cb_queue);
cb_queue.run();
rv
}
pub fn queue_and_run_with_legacy<F, U>(f: F) -> U
where
F: FnOnce(&mut Self) -> U,
{
crate::utility::legacy_callback_queue::with_global_cb_queue(|| Self::queue_and_run(f))
}
}
impl Default for CallbackQueue {
fn default() -> Self {
Self::new()
}
}
impl Drop for CallbackQueue {
fn drop(&mut self) {
if std::thread::panicking() {
return;
}
if !self.is_empty() {
debug_panic!("Dropping EventQueue while it still has events pending.");
}
}
}
#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct HandleId(u32);
#[must_use = "Stops listening when the handle is dropped"]
pub struct Handle<T> {
id: HandleId,
source: Weak<AtomicRefCell<EventSourceInner<T>>>,
}
impl<T> Handle<T> {
fn new(id: HandleId, source: Weak<AtomicRefCell<EventSourceInner<T>>>) -> Self {
Self { id, source }
}
pub fn stop_listening(self) {}
}
impl<T> Drop for Handle<T> {
fn drop(&mut self) {
if let Some(x) = self.source.upgrade() {
x.borrow_mut().remove_listener(self.id);
}
}
}
pub struct EventSource<T> {
inner: Arc<AtomicRefCell<EventSourceInner<T>>>,
}
impl<T: Clone + Copy + 'static> EventSource<T> {
pub fn new() -> Self {
Self {
inner: Arc::new(AtomicRefCell::new(EventSourceInner::new())),
}
}
pub fn add_listener(
&mut self,
notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
) -> Handle<T> {
let inner_ref = Arc::downgrade(&Arc::clone(&self.inner));
self.inner.borrow_mut().add_listener(inner_ref, notify_fn)
}
pub fn notify_listeners(&mut self, message: T, cb_queue: &mut CallbackQueue) {
for (_, l) in &self.inner.borrow().listeners {
let l_clone = l.clone();
cb_queue.add(move |cb_queue| (l_clone)(message, cb_queue));
}
}
}
impl<T: Clone + Copy + 'static> Default for EventSource<T> {
fn default() -> Self {
Self::new()
}
}
type Listener<T> = Arc<dyn Fn(T, &mut CallbackQueue) + Send + Sync>;
struct EventSourceInner<T> {
listeners: Vec<(HandleId, Listener<T>)>,
next_id: Wrapping<u32>,
}
impl<T> EventSourceInner<T> {
pub fn new() -> Self {
Self {
listeners: Vec::new(),
next_id: Wrapping(0),
}
}
fn get_unused_id(&mut self) -> HandleId {
loop {
let id = HandleId(self.next_id.0);
self.next_id += Wrapping(1);
if !self.listeners.iter().any(|x| x.0 == id) {
break id;
}
}
}
pub fn add_listener(
&mut self,
inner: std::sync::Weak<AtomicRefCell<Self>>,
notify_fn: impl Fn(T, &mut CallbackQueue) + Send + Sync + 'static,
) -> Handle<T> {
let handle_id = self.get_unused_id();
self.listeners.push((handle_id, Arc::new(notify_fn)));
Handle::new(handle_id, inner)
}
pub fn remove_listener(&mut self, id: HandleId) {
self.listeners
.remove(self.listeners.iter().position(|x| x.0 == id).unwrap());
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_eventqueue() {
let counter = Arc::new(AtomicRefCell::new(0u32));
let counter_clone = Arc::clone(&counter);
let mut source = EventSource::new();
let handle = source.add_listener(move |inc, _| {
*counter_clone.borrow_mut() += inc;
});
CallbackQueue::queue_and_run(|queue| source.notify_listeners(1, queue));
CallbackQueue::queue_and_run(|queue| source.notify_listeners(3, queue));
handle.stop_listening();
CallbackQueue::queue_and_run(|queue| source.notify_listeners(5, queue));
CallbackQueue::queue_and_run(|queue| source.notify_listeners(7, queue));
assert_eq!(*counter.borrow(), 4);
}
}