Rust async/await: Async executor
This article is a continuation of the previous articles on Rust async/await. If you haven't read them yet, I recommend you read them first:
- Desugaring and assembly of async/await in Rust - goto
- Nested async/await in Rust: Desugaring and assembly - patrol
This article explains how an async executor schedules async tasks. We will see how a simple async executor works and how it is implemented in Rust, and then analyze how it schedules the goto and patrol functions from the previous articles.
Note: The code in this article is taken from a fork of the simple async local executor.
goto and patrol functions
To recap, here are the goto and patrol functions from the previous articles:
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

#[derive(Default)]
struct Unit {
    /// The 1-D position of the unit. The unit can only move along this axis.
    pub pos: i32,
}

type UnitRef = Rc<RefCell<Unit>>;

/// A future that will move the unit towards `target_pos` at each step,
/// and complete when the unit has reached that position.
struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            // Move one step towards the target on every poll.
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

/// Helper async function to write unit behavior nicely
async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture {
        unit,
        target_pos: pos,
    }
    .await;
}

/// Let a unit go back and forth between two positions
async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}
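To make the "one step per poll" behavior concrete, here is a minimal sketch that polls these futures by hand instead of through an executor. The NoopWake type and the polls_until_ready helper are hypothetical names introduced only for this illustration; they are not part of the article's code.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture { unit, target_pos: pos }.await;
}

async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}

/// Hypothetical waker that does nothing when woken (we poll unconditionally).
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls `fut` at most `max_polls` times and returns
/// the number of polls it took to complete, or None if it never completed.
fn polls_until_ready<F: Future>(fut: F, max_polls: usize) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    for n in 1..=max_polls {
        if fut.as_mut().poll(&mut cx).is_ready() {
            return Some(n);
        }
    }
    None
}
```

With a unit starting at position 0, goto(unit, 3) needs four polls: three that each move the unit by one, and a fourth that sees the target has been reached and returns Poll::Ready. Because patrol loops forever, the helper returns None for it no matter how large max_polls is.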
Async executor
An async executor is a piece of code that runs async tasks. It is responsible for scheduling the tasks and waking them up when they are ready to progress.
An async program's main function creates the async tasks and then calls the executor to run the tasks. The executor will keep running the tasks until all tasks are completed. In our example, we simulate the executor by calling the Executor::step() method in a loop. We limit the number of steps to 30 to run the example in the Rust playground.
/// Test program with two units
fn main() {
    let executor = Executor::default();
    let units: [UnitRef; 2] = Default::default();
    executor.spawn(patrol(units[0].clone(), [-5, 5]));
    // executor.spawn(patrol(units[1].clone(), [-1,1]));
    for _ in 0..30 {
        executor.step();
    }
}
The flow chart of the Executor::step() method is shown below. The Executor::step() method is the crucial method of the executor. It is responsible for scheduling the tasks. When the function starts, it will look for any new spawned tasks and add them to the task_queue. Then, it will loop through the task_queue and execute the tasks. A task will be added to the pending_tasks list if it is not ready. The pending_tasks list will be used for the next step. The Executor::step() method returns true if there are still tasks to run.
The Executor::step() method is implemented as follows:
pub fn step(&self) -> bool {
    // Dummy waker and context (not used as we poll all tasks)
    let waker = dummy_waker();
    let mut context = Context::from_waker(&waker);

    // Append new tasks created since the last step into the task queue
    let mut task_queue = self.inner.task_queue.borrow_mut();
    task_queue.append(&mut self.inner.new_tasks.borrow_mut());

    // Loop over all tasks, polling them. If a task is not ready, add it to the pending tasks.
    let mut pending_tasks = Vec::new();
    let mut any_left = false;
    for mut task in task_queue.drain(..) {
        match task.poll(&mut context) {
            Poll::Ready(()) => {} // task done
            Poll::Pending => {
                pending_tasks.push(task);
                any_left = true;
            }
        }
    }

    // Keep pending tasks for the next step
    *task_queue = pending_tasks;
    any_left // Return true if there are still tasks to run
}
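The step() method relies on pieces that are not shown here: the Executor struct, spawn(), the task type, and dummy_waker(). The following is a minimal sketch of how they could fit together. The names Task, Inner, and NoopWake, the Wake-based waker, and the CountDown demo future are assumptions made for illustration and may differ from the actual executor code.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical task type: a pinned, boxed future with no output.
struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
}

impl Task {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.future.as_mut().poll(cx)
    }
}

/// Hypothetical shared state behind `self.inner`.
#[derive(Default)]
struct Inner {
    task_queue: RefCell<Vec<Task>>,
    new_tasks: RefCell<Vec<Task>>,
}

#[derive(Default, Clone)]
pub struct Executor {
    inner: Rc<Inner>,
}

/// Assumed stand-in for the dummy waker: waking does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn dummy_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

impl Executor {
    /// Store the future as a new task; it is not polled until the next step.
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        let task = Task { future: Box::pin(future) };
        self.inner.new_tasks.borrow_mut().push(task);
    }

    /// Poll every task once; returns true if any task is still pending.
    pub fn step(&self) -> bool {
        let waker = dummy_waker();
        let mut context = Context::from_waker(&waker);
        let mut task_queue = self.inner.task_queue.borrow_mut();
        task_queue.append(&mut self.inner.new_tasks.borrow_mut());
        let mut pending_tasks = Vec::new();
        let mut any_left = false;
        for mut task in task_queue.drain(..) {
            match task.poll(&mut context) {
                Poll::Ready(()) => {} // task done, drop it
                Poll::Pending => {
                    pending_tasks.push(task);
                    any_left = true;
                }
            }
        }
        *task_queue = pending_tasks;
        any_left
    }
}

/// Hypothetical demo future: pending for `remaining` polls, then ready.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}
```

Because every task is polled on every step, the waker never has to do anything in this design. An executor that only polls tasks after they have been woken would need a real waker.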
Understanding the async executor flow
Now that we have seen how the async executor works, let's see how it schedules the goto and patrol functions. We will use the following sequence diagrams to understand how the executor schedules the async tasks.
The complete flow is split into the following parts:
Spawn an async task
The following sequence diagram shows how an executor spawns a task and how the task is executed.
- The executor is created.
- The executor returns to main.
- main creates the patrol_closure and captures units[0] and [-5, 5] in the closure environment. The initial state of the closure is set to 🚀 Patrol::Start(0).
- The executor creates a new task from the closure and stores it in the new_tasks list.
- The executor returns to main.
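Note that none of the steps above runs any code from the async function itself: calling an async function only builds the closure in its start state. A minimal sketch of this laziness follows; mark_started, NoopWake, and started_before_and_after_poll are hypothetical names used only for this illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that does nothing when woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical async function that records that its body has started running.
async fn mark_started(flag: Rc<Cell<bool>>) {
    flag.set(true);
}

/// Returns the flag value after creating the future and after polling it once.
fn started_before_and_after_poll() -> (bool, bool) {
    let flag = Rc::new(Cell::new(false));

    // Creating the future captures `flag` but does not run the body.
    let mut fut = Box::pin(mark_started(flag.clone()));
    let before = flag.get();

    // Only the first poll runs the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let _ = fut.as_mut().poll(&mut cx);

    (before, flag.get())
}
```

The function returns (false, true): the body has not run when the future is created and has run after the first poll. Nothing happens for a spawned task until the executor polls it for the same reason.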
Start executing async tasks
Once the task is spawned, the executor keeps running for as long as there are tasks to schedule. In our example, we drive it by calling Executor::step() in a loop. Note that we are limiting the number of steps to 30 to run the example in the Rust playground.
- main calls Executor::step() to execute the first task.
  * The executor looks for any newly spawned tasks and adds them to the task_queue.
- The executor loops through the task_queue and executes the tasks. In this scenario, it finds only one task in the task_queue and calls the poll method on that task.
- The task scheduled here contains the patrol_closure. The task invokes the poll method of the patrol_closure.
- The patrol_closure is in the 🚀 Patrol::Start state. In this state, the closure starts at the beginning of the function. The goto_closure is created by capturing the unit and poses[0] in the closure environment. The initial state of the closure is set to 🚀 Goto::Start.

      async fn patrol(unit: UnitRef, poses: [i32; 2]) {
          loop {
      ➡️      goto(unit.clone(), poses[0]).await;
              goto(unit.clone(), poses[1]).await;
          }
      }

- The patrol_closure polls the goto_closure to find out whether the first position, poses[0], has been reached. This call to the poll method of the goto_closure results from the .await marked with the ➡️ arrow in the above code snippet.
- The goto_closure creates a UnitGotoFuture.
- The goto_closure polls the UnitGotoFuture. The poll method of the UnitGotoFuture checks if the unit has reached the pos. In this case, the unit has not reached the pos yet, so it moves one step closer.
- The poll method returns Poll::Pending to the goto_closure. The goto_closure sets its state to 🕓 Goto::Waiting.
- The goto_closure returns Poll::Pending to the patrol_closure. The patrol_closure sets its state to 🕓 Patrol::WaitingToReachPosition0.
- The patrol_closure returns Poll::Pending to the task.
- The task returns Poll::Pending to the executor.
- Since the task is not ready, the executor adds it to the pending_tasks list to be scheduled later. The executor then returns to the caller of Executor::step().
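The state transitions described above can be sketched as a hand-written state machine. This is only an approximation of what the compiler generates: the PatrolState and Patrol types, the new_goto helper, and trace_positions are hypothetical, and for brevity the sketch polls UnitGotoFuture directly instead of going through a separate goto_closure layer.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

fn new_goto(unit: &UnitRef, target_pos: i32) -> UnitGotoFuture {
    UnitGotoFuture { unit: unit.clone(), target_pos }
}

/// Hypothetical stand-in for the compiler-generated patrol states.
enum PatrolState {
    Start,
    WaitingToReachPosition0(UnitGotoFuture),
    WaitingToReachPosition1(UnitGotoFuture),
}

struct Patrol {
    unit: UnitRef,
    poses: [i32; 2],
    state: PatrolState,
}

impl Future for Patrol {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Every field is Unpin, so a plain mutable reference is available.
        let this = &mut *self;
        loop {
            let next_state = match &mut this.state {
                // Beginning of the loop body: create the first goto future.
                PatrolState::Start => {
                    PatrolState::WaitingToReachPosition0(new_goto(&this.unit, this.poses[0]))
                }
                // First .await: stay here until poses[0] is reached.
                PatrolState::WaitingToReachPosition0(fut) => {
                    if Pin::new(fut).poll(cx).is_pending() {
                        return Poll::Pending;
                    }
                    PatrolState::WaitingToReachPosition1(new_goto(&this.unit, this.poses[1]))
                }
                // Second .await: stay here until poses[1] is reached.
                PatrolState::WaitingToReachPosition1(fut) => {
                    if Pin::new(fut).poll(cx).is_pending() {
                        return Poll::Pending;
                    }
                    PatrolState::Start // infinite loop: start over
                }
            };
            this.state = next_state;
        }
    }
}

/// Hypothetical waker that does nothing when woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the state machine `polls` times, recording the unit position after each poll.
fn trace_positions(poses: [i32; 2], polls: usize) -> Vec<i32> {
    let unit = UnitRef::default();
    let mut patrol = Patrol { unit: unit.clone(), poses, state: PatrolState::Start };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut trace = Vec::new();
    for _ in 0..polls {
        let _ = Pin::new(&mut patrol).poll(&mut cx);
        trace.push(unit.borrow().pos);
    }
    trace
}
```

The only exits from poll are the two return Poll::Pending statements, which is why this state machine can never report Poll::Ready. When a goto future completes, the loop immediately moves on to the next state and polls the new goto future within the same call.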
Continue execution of async tasks
The executor keeps running for as long as there are pending tasks. In our example, we drive it by calling Executor::step() in a loop. At each step, polling the goto_closure moves the unit closer to the pos. Now we will see how the goto_closure returns Poll::Ready to the patrol_closure when the unit reaches the pos. We start at step 6 in the following sequence diagram.
- Once the unit has reached the pos, the next poll completes the goto. The patrol_closure polls the goto_closure again at the .await point marked with ➡️, and the goto_closure in turn polls the UnitGotoFuture.

      async fn patrol(unit: UnitRef, poses: [i32; 2]) {
          loop {
      ➡️      goto(unit.clone(), poses[0]).await;
              goto(unit.clone(), poses[1]).await;
          }
      }

- The UnitGotoFuture returns Poll::Ready to the goto_closure.
- The goto_closure sets its state to ✅ Goto::Done and returns Poll::Ready to the patrol_closure.
- The Poll::Ready result lets the patrol_closure advance until it hits the next .await point (➡️ in the following code snippet). A new goto_closure is created, capturing the unit and poses[1] in the closure environment. The initial state of this closure is set to 🚀 Goto::Start.

      async fn patrol(unit: UnitRef, poses: [i32; 2]) {
          loop {
              goto(unit.clone(), poses[0]).await;
      ➡️      goto(unit.clone(), poses[1]).await;
          }
      }

- The patrol_closure then triggers the next poll to move the unit towards the next pos.
The execution of the patrol_closure is now at the next .await point, and the patrol_closure is in the state 🕓 Patrol::WaitingToReachPosition1. The patrol_closure returns Poll::Pending to the task, and the task returns Poll::Pending to the executor. The executor adds the task to the pending_tasks list to be scheduled later and then returns to the caller of Executor::step().
The patrol function has an infinite loop, so returning Poll::Ready from the goto_closure does not terminate the patrol_closure. The patrol_closure will continue to poll the goto_closure until the unit reaches the next pos.
Key takeaways
- Spawning an async task has nothing to do with the operating system. It is just a closure that is scheduled to be executed by the executor.
- The await keyword polls a future. If the future is not ready, the execution of the async task is suspended at that point until a later poll finds the future ready.
- The executor calls the poll method of the future. The method returns Poll::Pending if the future is not ready. It returns Poll::Ready if the future is ready.
- If await is called inside an infinite loop, the function will never return Poll::Ready as it continues in the loop. Even a function with an infinite loop will return Poll::Pending if the future is not ready.
Experiment in the Rust playground
We have created a Rust playground to experiment with the code. You can modify the code and run it to see the trace output.
- Scroll to the end of the code in the Rust playground and uncomment the executor.spawn(patrol(units[1].clone(), [-1,1])); line to spawn the second async task. Click on the Run button on the top left to run the code.

      pub fn main() {
          let executor = Executor::default();
          let units: [UnitRef; 2] = Default::default();
          executor.spawn(patrol(units[0].clone(), [-5, 5]));
          // executor.spawn(patrol(units[1].clone(), [-1,1]));

- Click the ... button in the top left corner of the Rust playground to look at the generated assembly for the main function.
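As a rough offline approximation of the first experiment, the sketch below polls two patrol tasks once per step, which is what Executor::step() does with its task queue. It does not use the article's Executor; the NoopWake type and the positions_after helper are hypothetical names introduced for this illustration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Unit {
    pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            Poll::Pending
        }
    }
}

async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture { unit, target_pos: pos }.await;
}

async fn patrol(unit: UnitRef, poses: [i32; 2]) {
    loop {
        goto(unit.clone(), poses[0]).await;
        goto(unit.clone(), poses[1]).await;
    }
}

/// Hypothetical waker that does nothing when woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls both patrol tasks once per step and returns the final unit positions.
fn positions_after(steps: usize) -> (i32, i32) {
    let units: [UnitRef; 2] = Default::default();
    let mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = Vec::new();
    tasks.push(Box::pin(patrol(units[0].clone(), [-5, 5])));
    tasks.push(Box::pin(patrol(units[1].clone(), [-1, 1])));

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..steps {
        // One step: every task gets exactly one poll, in spawn order.
        for task in tasks.iter_mut() {
            let _ = task.as_mut().poll(&mut cx);
        }
    }

    let result = (units[0].borrow().pos, units[1].borrow().pos);
    result
}
```

Both units advance by one position per step regardless of how many tasks exist, because each task is polled exactly once per step. After 5 steps the units are at (-5, -1), and after 15 steps they are at (5, 1).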
Articles in the async/await series
- Desugaring and assembly of async/await in Rust - goto
- Nested async/await in Rust: Desugaring and assembly - patrol
- Rust async executor - executor