The last devlog saw a large refactor of the entity activity system to make use of Rust's elegant async/await language features, instead of badly implementing them by hand.
The main challenge here is implementing a custom async runtime that drives futures in a way that integrates into a game engine's architecture. This post goes into the details of my implementation and the design decisions behind it.
Some definitions
There are some terms and concepts to define early on, to ensure this post doesn't end up a thick jargony mess.
- Future: a computation that completes asynchronously. In the context of the game, this means its execution takes place over many game ticks.
- Activity: a living entity's current behaviour, such as "go here" or "go pick up that item". Each instance of an activity is represented by a future, as these behaviours take time to complete (as covered in a previous devlog).
- Task: a reference-counted allocation that holds a future's state while it executes, and its result (success, cancelled, failed, etc.) when it finishes. A task is ready when it has work it can do (such as searching for an item to pick up), or pending when it is waiting for an event before it can make progress (such as notification that the item was picked up successfully).[1]
- Runtime: the central coordinator and executor of tasks, responsible for polling those that are ready each game tick. Unlike typical async executors, it does not spawn any worker threads; instead it polls futures serially on the main game thread. This is what allows us to use Rcs instead of the marginally more expensive Arcs.
These concepts can be summarised by the following Rust types, which are loosely based on reality. By the end of this post these should be clear.
struct Runtime(Rc<RefCell<RuntimeInner>>);
struct RuntimeInner {
    ready: Vec<WeakTaskRef>,
    // ..
}
type TaskOutput = Result<(), Box<dyn Error>>;
impl Runtime {
    fn spawn(&self, future: impl Future<Output = TaskOutput> + 'static) -> TaskRef { ... }
    fn tick(&self) { ... }
    fn mark_ready(&self, task: &TaskRef) { ... }
}
struct TaskRef(std::rc::Rc<Task>);
struct WeakTaskRef(std::rc::Weak<Task>);
struct Task {
    runtime: Runtime,
    future: RefCell<TaskFuture>,
    event_sink: RefCell<VecDeque<EntityEvent>>,
    // ..
}
enum TaskFuture {
    Running(futures::future::LocalBoxFuture<'static, TaskOutput>),
    Done(TaskOutput),
    Cancelled,
}
impl TaskRef {
    fn poll_task(&self) { ... }
    fn mark_ready(&self) { ... }
    fn cancel(&self) { ... }
}
/// Each living entity has one of these
struct ActivityComponent {
    current: TaskRef,
    // ..
}
Custom futures executor
As we've seen before, writing activities with futures is nice and ergonomic, but they need to be polled in order to be of any actual use. This requires a custom executor to be integrated into the game engine.
The hardest bit was getting my head around the concepts of a runtime, tasks and futures, and how they relate to each other in terms of ownership. The official async book was really helpful here.
Task references
The core concept for a runtime is tasks - where do they live, and who owns them?
As the type definitions above show, it's typically TaskRefs and WeakTaskRefs (strong and weak reference-counted pointers to an immutable Task, respectively) that are passed around, rather than Tasks themselves.
Runtimes normally need to use Arcs and some kind of locking here, because they conventionally spread tasks across multiple threads. In this specialised single-threaded runtime we can skip the cost of atomics and use Rcs and RefCells, but in order to do that we need to convince the compiler with a few incantations.
// everything will run on the main thread
unsafe impl Send for TaskRef {}
unsafe impl Sync for TaskRef {}
unsafe impl Send for WeakTaskRef {}
unsafe impl Sync for WeakTaskRef {}
Whether we use a strong or weak task reference depends on its usage, of which there are several:
In the agent
This is the main use for a strong, owned task reference, because conceptually the agent owns the task for as long as it is performing the behaviour. As the game engine follows the ECS architecture, this reference is stored in the ActivityComponent.
When the agent changes its mind and starts a new activity, dropping the old reference should also drop the underlying Task and future, regardless of any other references.
In the future itself
As we'll see below, subscribing to events and timers requires a TaskRef. These requests are issued from within an activity (and therefore within a task), so the activity needs a reference to its own owning task.
Rcs make it easy to create cyclic references like this, but you need to be careful about how they are dropped, otherwise you can leak memory because the strong reference count can never drop to 0. We'll see later how this isn't a problem for this situation.
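To make the leak concrete, here's a minimal std-only sketch. Task is a hypothetical stripped-down version whose future slot is just an Option<Rc<Task>> holding a strong reference to itself. The way the cycle is broken here, by dropping the future's contents when the task is cancelled (much like replacing TaskFuture::Running with Cancelled), is an assumption for illustration rather than the engine's confirmed mechanism.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for Task: the "future" slot owns a strong
/// reference back to its own task, just as an activity's context does.
struct Task {
    future: RefCell<Option<Rc<Task>>>,
}

/// Creates a task whose future holds a strong self-reference and returns
/// the owning reference (the one the agent would keep).
fn spawn_cyclic() -> Rc<Task> {
    let task = Rc::new(Task { future: RefCell::new(None) });
    *task.future.borrow_mut() = Some(Rc::clone(&task));
    task
}

/// Breaks the cycle by dropping whatever the future slot holds,
/// e.g. when the task is cancelled or finishes.
fn cancel(task: &Rc<Task>) {
    let self_ref = task.future.borrow_mut().take();
    drop(self_ref);
}
```

Without the call to cancel, dropping the owning reference leaves the strong count at 1 forever; with it, the task is freed as soon as the agent lets go.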
Runtime::spawn
Because every task unconditionally needs a reference to itself, the runtime's spawn method uses a channel to pass that reference through to the future.
use futures::FutureExt; // for boxed_local()
impl Runtime {
    pub fn spawn(
        &self,
        gimme_task_ref: futures::channel::oneshot::Sender<TaskRef>,
        future: impl Future<Output = BoxedResult<()>> + 'static,
    ) -> TaskRef {
        // allocate task ref
        let task_ref = TaskRef(Rc::new(Task {
            future: RefCell::new(TaskFuture::Running(future.boxed_local())),
            // ..other fields elided
        }));
        // send a strong copy to the future for cyclic reference
        let _ = gimme_task_ref.send(task_ref.clone());
        // task is ready immediately
        self.0.borrow_mut().ready.push(task_ref.weak());
        task_ref
    }
}
// elsewhere, when starting a new activity
let (taskref_tx, taskref_rx) = futures::channel::oneshot::channel();
let task_ref = runtime.spawn(taskref_tx, async move {
    // recv task ref from runtime
    let task = taskref_rx.await.unwrap(); // sent unconditionally
    // create context for activity
    let ctx = ActivityContext::new(
        task,
        /* .. */
    );
    // execute the activity to completion
    new_activity.dew_it(&ctx).await
});
The runtime takes the sending half of a one-shot channel and sends the newly created task reference down it immediately, before the future has had a chance to be polled. In the futures crate's current implementation, this stores the TaskRef in an Arc.
A little later, when the runtime polls all ready tasks, the task reference is read from the channel (and its Arc) and stored in an ActivityContext. With this, the activity can use events and timers.
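Here the channel does little more than hand over a value through a shared slot. As a hypothetical single-threaded stand-in (the engine uses the futures crate's oneshot, not this), the same handoff can be sketched with an Rc<RefCell<Option<T>>>. Because spawn always sends before the first poll, a non-blocking try_recv would be enough in this sketch.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical single-threaded stand-in for a one-shot channel:
/// both halves share a single slot holding at most one value.
pub struct Sender<T>(Rc<RefCell<Option<T>>>);
pub struct Receiver<T>(Rc<RefCell<Option<T>>>);

pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let slot = Rc::new(RefCell::new(None));
    (Sender(Rc::clone(&slot)), Receiver(slot))
}

impl<T> Sender<T> {
    /// Takes `self` by value, so a value can be sent at most once.
    pub fn send(self, value: T) {
        *self.0.borrow_mut() = Some(value);
    }
}

impl<T> Receiver<T> {
    /// Non-blocking: returns the value if it has been sent and not yet taken.
    pub fn try_recv(&self) -> Option<T> {
        self.0.borrow_mut().take()
    }
}
```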
In the runtime
The main responsibility of the runtime is to poll tasks that are ready for execution. A task might be ready because it has just been created, a timer has elapsed, or an event has arrived.
A weak reference to the task is stored in the ready queue, which is processed each game tick. Only a weak reference is needed here: if the executing agent has dropped the single owning reference, there is no need to poll the task, as we don't care about making any more progress on it.[2]
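Here's a std-only sketch of why weak references suffice, using a hypothetical Task that only carries a name: upgrading each Weak while draining the queue quietly skips any task whose owner has already dropped it.

```rust
use std::rc::{Rc, Weak};

/// Hypothetical minimal task: just a name so we can see which ones run.
struct Task {
    name: &'static str,
}

/// Drains a ready queue of weak references, returning the names of the
/// tasks that were still alive (and so would have been polled).
/// Tasks whose owning reference was dropped are silently skipped.
fn drain_ready(ready: &mut Vec<Weak<Task>>) -> Vec<&'static str> {
    ready
        .drain(..)
        .filter_map(|weak| weak.upgrade())
        .map(|task| task.name)
        .collect()
}
```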
In a scheduled timer
A timer is also associated with a weak task reference. When it elapses after the specified number of game ticks, the task is marked as ready by pushing it onto the runtime's ready queue if the strong reference is still alive.
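The timer store itself is out of scope for this post, but the shape of the idea can be sketched. Everything below is an assumption for illustration (the Timers type, TimerToken, and a BTreeMap keyed by end tick), not the engine's actual timer implementation: scheduling takes a weak task reference and returns the end tick plus a token for cancellation, and each tick hands back only the tasks that are still alive.

```rust
use std::collections::BTreeMap;
use std::rc::{Rc, Weak};

/// Opaque handle for cancelling a scheduled timer (hypothetical shape).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimerToken(u64);

/// Hypothetical timer store, keyed by (end tick, token) so that iteration
/// order is the order in which timers elapse.
pub struct Timers<T> {
    now: u32,
    next_id: u64,
    pending: BTreeMap<(u32, TimerToken), Weak<T>>,
}

impl<T> Timers<T> {
    pub fn new() -> Self {
        Timers { now: 0, next_id: 0, pending: BTreeMap::new() }
    }

    /// Schedules a wakeup `ticks` from now; returns the end tick and a token.
    pub fn schedule(&mut self, ticks: u32, task: Weak<T>) -> (u32, TimerToken) {
        let token = TimerToken(self.next_id);
        self.next_id += 1;
        let end_tick = self.now + ticks;
        self.pending.insert((end_tick, token), task);
        (end_tick, token)
    }

    /// Cancels a timer; a no-op if it already fired.
    pub fn cancel(&mut self, end_tick: u32, token: TimerToken) {
        self.pending.remove(&(end_tick, token));
    }

    /// Advances one tick and returns the still-alive tasks whose timers elapsed.
    pub fn tick(&mut self) -> Vec<Rc<T>> {
        self.now += 1;
        // everything at or after this key is due later; the rest has elapsed
        let later = self.pending.split_off(&(self.now + 1, TimerToken(0)));
        let elapsed = std::mem::replace(&mut self.pending, later);
        elapsed.into_values().filter_map(|weak| weak.upgrade()).collect()
    }
}
```

In this sketch the runtime would push each returned task onto its ready queue. Keying by end tick means cancel needs the end tick as well as the token.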
Polling tasks
As we've seen, the runtime holds a list of ready tasks. Each game tick we call Runtime::tick to poll these futures on the main game thread, allowing them to make progress.
The interface for polling a future is defined in the std::future::Future trait, which looks like the following:
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}
Getting a waker
Something to note here is the Context reference we need to pass in, but where does it come from? The docs show it holds a Waker, which needs a RawWaker, which needs a very unsafe vtable...
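For a feel of what that plumbing involves without any unsafe code, the standard library's std::task::Wake trait can turn an Arc into a Waker. It requires the waker type to be Send + Sync and stored in an Arc, which rules it out for an Rc-based task as-is. This sketch uses a hypothetical CountingWaker and YieldOnce future (neither is from the engine) to show where the Context passed to poll comes from:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that just counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Pending on its first poll (waking itself so it gets polled again),
/// ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 {
            Poll::Ready("done")
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Builds a Waker and Context from the counter, then polls `fut` once.
fn poll_once<F: Future + Unpin>(fut: &mut F, counter: &Arc<CountingWaker>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(counter));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```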
Luckily, the handy cooked-waker crate makes this very easy for tasks that use Rust's standard pointer types like Rc and Arc. Integrating it with our task types now only requires some simple glue to mark a task as ready in our own runtime, and to convert between raw pointers and TaskRefs:
impl WakeRef for TaskRef {
    fn wake_by_ref(&self) {
        self.0.runtime.mark_ready(self);
    }
}
impl Wake for TaskRef {}
unsafe impl ViaRawPointer for TaskRef {
    type Target = Task;
    fn into_raw(self) -> *mut Task {
        Rc::into_raw(self.0) as *mut Task
    }
    unsafe fn from_raw(ptr: *mut Task) -> Self {
        Self(Rc::from_raw(ptr as *const Task))
    }
}
Runtime references
In the wake_by_ref function above, our task needs to reach out into the runtime and mark itself as ready. This implies that each TaskRef needs to hold a reference to the runtime, which is the main motivation for the Runtime struct being a thin wrapper around a reference-counted RuntimeInner.
Another key aspect of the Runtime type is that it holds the inner type in a RefCell. This lets us juggle Rust's strict aliasing XOR mutability rules at runtime, which is needed when polling ready futures.
We need to iterate the ready queue while simultaneously appending to it, which requires both aliasing (iteration) and mutability (appending). Without a RefCell it wouldn't compile, and with it we get a panic:
thread 'main' panicked at 'already mutably borrowed: BorrowError
3: std::panicking::begin_panic_handler::{{closure}}
...
8: core::result::Result<T,E>::expect
9: core::cell::RefCell<T>::borrow
...
12: simulation::runtime::runtime::Runtime::mark_ready <---- adding to ready queue
13: <simulation::runtime::runtime::TaskRef as cooked_waker::WakeRef>::wake_by_ref
14: <T as cooked_waker::IntoWaker>::VTABLE::{{closure}}
15: core::ops::function::FnOnce::call_once
16: core::task::wake::Waker::wake_by_ref
17: <simulation::TimerFuture as core::future::future::Future>::poll
18: <simulation::WanderActivity as simulation::Activity>::dew_it::{{closure}}
19: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
20: <core::pin::Pin<P> as core::future::future::Future>::poll
21: <simulation::activity::system::ActivitySystem as shred::system::System>::run::{{closure}}
22: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
23: simulation::runtime::runtime::TaskRef::poll_task
24: simulation::runtime::runtime::Runtime::tick <---- iterating ready queue
...
We can see from the backtrace that a TimerFuture wakes its task during the poll, and mark_ready is unable to borrow the runtime because Runtime::tick is still holding a mutable borrow of it.
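The failure can be reproduced without a real runtime. In this minimal sketch, Inner is a hypothetical stand-in for RuntimeInner, and try_mark_ready uses RefCell::try_borrow_mut so the conflict shows up as a return value instead of a panic. naive_tick holds the borrow while "polling", as the panicking version did, whereas take_then_tick moves the queue out first.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for RuntimeInner: just a ready queue of task ids.
struct Inner {
    ready: Vec<u32>,
}

/// Mimics mark_ready, but reports a borrow conflict instead of panicking.
fn try_mark_ready(rt: &RefCell<Inner>, id: u32) -> bool {
    match rt.try_borrow_mut() {
        Ok(mut inner) => {
            inner.ready.push(id);
            true
        }
        Err(_) => false, // already borrowed: the real code panics here
    }
}

/// Holds the borrow while "polling"; every polled task wakes itself.
fn naive_tick(rt: &RefCell<Inner>) -> bool {
    let inner = rt.borrow_mut();
    inner.ready.iter().all(|&id| try_mark_ready(rt, id))
}

/// Moves the queue out first, so no borrow is held while polling.
fn take_then_tick(rt: &RefCell<Inner>) -> bool {
    let ready = std::mem::take(&mut rt.borrow_mut().ready);
    ready.into_iter().all(|id| try_mark_ready(rt, id))
}
```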
The workaround here is to move the ready queue out of the runtime, so that the mutable borrow is released before any task is polled:
// simplified
impl Runtime {
    pub fn tick(&self) {
        let mut ready_tasks: Vec<WeakTaskRef> = {
            // take a mutable reference to RuntimeInner
            let mut inner = self.0.borrow_mut();
            // move ready tasks out of runtime so we can
            // release the mutable ref
            std::mem::take(&mut inner.ready)
            // mutable reference is dropped here
        };
        // the ready queue is now empty
        // poll tasks in ready queue, which could invoke
        // `wake_by_ref` on the Waker and push themselves
        // back onto the ready queue
        for task in ready_tasks.drain(..).filter_map(|t| t.upgrade()) {
            task.poll_task();
        }
    }
}
The actual code is more efficient with allocations, but implements the same concept.
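One way to cut those allocations, offered as an assumption rather than what the engine actually does, is to keep a spare buffer and swap it with the ready queue each tick, so both Vecs keep their capacity. In this sketch, task ids stand in for WeakTaskRefs and the poll callback stands in for TaskRef::poll_task:

```rust
use std::cell::RefCell;
use std::mem;

/// Hypothetical runtime internals: `ready` is filled by wakeups,
/// `spare` is a reusable buffer owned by the tick loop.
#[derive(Default)]
pub struct Runtime {
    ready: RefCell<Vec<u32>>,
    spare: RefCell<Vec<u32>>,
}

impl Runtime {
    pub fn mark_ready(&self, id: u32) {
        self.ready.borrow_mut().push(id);
    }

    /// Polls every task that was ready at the start of the tick.
    pub fn tick(&self, mut poll: impl FnMut(&Self, u32)) {
        // take the spare buffer out; no borrow is held afterwards
        let mut batch = mem::take(&mut *self.spare.borrow_mut());
        // swap: `batch` gets this tick's tasks, `ready` gets the empty spare
        mem::swap(&mut batch, &mut *self.ready.borrow_mut());
        for id in batch.drain(..) {
            poll(self, id);
        }
        // return the emptied buffer (capacity intact) for the next tick
        *self.spare.borrow_mut() = batch;
    }
}
```

Because mark_ready only borrows ready, and tick holds no borrow while polling, a task that wakes itself during a tick is simply queued for the next one.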
Using the runtime
Now that we've seen how the runtime works, let's look into how it's used.
To avoid things staying too abstract, we'll use the wander behaviour from the last devlog as a concrete example, whose implementation is conveniently short.
// pseudo-code
async fn wander_activity(ctx) -> Result<(), _> {
    loop {
        // walk to a nearby position
        let pos = choose_wander_destination();
        ctx.go_to(pos).await?;
        // loiter for a few ticks
        ctx.wait(3).await;
    }
}
This shows use of events, timers and the handling of interruptions (or rather how we get it for free).
Events
When we see ctx.go_to(pos).await above, it's instantiating a GoToSubactivity, which is a self-contained unit of behaviour (and an implementor of Future). What it does is as follows:
- Post a new destination request to the navigation system
- Subscribe to Arrived(Result<_, _>) events where the subject of the event is this agent
  - This will either contain Ok(_) if the agent arrives successfully, or some Err(_) if something went wrong
- Wait as many game ticks as necessary until the event arrives
  - This is where the async magic happens
- Unsubscribe from any more arrival events, extract the Result from the received event and return it back up to the calling activity
A major convenience here is how the async function go_to hides all details of the implementation, including the fact that it may take place over many game ticks. The caller does not need to concern itself with event subscriptions either.
The key part here is how we wait for an event, and get woken up by the runtime when an appropriate one arrives.
impl ActivityContext {
    /// Waits until the next event matching this agent's
    /// subscription arrives
    async fn next_event(&self) -> EntityEvent {
        loop {
            match self.task.pop_event() {
                None => {
                    // keep waiting until an event marks
                    // this task as ready again
                    self.task.park_until_triggered().await;
                }
                Some(evt) => return evt,
            }
        }
    }
}
impl TaskRef {
    /// Returns Some(evt) if one is waiting, otherwise
    /// None. Does not block
    pub fn pop_event(&self) -> Option<EntityEvent> {
        // self.0.event_sink is a RefCell<VecDeque<EntityEvent>>
        self.0.event_sink.borrow_mut().pop_front()
    }
}
In next_event, we sit in a loop popping events from the task's event queue. Note that pop_event is not async, and returns without blocking.
If there is an event in the queue then next_event returns immediately; otherwise it calls task.park_until_triggered. This uses a slightly unusual future that intentionally does not use the waker passed to it:
pub struct ParkUntilWakeupFuture(ParkState);
#[derive(Copy, Clone)]
enum ParkState {
    Unpolled,
    Parked,
    Complete,
}
impl Future for ParkUntilWakeupFuture {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0 {
            ParkState::Unpolled => {
                // first call
                self.0 = ParkState::Parked;
                // intentionally does not use the waker; the runtime
                // will wake us up
                Poll::Pending
            }
            ParkState::Parked => {
                // must have been woken up by the runtime
                self.0 = ParkState::Complete;
                Poll::Ready(())
            }
            ParkState::Complete => unreachable!("task has already been unparked"),
        }
    }
}
When this future is awaited, it is effectively doomed forever and can never again make progress - unless there's a mechanism in the runtime to mark the task as ready, such as the arrival of an event!
So, back to the wandering behaviour: we have subscribed the agent to the Arrived event and parked ourselves indefinitely. At some point later this event might arrive and get pushed onto the task's event queue, the runtime marks the task as ready, and the result is passed up to the subactivity.
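To see the park/wake cycle end to end, here's a std-only sketch with hypothetical stand-ins: TaskState holds only the event queue, Park collapses ParkUntilWakeupFuture's states into a bool, and the test plays the role of the runtime by pushing an event and polling again. The waker is a no-op because, as in the engine, readiness is driven by the runtime rather than by the waker.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the "runtime" (the caller) decides when to poll.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical task state: just the event queue.
#[derive(Default)]
struct TaskState {
    events: RefCell<VecDeque<&'static str>>,
}

/// Same idea as ParkUntilWakeupFuture, collapsed to a bool.
struct Park(bool);
impl Future for Park {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true; // parked: only a later poll can complete us
            Poll::Pending
        }
    }
}

/// Mirrors next_event: pop an event, or park until polled again.
async fn next_event(task: Rc<TaskState>) -> &'static str {
    loop {
        let next = task.events.borrow_mut().pop_front();
        if let Some(evt) = next {
            return evt;
        }
        Park(false).await;
    }
}

/// Polls the task's future once with a no-op waker.
fn poll_task(fut: &mut Pin<Box<dyn Future<Output = &'static str>>>) -> Poll<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```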
Let's assume it was a successful arrival for now, so it's time to loiter for a few moments at our new destination.
Timers
Loitering, also known as standing still doing nothing, is implemented with ctx.wait(n).await, where n is the number of game ticks to wait.
The implementation of timers is out of the scope of this post, but the idea is that we submit a weak task reference and the number of ticks to sleep, and we get back an opaque token that represents our timer instance. The token can be used for cancellation.
impl ActivityContext {
    pub fn wait(&self, ticks: u32) -> TimerFuture<'_> {
        let timers = self.world.resource_mut::<RuntimeTimers>();
        let (end_tick, token) = timers.schedule(ticks, self.task.weak());
        TimerFuture::new(end_tick, token, self.world) // world ref is used later
    }
}
impl Future for TimerFuture<'_> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.elapsed() {
            cx.waker().wake_by_ref();
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
There's nothing too complicated in the Future implementation, just an elapsed check and a call to the waker. It's rather in the Drop implementation where things get exciting.
Interruptions
An activity doesn't always finish gracefully, either because a more urgent behaviour preempts the current one or because a subactivity like go_to fails. Switching to a new activity causes the old one to be dropped along with all its state.
We can use the destructor of a Future implementor to neatly clean up after an ungraceful cancellation. Let's see how the futures used in wandering make the most of this.
- Navigating to a destination: if the GoToSubactivity future is dropped before it receives an Arrived event and terminates normally, we can send a request to the navigation system to cancel the current journey.
- Waiting for a timer: if a TimerFuture is dropped before the timer elapses, the timer can be cancelled in the destructor.
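The destructor pattern can be sketched in isolation. TimerGuard and the Registry set of live timer tokens below are hypothetical stand-ins for TimerFuture and the runtime's timers; the point is that dropping a future that owns the guard, without ever finishing it, is enough to cancel the timer.

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::Rc;

/// Hypothetical shared timer registry: the set of live timer tokens.
type Registry = Rc<RefCell<HashSet<u64>>>;

/// Minimal stand-in for TimerFuture: owns a scheduled timer's token.
struct TimerGuard {
    token: u64,
    registry: Registry,
}

impl TimerGuard {
    fn schedule(registry: &Registry, token: u64) -> Self {
        registry.borrow_mut().insert(token);
        TimerGuard { token, registry: Rc::clone(registry) }
    }
}

impl Drop for TimerGuard {
    /// Runs however the owning activity ends, including being dropped mid-way.
    fn drop(&mut self) {
        // removing is a no-op if the timer already fired and was removed
        self.registry.borrow_mut().remove(&self.token);
    }
}
```

The same shape would apply to GoToSubactivity, with the destructor sending a cancellation request to the navigation system instead of removing a token.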
Compare this to the old wander implementation, where the loiter timer had to manually clear the path AND cancel the timer in its on_finish method - a terrible substitute for a destructor.
Conclusion
We've seen what makes up a runtime, how tasks are allocated, used and interrupted, and how they are used to implement behaviours. I hope this breakdown helps others to make use of Rust's fantastic async features in their own games and engines.
If you'd like to see how activities more complex than wandering make use of subactivities, have a look at:
- All subactivities, such as "eat", "break block" and "haul"
- All activities, which make use of subactivities
[1] This is the same concept as in devlog 4, except now we make use of Rust's future types for ready/pending instead of manually implementing the active/blocked lifecycle. The reasons for doing so are documented in devlog 6.
[2] I found while writing this post that the runtime was in fact holding strong task references. Thank you for being a rubber duck!