Skip to content

Commit 8c3c3bd

Browse files
authored
m: Optimize block_on by caching Parker and Waker
1 parent 1b1466a commit 8c3c3bd

File tree

2 files changed

+301
-88
lines changed

2 files changed

+301
-88
lines changed

src/driver.rs

Lines changed: 123 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
use std::cell::Cell;
1+
use std::cell::{Cell, RefCell};
22
use std::future::Future;
33
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
44
use std::sync::Arc;
5+
use std::task::Waker;
56
use std::task::{Context, Poll};
67
use std::thread;
78
use std::time::{Duration, Instant};
89

910
use async_lock::OnceCell;
1011
use futures_lite::pin;
12+
use parking::Parker;
1113
use waker_fn::waker_fn;
1214

1315
use crate::reactor::Reactor;
@@ -120,112 +122,145 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
120122
unparker().unpark();
121123
});
122124

123-
// Parker and unparker for notifying the current thread.
124-
let (p, u) = parking::pair();
125-
// This boolean is set to `true` when the current thread is blocked on I/O.
126-
let io_blocked = Arc::new(AtomicBool::new(false));
125+
// Creates a parker and an associated waker that unparks it.
126+
fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
127+
// Parker and unparker for notifying the current thread.
128+
let (p, u) = parking::pair();
129+
130+
// This boolean is set to `true` when the current thread is blocked on I/O.
131+
let io_blocked = Arc::new(AtomicBool::new(false));
132+
133+
// Prepare the waker.
134+
let waker = waker_fn({
135+
let io_blocked = io_blocked.clone();
136+
move || {
137+
if u.unpark() {
138+
// Check if waking from another thread and if currently blocked on I/O.
139+
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
140+
Reactor::get().notify();
141+
}
142+
}
143+
}
144+
});
145+
146+
(p, waker, io_blocked)
147+
}
127148

128149
thread_local! {
150+
// Cached parker and waker for efficiency.
151+
static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());
152+
129153
// Indicates that the current thread is polling I/O, but not necessarily blocked on it.
130154
static IO_POLLING: Cell<bool> = Cell::new(false);
131155
}
132156

