cc/parallel/
async_executor.rs1use std::{
2 cell::Cell,
3 future::Future,
4 pin::Pin,
5 ptr,
6 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
7 thread,
8 time::Duration,
9};
10
11use crate::Error;
12
/// Vtable for a waker that ignores every notification.
///
/// Cloning hands back another copy of [`NOOP_RAW_WAKER`]; `wake`, `wake_by_ref`
/// and `drop` all do nothing, so the data pointer is never dereferenced.
const NOOP_WAKER_VTABLE: RawWakerVTable = {
    /// `clone` entry: produce another do-nothing raw waker.
    fn duplicate(_data: *const ()) -> RawWaker {
        NOOP_RAW_WAKER
    }

    /// Shared `wake` / `wake_by_ref` / `drop` entry: nothing to notify or free.
    fn ignore(_data: *const ()) {}

    RawWakerVTable::new(duplicate, ignore, ignore, ignore)
};

/// Raw waker with a null data pointer, backed by [`NOOP_WAKER_VTABLE`].
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);
24
/// A future that hands control back to its executor exactly once.
///
/// The first poll returns [`Poll::Pending`]; every later poll returns
/// `Poll::Ready(())`. The wrapped flag records whether the yield has already
/// happened, and `Default` starts it in the not-yet-yielded state.
#[derive(Default)]
pub(crate) struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    /// Yields on the first call and completes on every call after that.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // `YieldOnce` only holds a `bool`, so it is `Unpin` and the pin can be
        // unwrapped safely.
        let yielded = &mut self.get_mut().0;
        if *yielded {
            return Poll::Ready(());
        }

        *yielded = true;
        // A future that returns `Pending` must arrange to be woken, otherwise a
        // waker-driven executor would never poll it again. This future is ready
        // to continue right away, so ask for an immediate re-poll.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
41
42pub(crate) fn block_on<Fut1, Fut2>(
48 mut fut1: Fut1,
49 mut fut2: Fut2,
50 has_made_progress: &Cell<bool>,
51) -> Result<(), Error>
52where
53 Fut1: Future<Output = Result<(), Error>>,
54 Fut2: Future<Output = Result<(), Error>>,
55{
56 let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) });
63 let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) });
64
65 let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
68 let mut context = Context::from_waker(&waker);
69
70 let mut backoff_cnt = 0;
71
72 loop {
73 has_made_progress.set(false);
74
75 if let Some(fut) = fut2.as_mut() {
76 if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
77 fut2 = None;
78 res?;
79 }
80 }
81
82 if let Some(fut) = fut1.as_mut() {
83 if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
84 fut1 = None;
85 res?;
86 }
87 }
88
89 if fut1.is_none() && fut2.is_none() {
90 return Ok(());
91 }
92
93 if !has_made_progress.get() {
94 if backoff_cnt > 3 {
95 let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10));
98 thread::sleep(duration);
99 } else {
100 thread::yield_now();
109 }
110 }
111
112 backoff_cnt = if has_made_progress.get() {
113 0
114 } else {
115 backoff_cnt + 1
116 };
117 }
118}