Chapter 4 - Our first async runtime
As we saw in previous sections, our cooperative tasks boil down to a type that implements Future and
must be repeatedly polled in order to finish. We also have the Context and Waker system
that allows the runtime to sleep if there's currently nothing to do.
If we want our async program to run, it needs a bridge between async, poll-based functions and non-async, blocking functions. Let's look at tokio as an example:

    #[tokio::main]
    async fn main() {
        foo().await;
    }

If you expand the tokio::main macro, you will see:

    fn main() {
        let body = async {
            foo().await;
        };
        #[allow(clippy::expect_used, clippy::diverging_sub_expression)]
        {
            return tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("Failed building the Runtime")
                .block_on(body);
        }
    }

The key thing to look out for here is this block_on function. It takes the form of

    /// Turns an async function into a blocking function
    fn block_on<F: Future>(f: F) -> F::Output {
        todo!()
    }

But how would we write this?
At a bare minimum, we need to build something resembling this:

    use std::future::Future;
    use std::task::{Context, Poll};

    /// Turns an async function into a blocking function
    fn block_on<F: Future>(f: F) -> F::Output {
        // futures must be pinned to be polled!
        let mut f = std::pin::pin!(f);
        loop {
            // placeholder: we don't know how to build a Context yet
            let mut cx: Context = todo!();
            match f.as_mut().poll(&mut cx) {
                Poll::Ready(r) => break r,
                Poll::Pending => continue,
            }
        }
    }

But there are two outstanding problems here.
- How do we build our Context?
- How do we allow the thread to sleep while idle?
Let's tackle the first problem. There's a bit of a chain we will need to follow.
To construct a Context, we can provide a &Waker to the Context::from_waker() function.
So, how do we construct a Waker? Conveniently, there's an impl<W: Wake> From<Arc<W>> for Waker,
so assuming we have some Arc<impl Wake>, we can construct a Waker and thus a Context.
Let's see the Wake trait:

    pub trait Wake {
        // Required method
        fn wake(self: Arc<Self>);
    }

All we need to provide is a wake function. Convenient!
For now, let's define it as a no-op. We will tackle it in the next step.

    use std::sync::Arc;
    use std::task::Wake;

    struct SimpleWaker {}

    impl Wake for SimpleWaker {
        // does nothing yet: the poll loop never sleeps, so there is nothing to wake
        fn wake(self: Arc<Self>) {}
    }

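The chain from Arc<impl Wake> to Waker to Context can also be exercised in isolation. The following is a minimal sketch, not part of the runtime itself: CountingWaker and wake_twice are hypothetical names introduced only for this illustration, and the counter stands in for whatever a real wake function would do.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker state that counts how often it has been woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Builds a Waker and a Context from shared state, wakes through the
/// context twice, and returns the number of wakes that were recorded.
fn wake_twice() -> usize {
    let state = Arc::new(CountingWaker {
        wakes: AtomicUsize::new(0),
    });

    // Arc<W: Wake> -> Waker, via the From impl in the standard library
    let waker = Waker::from(Arc::clone(&state));
    // &Waker -> Context
    let cx = Context::from_waker(&waker);

    // wake without consuming the waker
    cx.waker().wake_by_ref();
    // a cloned waker shares the same underlying state
    cx.waker().clone().wake();

    state.wakes.load(Ordering::SeqCst)
}

fn main() {
    println!("wakes observed: {}", wake_twice());
}
```

Both wake calls land on the same Arc, which is why a single piece of shared state is enough no matter how many times a future clones its waker.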
Now, we need to construct the waker and context in our block_on function.

    use std::future::Future;
    use std::sync::Arc;
    use std::task::{Context, Poll, Waker};

    /// Turns an async function into a blocking function
    fn block_on<F: Future>(f: F) -> F::Output {
        // futures must be pinned to be polled!
        let mut f = std::pin::pin!(f);

        let root_waker_state = Arc::new(SimpleWaker {});
        let root_waker = Waker::from(root_waker_state);

        loop {
            let mut cx = Context::from_waker(&root_waker);
            match f.as_mut().poll(&mut cx) {
                Poll::Ready(r) => break r,
                // busy-loop: poll again straight away
                Poll::Pending => continue,
            }
        }
    }

And now our code should run. Try it!
The only problem is that it uses 100% CPU while it's running. Not great!
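To make the cost of the busy loop visible, here is a rough sketch that counts how often a future gets polled while it waits for another thread. FlagFuture, block_on_spinning and polls_while_waiting are hypothetical names for this illustration; the spinning loop mirrors the no-op-waker version of block_on above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future: ready once another thread sets `done`.
/// It resolves to the number of times it was polled.
struct FlagFuture {
    done: Arc<AtomicBool>,
    polls: usize,
}

impl Future for FlagFuture {
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        self.polls += 1;
        if self.done.load(Ordering::Acquire) {
            Poll::Ready(self.polls)
        } else {
            Poll::Pending
        }
    }
}

/// Same shape as the no-op-waker block_on: poll in a tight loop.
fn block_on_spinning<F: Future>(f: F) -> F::Output {
    let mut f = std::pin::pin!(f);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(r) = f.as_mut().poll(&mut cx) {
            break r;
        }
    }
}

/// Returns how many polls happened while a helper thread slept for `wait`.
fn polls_while_waiting(wait: Duration) -> usize {
    let done = Arc::new(AtomicBool::new(false));
    let setter = Arc::clone(&done);
    let handle = thread::spawn(move || {
        thread::sleep(wait);
        setter.store(true, Ordering::Release);
    });
    let polls = block_on_spinning(FlagFuture { done, polls: 0 });
    handle.join().unwrap();
    polls
}

fn main() {
    let polls = polls_while_waiting(Duration::from_millis(50));
    println!("polled {polls} times while waiting 50ms");
}
```

The exact count depends on the machine, but it will typically be very large, since every one of those polls except the last does no useful work.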
Now, how do we tackle the issue of allowing our mini-runtime to sleep?
The Condvar type in the standard library offers a powerful primitive to
allow one thread to Condvar::wait, and then wake up when another thread runs Condvar::notify_one.
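As a standalone illustration of that primitive, the sketch below parks the calling thread until a helper thread signals it. The wait_for_signal function and the boolean flag are assumptions made for this example; the flag is there because a Condvar is always paired with a Mutex-protected condition.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Blocks the calling thread until a helper thread sets the flag and
/// notifies the condvar. Returns the final value of the flag.
fn wait_for_signal() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let notifier = Arc::clone(&pair);

    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        let (lock, cvar) = &*notifier;
        // set the condition first, then notify the waiting thread
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    let mut ready = lock.lock().unwrap();
    // wait releases the mutex while sleeping and re-acquires it on wake up;
    // the loop re-checks the flag in case the wake up was not a real signal
    while !*ready {
        ready = cvar.wait(ready).unwrap();
    }
    let result = *ready;
    drop(ready);

    handle.join().unwrap();
    result
}

fn main() {
    println!("signalled: {}", wait_for_signal());
}
```

While the main thread sits in wait it consumes no CPU, which is exactly the behaviour we want from an idle runtime.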
Let's introduce a new Runtime struct to contain some useful state.

    use std::sync::{Condvar, Mutex};

    struct Runtime {
        park: Condvar,
        worker: Mutex<Worker>,
    }

    /// Tracks a single runtime worker state.
    /// Currently we only have 1 worker in our runtime.
    struct Worker {}

Then we should update our waker accordingly:

    struct SimpleWaker {
        runtime: Arc<Runtime>,
    }

    impl Wake for SimpleWaker {
        fn wake(self: Arc<Self>) {
            // notify the main thread
            self.runtime.park.notify_one();
        }
    }

Finally, we need to update our poll-loop to wait on the condvar:

    use std::future::Future;
    use std::sync::{Arc, Condvar, Mutex};
    use std::task::{Context, Poll, Waker};

    /// Turns an async function into a blocking function
    fn block_on<F: Future>(f: F) -> F::Output {
        // futures must be pinned to be polled!
        let mut f = std::pin::pin!(f);

        // create our runtime state
        let runtime = Arc::new(Runtime {
            park: Condvar::new(),
            worker: Mutex::new(Worker {}),
        });

        let root_waker_state = Arc::new(SimpleWaker {
            runtime: Arc::clone(&runtime),
        });
        let root_waker = Waker::from(root_waker_state);

        loop {
            let mut cx = Context::from_waker(&root_waker);
            match f.as_mut().poll(&mut cx) {
                Poll::Ready(r) => break r,
                Poll::Pending => {
                    // park until later
                    let worker = runtime.worker.lock().unwrap();
                    // wait hands the guard back inside a LockResult; release it right away
                    drop(runtime.park.wait(worker).unwrap());
                    continue;
                }
            }
        }
    }

Theoretically this should all work fine. However, there is some low-hanging fruit for efficiency.
If we get a lot of wake() calls, they will all hit Condvar::notify_one even if the worker thread is active.
This is not ideal. Additionally, Condvar::wait is susceptible to 'spurious' wake ups, meaning it can wake up even
if not explicitly notified.
There is also an unfortunate race condition we need to handle: if the task wakes itself up while it is being polled, we don't want to park at all!
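The self-wake case is easy to reproduce. The sketch below uses a hypothetical YieldNow future (similar in spirit to the yield helpers found in async runtimes) and a hypothetical FlagWaker that records whether wake was called. It shows that the wake call can already have happened by the time poll returns Pending, so a loop that parks unconditionally afterwards has nothing left to wake it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that wakes itself and returns Pending exactly once.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // the wake happens *inside* poll, before Pending is returned
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical waker that only records that it was woken.
struct FlagWaker {
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// Polls YieldNow once. Returns (poll returned Pending, wake already seen).
fn poll_once() -> (bool, bool) {
    let state = Arc::new(FlagWaker {
        woken: AtomicBool::new(false),
    });
    let waker = Waker::from(Arc::clone(&state));
    let mut cx = Context::from_waker(&waker);

    let mut fut = std::pin::pin!(YieldNow { yielded: false });
    let pending = fut.as_mut().poll(&mut cx).is_pending();

    (pending, state.woken.load(Ordering::SeqCst))
}

fn main() {
    let (pending, woken) = poll_once();
    println!("pending: {pending}, already woken: {woken}");
}
```

Because a Condvar does not remember notifications that arrive while nobody is waiting, the runtime has to record the wake somewhere itself and check that record before parking.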
Let's update the code one more time to introduce a WorkerState.

    struct Worker {
        state: WorkerState,
    }

    #[derive(PartialEq)]
    enum WorkerState {
        /// Is the worker thread currently running a task
        Running,
        /// Is the worker thread ready to continue
        Ready,
        /// Is the worker thread parked
        Parked,
    }

Next, let's update the waker to act on these states:

    impl Wake for SimpleWaker {
        fn wake(self: Arc<Self>) {
            let mut worker = self.runtime.worker.lock().unwrap();
            if worker.state == WorkerState::Parked {
                // notify the main parked thread
                self.runtime.park.notify_one();
            }

            // announce there is a task ready.
            worker.state = WorkerState::Ready;
        }
    }

And finally, let's make sure we keep track of the state while in our loop:

    use std::future::Future;
    use std::sync::{Arc, Condvar, Mutex};
    use std::task::{Context, Poll, Waker};

    /// Turns an async function into a blocking function
    fn block_on<F: Future>(f: F) -> F::Output {
        // futures must be pinned to be polled!
        let mut f = std::pin::pin!(f);

        // create our runtime state
        let runtime = Arc::new(Runtime {
            park: Condvar::new(),
            worker: Mutex::new(Worker {
                // we start in the running state.
                state: WorkerState::Running,
            }),
        });

        let root_waker_state = Arc::new(SimpleWaker {
            runtime: Arc::clone(&runtime),
        });
        let root_waker = Waker::from(root_waker_state);

        loop {
            let mut cx = Context::from_waker(&root_waker);
            match f.as_mut().poll(&mut cx) {
                Poll::Ready(r) => break r,
                Poll::Pending => {
                    let mut worker = runtime.worker.lock().unwrap();
                    // looping on the state guards against spurious wake ups,
                    // and skips parking entirely if we were already woken
                    while worker.state != WorkerState::Ready {
                        // park until we are ready later
                        worker.state = WorkerState::Parked;
                        worker = runtime.park.wait(worker).unwrap();
                    }

                    // resume the loop and mark as running again
                    worker.state = WorkerState::Running;
                    continue;
                }
            }
        }
    }

And just like that, we have a fully functional single-task runtime that correctly sleeps when not active.
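To try the finished runtime end to end, we need a future that is woken from another thread. The sketch below repeats the runtime in condensed form so it compiles on its own, and adds a hypothetical Sleep future backed by a helper thread. Sleep, demo and run_demo are names invented for this example, and spawning a thread per timer is a simplification, not how a production runtime implements timers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

#[derive(PartialEq)]
enum WorkerState { Running, Ready, Parked }
struct Worker { state: WorkerState }
struct Runtime { park: Condvar, worker: Mutex<Worker> }
struct SimpleWaker { runtime: Arc<Runtime> }

impl Wake for SimpleWaker {
    fn wake(self: Arc<Self>) {
        let mut worker = self.runtime.worker.lock().unwrap();
        if worker.state == WorkerState::Parked {
            self.runtime.park.notify_one();
        }
        worker.state = WorkerState::Ready;
    }
}

fn block_on<F: Future>(f: F) -> F::Output {
    let mut f = std::pin::pin!(f);
    let runtime = Arc::new(Runtime {
        park: Condvar::new(),
        worker: Mutex::new(Worker { state: WorkerState::Running }),
    });
    let waker = Waker::from(Arc::new(SimpleWaker { runtime: Arc::clone(&runtime) }));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(r) = f.as_mut().poll(&mut cx) {
            break r;
        }
        let mut worker = runtime.worker.lock().unwrap();
        while worker.state != WorkerState::Ready {
            worker.state = WorkerState::Parked;
            worker = runtime.park.wait(worker).unwrap();
        }
        worker.state = WorkerState::Running;
    }
}

/// Hypothetical timer: the first poll spawns a thread that sleeps until the
/// deadline and then wakes the task. Assumes the waker never changes between
/// polls, which holds for this single-task runtime.
struct Sleep { deadline: Instant, timer_started: bool }

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            return Poll::Ready(());
        }
        if !self.timer_started {
            self.timer_started = true;
            let waker = cx.waker().clone();
            let deadline = self.deadline;
            thread::spawn(move || {
                thread::sleep(deadline.saturating_duration_since(Instant::now()));
                waker.wake();
            });
        }
        Poll::Pending
    }
}

async fn demo() -> u32 {
    let deadline = Instant::now() + Duration::from_millis(50);
    Sleep { deadline, timer_started: false }.await;
    42
}

/// Runs the demo on our runtime and reports the result and the time taken.
fn run_demo() -> (u32, Duration) {
    let start = Instant::now();
    let value = block_on(demo());
    (value, start.elapsed())
}

fn main() {
    let (value, elapsed) = run_demo();
    println!("got {value} after {elapsed:?}");
}
```

While the helper thread sleeps, the main thread is parked on the condvar instead of spinning, so the program should sit at close to zero CPU usage until the timer fires.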