133-
// Prepare the waker.
134-
let waker = waker_fn({
135-
let io_blocked = io_blocked.clone();
136-
move || {
137-
if u.unpark() {
138-
// Check if waking from another thread and if currently blocked on I/O.
139-
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
140-
Reactor::get().notify();
141-
}
157+
CACHE.with(|cache| {
158+
// Try grabbing the cached parker and waker.
159+
let tmp_cached;
160+
let tmp_fresh;
161+
let (p, waker, io_blocked) = match cache.try_borrow_mut() {
162+
Ok(cache) => {
163+
// Use the cached parker and waker.
164+
tmp_cached = cache;
165+
&*tmp_cached
142166
}
143-
}
144-
});
145-
let cx = &mut Context::from_waker(&waker);
146-
pin!(future);
167+
Err(_) => {
168+
// Looks like this is a recursive `block_on()` call.
169+
// Create a fresh parker and waker.
170+
tmp_fresh = parker_and_waker();
171+
&tmp_fresh
172+
}
173+
};
147174

148-
loop {
149-
// Poll the future.
150-
if let Poll::Ready(t) = future.as_mut().poll(cx) {
151-
tracing::trace!("completed");
152-
return t;
153-
}
175+
pin!(future);
154176

155-
// Check if a notification was received.
156-
if p.park_timeout(Duration::from_secs(0)) {
157-
tracing::trace!("notified");
177+
let cx = &mut Context::from_waker(waker);
158178

159-
// Try grabbing a lock on the reactor to process I/O events.
160-
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
161-
// First let wakers know this parker is processing I/O events.
162-
IO_POLLING.with(|io| io.set(true));
163-
let _guard = CallOnDrop(|| {
164-
IO_POLLING.with(|io| io.set(false));
165-
});
166-
167-
// Process available I/O events.
168-
reactor_lock.react(Some(Duration::from_secs(0))).ok();
179+
loop {
180+
// Poll the future.
181+
if let Poll::Ready(t) = future.as_mut().poll(cx) {
182+
// Ensure the cached parker is reset to the unnotified state for future block_on calls,
183+
// in case this future called wake and then immediately returned Poll::Ready.
184+
p.park_timeout(Duration::from_secs(0));
185+
tracing::trace!("completed");
186+
return t;
169187
}
170-
continue;
171-
}
172188

173-
// Try grabbing a lock on the reactor to wait on I/O.
174-
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
175-
// Record the instant at which the lock was grabbed.
176-
let start = Instant::now();
177-
178-
loop {
179-
// First let wakers know this parker is blocked on I/O.
180-
IO_POLLING.with(|io| io.set(true));
181-
io_blocked.store(true, Ordering::SeqCst);
182-
let _guard = CallOnDrop(|| {
183-
IO_POLLING.with(|io| io.set(false));
184-
io_blocked.store(false, Ordering::SeqCst);
185-
});
186-
187-
// Check if a notification has been received before `io_blocked` was updated
188-
// because in that case the reactor won't receive a wakeup.
189-
if p.park_timeout(Duration::from_secs(0)) {
190-
tracing::trace!("notified");
191-
break;
192-
}
189+
// Check if a notification was received.
190+
if p.park_timeout(Duration::from_secs(0)) {
191+
tracing::trace!("notified");
193192

194-
// Wait for I/O events.
195-
tracing::trace!("waiting on I/O");
196-
reactor_lock.react(None).ok();
193+
// Try grabbing a lock on the reactor to process I/O events.
194+
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
195+
// First let wakers know this parker is processing I/O events.
196+
IO_POLLING.with(|io| io.set(true));
197+
let _guard = CallOnDrop(|| {
198+
IO_POLLING.with(|io| io.set(false));
199+
});
197200

198-
// Check if a notification has been received.
199-
if p.park_timeout(Duration::from_secs(0)) {
200-
tracing::trace!("notified");
201-
break;
201+
// Process available I/O events.
202+
reactor_lock.react(Some(Duration::from_secs(0))).ok();
202203
}
204+
continue;
205+
}
203206

204-
// Check if this thread been handling I/O events for a long time.
205-
if start.elapsed() > Duration::from_micros(500) {
206-
tracing::trace!("stops hogging the reactor");
207-
208-
// This thread is clearly processing I/O events for some other threads
209-
// because it didn't get a notification yet. It's best to stop hogging the
210-
// reactor and give other threads a chance to process I/O events for
211-
// themselves.
212-
drop(reactor_lock);
213-
214-
// Unpark the "async-io" thread in case no other thread is ready to start
215-
// processing I/O events. This way we prevent a potential latency spike.
216-
unparker().unpark();
217-
218-
// Wait for a notification.
219-
p.park();
220-
break;
207+
// Try grabbing a lock on the reactor to wait on I/O.
208+
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
209+
// Record the instant at which the lock was grabbed.
210+
let start = Instant::now();
211+
212+
loop {
213+
// First let wakers know this parker is blocked on I/O.
214+
IO_POLLING.with(|io| io.set(true));
215+
io_blocked.store(true, Ordering::SeqCst);
216+
let _guard = CallOnDrop(|| {
217+
IO_POLLING.with(|io| io.set(false));
218+
io_blocked.store(false, Ordering::SeqCst);
219+
});
220+
221+
// Check if a notification has been received before `io_blocked` was updated
222+
// because in that case the reactor won't receive a wakeup.
223+
if p.park_timeout(Duration::from_secs(0)) {
224+
tracing::trace!("notified");
225+
break;
226+
}
227+
228+
// Wait for I/O events.
229+
tracing::trace!("waiting on I/O");
230+
reactor_lock.react(None).ok();
231+
232+
// Check if a notification has been received.
233+
if p.park_timeout(Duration::from_secs(0)) {
234+
tracing::trace!("notified");
235+
break;
236+
}
237+
238+
// Check if this thread been handling I/O events for a long time.
239+
if start.elapsed() > Duration::from_micros(500) {
240+
tracing::trace!("stops hogging the reactor");
241+
242+
// This thread is clearly processing I/O events for some other threads
243+
// because it didn't get a notification yet. It's best to stop hogging the
244+
// reactor and give other threads a chance to process I/O events for
245+
// themselves.
246+
drop(reactor_lock);
247+
248+
// Unpark the "async-io" thread in case no other thread is ready to start
249+
// processing I/O events. This way we prevent a potential latency spike.
250+
unparker().unpark();
251+
252+
// Wait for a notification.
253+
p.park();
254+
break;
255+
}
221256
}
257+
} else {
258+
// Wait for an actual notification.
259+
tracing::trace!("sleep until notification");
260+
p.park();
222261
}
223-
} else {
224-
// Wait for an actual notification.
225-
tracing::trace!("sleep until notification");
226-
p.park();
227262
}
228-
}
263+
})
229264
}
230265

231266
/// Runs a closure when dropped.

0 commit comments

Comments
 (0)