Dom Williams

Devlog #7: custom async runtime

The last devlog saw a large refactor of the entity activity system, to make use of Rust's elegant async/await language features, instead of badly implementing them by hand.

The main challenge here is implementing a custom async runtime that drives futures in a way that integrates into a game engine's architecture. This post goes into the details of my implementation and the design decisions behind it.

Some definitions

There are some terms and concepts to define early on, to ensure this post doesn't end up a thick jargony mess.

These concepts can be summarised by the following Rust types, which are loosely based on reality. By the end of this post these should be clear.

struct Runtime(Rc<RefCell<RuntimeInner>>);
struct RuntimeInner {
    ready: Vec<WeakTaskRef>,
    ..
}

type TaskOutput = Result<(), Box<dyn Error>>;

impl Runtime {
    fn spawn(&self, future: impl Future<Output=TaskOutput>) -> TaskRef { ... }
    fn tick(&self) { ... }
    fn mark_ready(&self, task: &TaskRef) { ... }
}

struct TaskRef(std::rc::Rc<Task>);
struct WeakTaskRef(std::rc::Weak<Task>);

struct Task {
    runtime: Runtime,
    future: RefCell<TaskFuture>,
    event_sink: RefCell<VecDeque<EntityEvent>>,
    ..
}

enum TaskFuture {
    Running(futures::future::LocalBoxFuture<'static, TaskOutput>),
    Done(TaskOutput),
    Cancelled,
}

impl TaskRef {
    fn poll(&self) { ... }
    fn mark_ready(&self) { ... }
    fn cancel(&self) { ... }
}

/// Each living entity has one of these
struct ActivityComponent {
    current: TaskRef,
    ..
}

Custom futures executor

As we've seen before, writing activities with futures is nice and ergonomic, but they need to be polled in order to be of any actual use. This requires a custom executor to be integrated into the game engine.

The hardest bit was getting my head around the concepts of a runtime, tasks and futures, and how they relate to each other in terms of ownership. The official async book was really helpful here.

Task references

The core concept for a runtime is tasks - where do they live, and who owns them?

I showed above in the type definitions that it's typically TaskRefs and WeakTaskRefs (reference-counted immutable Tasks, strong and weak respectively) that are passed around rather than Tasks themselves.

Runtimes normally need to use Arcs and some kind of locking here because they conventionally spread tasks across multiple threads. In this specialised single-threaded runtime we can skip the costs of atomics and use Rcs and RefCells, but in order to do that we need to convince the compiler with a few incantations.

// SAFETY: everything runs on the main thread, so the non-atomic
// Rc reference counts are never touched concurrently
unsafe impl Send for TaskRef {}
unsafe impl Sync for TaskRef {}
unsafe impl Send for WeakTaskRef {}
unsafe impl Sync for WeakTaskRef {}

Whether we use a strong or weak task reference depends on its usage, of which there are several:

In the agent

This is the main use for a strong, owned task reference, because conceptually the agent owns the task for as long as it is doing the behaviour. As the game engine follows the ECS architecture, this is stored in the ActivityComponent.

When it changes its mind and starts a new activity, dropping the old one should also drop the underlying Task and future, regardless of any other references.
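To make the ownership split concrete, here is a minimal std-only sketch. It assumes a pared-down `Task` that only stores a name. `ActivityComponent`, `Runtime` and `change_of_mind_demo` are simplified hypothetical stand-ins, not the engine's types. Overwriting the component's strong reference frees the old task. The runtime's weak reference then fails to upgrade, so the abandoned task is skipped.

```rust
use std::rc::{Rc, Weak};

// Hypothetical stand-in for a Task; only a name is stored.
struct Task {
    name: &'static str,
}

// The agent's component holds the only strong reference.
struct ActivityComponent {
    current: Rc<Task>,
}

// The runtime's ready queue only holds weak references.
#[derive(Default)]
struct Runtime {
    ready: Vec<Weak<Task>>,
}

impl Runtime {
    fn mark_ready(&mut self, task: &Rc<Task>) {
        self.ready.push(Rc::downgrade(task));
    }

    /// "Polls" every ready task that still has an owner and returns the
    /// names of the tasks polled; tasks whose owner dropped them are skipped.
    fn tick(&mut self) -> Vec<&'static str> {
        std::mem::take(&mut self.ready)
            .into_iter()
            .filter_map(|weak| weak.upgrade())
            .map(|task| task.name)
            .collect()
    }
}

fn change_of_mind_demo() -> Vec<&'static str> {
    let mut runtime = Runtime::default();
    let mut agent = ActivityComponent { current: Rc::new(Task { name: "wander" }) };
    runtime.mark_ready(&agent.current);

    // The agent changes its mind. Overwriting the component drops the old
    // task, even though the runtime still holds a weak reference to it.
    agent.current = Rc::new(Task { name: "eat" });
    runtime.mark_ready(&agent.current);

    runtime.tick()
}
```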

In the future itself

As we'll see below, a TaskRef is needed to subscribe to events and timers. These requests are issued from within an activity (and therefore within a task), so the activity needs a reference to its owning task.

Rcs make it easy to create cyclic references like this, but you need to be careful of how they are dropped, otherwise you can cause memory leaks where the strong reference count can never drop to 0. We'll see later how this isn't a problem for this situation.
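Here is a sketch of the leak. It uses a hypothetical `Task` whose stand-in `future` field holds a strong reference back to its own task. Dropping the outside owner alone leaks the task, while dropping the future first breaks the cycle. It is an assumption that cancellation defuses the cycle this way in the real runtime. The `Done` and `Cancelled` variants of `TaskFuture` suggest it, since neither holds the running future any more.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

thread_local! {
    // Counts how many tasks have actually been freed.
    static DROPPED: Cell<u32> = Cell::new(0);
}

struct Task {
    // Stand-in for the boxed future, which captures a strong reference
    // to its own task: the cycle.
    future: RefCell<Option<Rc<Task>>>,
}

impl Drop for Task {
    fn drop(&mut self) {
        DROPPED.with(|d| d.set(d.get() + 1));
    }
}

fn spawn_cyclic() -> Rc<Task> {
    let task = Rc::new(Task { future: RefCell::new(None) });
    *task.future.borrow_mut() = Some(task.clone());
    task
}

fn dropped() -> u32 {
    DROPPED.with(|d| d.get())
}

fn cycle_demo() -> (u32, u32) {
    let start = dropped();

    // 1. Drop only the owning reference: the count never reaches 0.
    let leaky = spawn_cyclic();
    assert_eq!(Rc::strong_count(&leaky), 2);
    drop(leaky);
    let after_leak = dropped() - start;

    // 2. Drop the future first (as cancelling would), then the owner.
    let task = spawn_cyclic();
    let _ = task.future.borrow_mut().take();
    drop(task);
    let after_cancel = dropped() - start;

    (after_leak, after_cancel)
}
```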

Runtime::spawn

Due to the requirement that all tasks unconditionally need a self reference, the runtime's spawn method makes use of a channel to pass it through to the future.

impl Runtime {
    pub fn spawn(
        &mut self,
        gimme_task_ref: futures::channel::oneshot::Sender<TaskRef>,
        future: impl Future<Output = BoxedResult<()>> + 'static,
    ) -> TaskRef {
        // allocate task ref
        let task_ref = TaskRef(Rc::new(Task {
            future: RefCell::new(TaskFuture::Running(future.boxed_local())),
            ..
        }));

        // send a strong copy to the future for cyclic reference
        let _ = gimme_task_ref.send(task_ref.clone());

        // task is ready immediately
        self.0.borrow_mut().ready.push(task_ref.weak());
        task_ref
    }
}

// elsewhere, when starting a new activity
let (taskref_tx, taskref_rx) = futures::channel::oneshot::channel();
let task_ref = runtime.spawn(taskref_tx, async move {
    // recv task ref from runtime
    let task = taskref_rx.await.unwrap(); // sent unconditionally

    // create context for activity
    let ctx = ActivityContext::new(
        task,
        ..
    );

    // execute the activity to completion
    new_activity.dew_it(&ctx).await
});

The runtime takes the sending half of a one-shot channel and sends the newly created task reference down it immediately, before the future has had a chance to be polled. In the futures crate's current implementation, the channel stores the TaskRef in an Arc.

A little later when the runtime polls all ready tasks, the task reference is read from the channel/Arc and is stored in an ActivityContext. With this the activity can use events and timers.
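The hand-off can be sketched with std alone by swapping the oneshot channel for a shared `Rc<RefCell<Option<..>>>` slot. This substitution is an assumption made to avoid the futures dependency. `spawn`, `poll_task` and `noop_waker` here are simplified hypothetical helpers. The slot is filled straight after allocation, so the very first poll is guaranteed to find the task reference.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxedFuture = Pin<Box<dyn Future<Output = ()>>>;

struct Task {
    id: u32,
    future: RefCell<Option<BoxedFuture>>,
}

// Hypothetical stand-in for the oneshot channel.
type TaskRefSlot = Rc<RefCell<Option<Rc<Task>>>>;

fn spawn(id: u32, make_future: impl FnOnce(TaskRefSlot) -> BoxedFuture) -> Rc<Task> {
    let slot: TaskRefSlot = Rc::new(RefCell::new(None));
    let future = make_future(slot.clone());
    let task = Rc::new(Task { id, future: RefCell::new(Some(future)) });
    // "send" a strong copy to the future before it is ever polled
    *slot.borrow_mut() = Some(task.clone());
    task
}

// A Waker that does nothing; fine here because nothing relies on waking.
fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn poll_task(task: &Rc<Task>) -> Poll<()> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut future = task.future.borrow_mut();
    let result = match future.as_mut() {
        Some(f) => f.as_mut().poll(&mut cx),
        None => Poll::Ready(()),
    };
    if result.is_ready() {
        // dropping the finished future also drops its copy of the task
        *future = None;
    }
    result
}

fn self_ref_demo() -> (Option<u32>, usize, usize) {
    let seen = Rc::new(RefCell::new(None));
    let seen_in_future = seen.clone();
    let task = spawn(7, move |slot| {
        Box::pin(async move {
            // "recv" the task ref, which was sent before the first poll
            let me = slot.borrow_mut().take().expect("sent unconditionally");
            *seen_in_future.borrow_mut() = Some(me.id);
        }) as BoxedFuture
    });
    let before = Rc::strong_count(&task); // ours + the copy in the slot
    let _ = poll_task(&task);
    let after = Rc::strong_count(&task); // the finished future was dropped
    let id = *seen.borrow();
    (id, before, after)
}
```

While the reference sits in the slot, it forms a temporary cycle (task, future, slot, task). The cycle ends once the future takes the reference out.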

In the runtime

The main responsibility of the runtime is to poll tasks that are ready for execution. The reasons a task might be ready are that it's just been created, a timer has elapsed or an event has arrived.

A weak reference to the task is stored in the ready queue, which is processed each game tick. Only a weak reference is needed here because there is no need to poll a task if the executing agent has dropped the single owning reference; we don't care to make any more progress on it.[2]

In a scheduled timer

A timer is also associated with a weak task reference. When it elapses after the specified number of game ticks, the task is marked as ready by pushing it onto the runtime's ready queue if the strong reference is still alive.
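The post treats the timer implementation as out of scope, so the following is only a hypothetical sketch of the behaviour just described. Timers hold a `Weak<Task>`. On each tick, every elapsed timer's task is pushed onto the ready queue, but only if it still has an owner. The `BTreeMap` keyed by `(end_tick, token)` and the `TimerToken` type are illustrative choices, not the engine's.

```rust
use std::collections::BTreeMap;
use std::rc::{Rc, Weak};

struct Task {
    name: &'static str,
}

/// Opaque handle returned by `schedule`, usable for cancellation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TimerToken(u64);

#[derive(Default)]
struct Timers {
    current_tick: u64,
    next_token: u64,
    // keyed by (end tick, token) so that iteration order is elapse order
    pending: BTreeMap<(u64, TimerToken), Weak<Task>>,
}

impl Timers {
    fn schedule(&mut self, ticks: u64, task: Weak<Task>) -> (u64, TimerToken) {
        let end_tick = self.current_tick + ticks;
        let token = TimerToken(self.next_token);
        self.next_token += 1;
        self.pending.insert((end_tick, token), task);
        (end_tick, token)
    }

    fn cancel(&mut self, end_tick: u64, token: TimerToken) {
        self.pending.remove(&(end_tick, token));
    }

    /// Advances one game tick, pushing the weak reference of every elapsed
    /// timer onto `ready` only if its task still has an owner.
    fn tick(&mut self, ready: &mut Vec<Weak<Task>>) {
        self.current_tick += 1;
        // everything due on a later tick stays pending
        let still_pending = self.pending.split_off(&(self.current_tick + 1, TimerToken(0)));
        let elapsed = std::mem::replace(&mut self.pending, still_pending);
        ready.extend(elapsed.into_values().filter(|task| task.strong_count() > 0));
    }
}

fn timer_demo() -> Vec<Vec<&'static str>> {
    let mut timers = Timers::default();
    let wander = Rc::new(Task { name: "wander" });
    let eat = Rc::new(Task { name: "eat" });

    timers.schedule(2, Rc::downgrade(&wander));
    timers.schedule(1, Rc::downgrade(&eat));
    let (end, token) = timers.schedule(1, Rc::downgrade(&wander));
    timers.cancel(end, token); // cancelled via its token
    drop(eat); // the agent abandoned this activity before the timer elapsed

    let mut per_tick: Vec<Vec<&'static str>> = Vec::new();
    for _ in 0..3 {
        let mut ready = Vec::new();
        timers.tick(&mut ready);
        per_tick.push(ready.iter().filter_map(|w| w.upgrade()).map(|t| t.name).collect());
    }
    per_tick
}
```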

Polling tasks

As we've seen, the runtime holds a list of ready tasks. Each game tick we call Runtime::tick to poll these futures on the main game thread, allowing them to make progress.

The interface for polling a future is defined in the std::future::Future trait, which looks like the following:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Getting a waker

Note the Context reference we need to pass in: where does it come from? The docs show it holds a Waker, which needs a RawWaker, which in turn needs a very unsafe vtable...

Luckily the handy cooked-waker crate makes this very easy for tasks that use Rust's standard pointer types like Rc and Arc. Integrating this with our task types now only requires some simple glue to mark a task as ready in our own runtime, and to convert between raw pointers and TaskRefs:

impl WakeRef for TaskRef {
    fn wake_by_ref(&self) {
        self.0.runtime.mark_ready(self);
    }
}

impl Wake for TaskRef {}

unsafe impl ViaRawPointer for TaskRef {
    type Target = Task;

    fn into_raw(self) -> *mut Task {
        Rc::into_raw(self.0) as *mut Task
    }

    unsafe fn from_raw(ptr: *mut Task) -> Self {
        Self(Rc::from_raw(ptr as *const Task))
    }
}
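To show what cooked-waker saves us from, here is a rough hand-written equivalent using only std. It is a `RawWakerVTable` whose functions turn the data pointer back into an `Rc<Task>`. This is a sketch with a hypothetical simplified `Task` that pushes its id onto a shared queue, not the crate's actual code. Each Waker owns one strong reference: `clone` adds one, and `wake` and `drop` release one.

```rust
use std::cell::RefCell;
use std::mem::ManuallyDrop;
use std::rc::Rc;
use std::task::{RawWaker, RawWakerVTable, Waker};

struct Task {
    id: u32,
    // hypothetical stand-in for the runtime's ready queue
    ready_queue: Rc<RefCell<Vec<u32>>>,
}

impl Task {
    fn mark_ready(&self) {
        self.ready_queue.borrow_mut().push(self.id);
    }
}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw);

unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
    // a new Waker means a new strong reference
    unsafe { Rc::increment_strong_count(ptr as *const Task) };
    RawWaker::new(ptr, &VTABLE)
}

unsafe fn wake_raw(ptr: *const ()) {
    // `wake` consumes the Waker, so take back ownership of its reference
    let task = unsafe { Rc::from_raw(ptr as *const Task) };
    task.mark_ready();
}

unsafe fn wake_by_ref_raw(ptr: *const ()) {
    // `wake_by_ref` must not release the Waker's reference
    let task = ManuallyDrop::new(unsafe { Rc::from_raw(ptr as *const Task) });
    task.mark_ready();
}

unsafe fn drop_raw(ptr: *const ()) {
    drop(unsafe { Rc::from_raw(ptr as *const Task) });
}

fn waker_for(task: Rc<Task>) -> Waker {
    let raw = RawWaker::new(Rc::into_raw(task) as *const (), &VTABLE);
    // SAFETY: the vtable functions uphold the RawWaker contract for a
    // pointer from Rc::into_raw, provided the Waker stays on this thread
    unsafe { Waker::from_raw(raw) }
}

fn waker_demo() -> (Vec<u32>, usize) {
    let queue = Rc::new(RefCell::new(Vec::new()));
    let task = Rc::new(Task { id: 1, ready_queue: queue.clone() });

    let waker = waker_for(task.clone());
    let cloned = waker.clone();
    waker.wake_by_ref(); // marks ready; the waker is still usable
    cloned.wake(); // marks ready and consumes the clone
    drop(waker); // releases the last waker reference

    let strong = Rc::strong_count(&task);
    let ready = queue.borrow().clone();
    (ready, strong)
}
```

`Waker` is `Send + Sync` whatever it wraps. An Rc-backed waker like this therefore relies on the same main-thread-only guarantee as the `unsafe impl Send`/`Sync` on TaskRef.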

Runtime references

In the function wake_by_ref above, our task needs to reach out into the runtime and mark itself as ready. This implies that each TaskRef needs to hold a reference to the runtime, which is the main motivation for the Runtime struct to be a thin wrapper around a reference-counted RuntimeInner.

Another key aspect of the Runtime type is that it holds the inner type in a RefCell. This allows for juggling Rust's strict rules of aliasing XOR mutability at runtime, which is needed when polling ready futures.

We need to iterate the ready queue while simultaneously appending to it, which requires both aliasing (iteration) and mutability (appending). Without a RefCell it wouldn't compile, and with it we get a panic:

thread 'main' panicked at 'already mutably borrowed: BorrowError
   3: std::panicking::begin_panic_handler::{{closure}}
      ...
   8: core::result::Result<T,E>::expect
   9: core::cell::RefCell<T>::borrow
      ...
  12: simulation::runtime::runtime::Runtime::mark_ready    <---- adding to ready queue
  13: <simulation::runtime::runtime::TaskRef as cooked_waker::WakeRef>::wake_by_ref
  14: <T as cooked_waker::IntoWaker>::VTABLE::{{closure}}
  15: core::ops::function::FnOnce::call_once
  16: core::task::wake::Waker::wake_by_ref
  17: <simulation::TimerFuture as core::future::future::Future>::poll
  18: <simulation::WanderActivity as simulation::Activity>::dew_it::{{closure}}
  19: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
  20: <core::pin::Pin<P> as core::future::future::Future>::poll
  21: <simulation::activity::system::ActivitySystem as shred::system::System>::run::{{closure}}
  22: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
  23: simulation::runtime::runtime::TaskRef::poll_task
  24: simulation::runtime::runtime::Runtime::tick          <---- iterating ready queue
      ...

We can see from the backtrace that a TimerFuture wakes its task during the poll. mark_ready then tries to borrow the runtime while tick is still holding a mutable borrow of it.
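The conflict can be reproduced in miniature. In this hypothetical sketch, ids stand in for tasks, and `try_borrow_mut` replaces `borrow_mut` so the clash surfaces as an error value rather than a panic. The waker here asks for a mutable borrow rather than the shared one in the backtrace. It fails for the same reason: the tick is still holding its mutable borrow.

```rust
use std::cell::{BorrowMutError, RefCell};

// Hypothetical: task ids stand in for WeakTaskRefs.
struct RuntimeInner {
    ready: Vec<u32>,
}

struct Runtime(RefCell<RuntimeInner>);

impl Runtime {
    /// What a waker ends up calling. `try_borrow_mut` reports the conflict
    /// as an error where `borrow_mut` would panic.
    fn mark_ready(&self, id: u32) -> Result<(), BorrowMutError> {
        self.0.try_borrow_mut()?.ready.push(id);
        Ok(())
    }

    /// Naive tick: iterates the ready queue while still borrowing the
    /// runtime, and every polled task immediately wakes itself again.
    fn tick_naive(&self) -> Vec<bool> {
        let inner = self.0.borrow_mut();
        let woke: Vec<bool> = inner.ready.iter().map(|&id| self.mark_ready(id).is_ok()).collect();
        woke
    }
}

fn conflict_demo() -> (Vec<bool>, Vec<u32>) {
    let rt = Runtime(RefCell::new(RuntimeInner { ready: vec![1, 2] }));
    let woke = rt.tick_naive();
    // nothing was re-queued, because every wake-up failed to borrow
    let queue = rt.0.borrow().ready.clone();
    (woke, queue)
}
```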

The workaround here is to move the ready queue out of the runtime, so that the mutable borrow is released before any task is polled:

// simplified
impl Runtime {
    pub fn tick(&self) {
        let mut ready_tasks: Vec<WeakTaskRef> = {
            // take a mutable reference to RuntimeInner
            let mut inner = self.0.borrow_mut();

            // move ready tasks out of runtime so we can
            // release the mutable ref
            std::mem::take(&mut inner.ready)

            // mutable reference is dropped here
        };

        // the ready queue is now empty

        // poll tasks in ready queue, which could invoke
        // `wake_by_ref` on the Waker and push themselves
        // back onto the ready queue
        for task in ready_tasks.drain(..).filter_map(|t| t.upgrade()) {
            task.poll_task();
        }
    }
}

The actual code is more efficient with allocations, but implements the same concept.
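The post doesn't show the allocation-friendly version, so here is one possible approach. It is an assumption, not the author's code. A second, empty buffer is kept in `RuntimeInner` and swapped with the ready queue each tick, so both vectors keep their capacity instead of being reallocated. Task ids again stand in for WeakTaskRefs.

```rust
use std::cell::RefCell;
use std::mem;

// Hypothetical: task ids stand in for WeakTaskRefs.
struct RuntimeInner {
    ready: Vec<u32>,
    // empty buffer kept around so its allocation can be reused
    spare: Vec<u32>,
}

struct Runtime(RefCell<RuntimeInner>);

impl Runtime {
    fn mark_ready(&self, id: u32) {
        self.0.borrow_mut().ready.push(id);
    }

    /// Polls every ready task via `poll`, which may call `mark_ready`.
    fn tick(&self, mut poll: impl FnMut(&Self, u32)) {
        // take the spare buffer, then swap it with the full ready queue;
        // each borrow_mut is released at the end of its statement
        let mut batch = mem::take(&mut self.0.borrow_mut().spare);
        mem::swap(&mut batch, &mut self.0.borrow_mut().ready);

        for id in batch.drain(..) {
            poll(self, id);
        }

        // `batch` is now empty but keeps its capacity for the next tick
        self.0.borrow_mut().spare = batch;
    }
}

fn double_buffer_demo() -> (Vec<Vec<u32>>, usize) {
    let rt = Runtime(RefCell::new(RuntimeInner { ready: vec![0], spare: Vec::new() }));
    let mut polled_per_tick: Vec<Vec<u32>> = Vec::new();
    for _ in 0..3 {
        let mut polled = Vec::new();
        rt.tick(|rt, id| {
            polled.push(id);
            // a task that wakes another one during its poll
            rt.mark_ready(id + 1);
        });
        polled_per_tick.push(polled);
    }
    let spare_capacity = rt.0.borrow().spare.capacity();
    (polled_per_tick, spare_capacity)
}
```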

Using the runtime

Now that we've seen how the runtime works, let's look into how it's used.

To avoid things staying too abstract, we'll use the wander behaviour from the last devlog as a concrete example, whose implementation is conveniently short.

// pseudo-code
async fn wander_activity(ctx) -> Result<(), _> {
    loop {
        // walk to a nearby position
        let pos = choose_wander_destination();
        ctx.go_to(pos).await?;

        // loiter for a few ticks
        ctx.wait(3).await;
    }
}

This shows the use of events and timers, and the handling of interruptions (or rather, how we get that handling for free).

Events

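When we see ctx.go_to(pos).await above, it's instantiating a GoToSubactivity, which is a self-contained unit of behaviour (and an implementor of Future). In outline, it sends the agent towards the target position, subscribes it to the Arrived event, and waits for that event before passing the result back up.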

A major convenience here is how the async function go_to hides all details of the implementation, including the fact that it may take place over many game ticks. The caller does not need to concern itself with event subscriptions either.

The key part here is how we wait for an event, and get woken up by the runtime when an appropriate one arrives.

impl ActivityContext {
    /// Waits until the next event matching this agent's
    /// subscription arrives
    async fn next_event(&self) -> EntityEvent {
        loop {
            match self.task.pop_event() {
                None => {
                    // keep waiting until an event marks
                    // this task as ready again
                    self.task.park_until_triggered().await;
                }
                Some(evt) => return evt,
            }
        }
    }
}

impl TaskRef {
    /// Returns Some(evt) if one is waiting, otherwise
    /// None. Does not block
    pub fn pop_event(&self) -> Option<EntityEvent> {
        // self.0.event_sink is a RefCell<VecDeque<EntityEvent>>
        self.0.event_sink.borrow_mut().pop_front()
    }
}

In next_event, we sit in a loop popping events from the task's event queue. Note that pop_event is not async, and returns without blocking.

If there is an event in the queue then next_event returns immediately, otherwise it calls task.park_until_triggered. This uses a slightly unusual future that intentionally does not use the waker passed to it:

pub struct ParkUntilWakeupFuture(ParkState);

#[derive(Copy, Clone)]
enum ParkState {
    Unpolled,
    Parked,
    Complete,
}

impl Future for ParkUntilWakeupFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0 {
            ParkState::Unpolled => {
                // first call
                self.0 = ParkState::Parked;

                // intentionally does not use the waker; the runtime
                // will wake us up
                Poll::Pending
            }
            ParkState::Parked => {
                // must have been woken up by the runtime
                self.0 = ParkState::Complete;
                Poll::Ready(())
            }
            ParkState::Complete => unreachable!("task has already been unparked"),
        }
    }
}

When this future is awaited, it is effectively doomed forever and can never again make progress, unless there's a mechanism in the runtime to mark the task as ready, such as the arrival of an event!
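The whole wait-for-event cycle can be exercised without a runtime by polling by hand. In this sketch, events are plain strings in a hypothetical shared queue. `noop_waker` is a helper that builds a do-nothing Waker, which is fine because the parking future never uses its waker. The second poll models a wake-up with no event waiting, which shows why `next_event` loops.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[derive(Copy, Clone)]
enum ParkState {
    Unpolled,
    Parked,
    Complete,
}

struct ParkUntilWakeupFuture(ParkState);

impl Future for ParkUntilWakeupFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
        match self.0 {
            ParkState::Unpolled => {
                // park: the runtime, not the waker, will re-poll us
                self.0 = ParkState::Parked;
                Poll::Pending
            }
            ParkState::Parked => {
                self.0 = ParkState::Complete;
                Poll::Ready(())
            }
            ParkState::Complete => unreachable!("task has already been unparked"),
        }
    }
}

// Hypothetical event sink: events are plain strings here.
type EventSink = Rc<RefCell<VecDeque<&'static str>>>;

async fn next_event(sink: EventSink) -> &'static str {
    loop {
        // bind the popped value so the RefCell borrow ends before awaiting
        let popped = sink.borrow_mut().pop_front();
        match popped {
            Some(evt) => return evt,
            None => ParkUntilWakeupFuture(ParkState::Unpolled).await,
        }
    }
}

fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn park_demo() -> Vec<Poll<&'static str>> {
    let sink: EventSink = Rc::new(RefCell::new(VecDeque::new()));
    let mut activity = Box::pin(next_event(sink.clone()));
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut results = Vec::new();

    // no events yet: the task parks itself
    results.push(activity.as_mut().poll(&mut cx));
    // re-polled with an empty queue: it simply parks again
    results.push(activity.as_mut().poll(&mut cx));
    // an event arrives (the runtime would also mark the task ready)
    sink.borrow_mut().push_back("Arrived");
    results.push(activity.as_mut().poll(&mut cx));
    results
}
```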

So back to the context of the wandering behaviour; we have subscribed the agent to the Arrived event and parked ourselves indefinitely. At some point later this event might arrive and get pushed onto the task's event queue, the task marked as ready by the runtime, and the result passed up to the subactivity.

Let's assume it was a successful arrival for now, so it's time to loiter for a few moments at our new destination.

Timers

Loitering, also known as standing still doing nothing, is implemented with ctx.wait(n).await, where n is the number of game ticks to wait.

The implementation of timers is out of the scope of this post, but the idea is that we submit a weak task reference and the number of ticks to sleep, and we get back an opaque token that represents our timer instance. The token can be used for cancellation.

impl ActivityContext {
    pub fn wait(&self, ticks: u32) -> TimerFuture {
        let timers = self.world.resource_mut::<RuntimeTimers>();
        let (end_tick, token) = timers.schedule(ticks, self.task.weak());
        TimerFuture::new(end_tick, token, self.world) // world ref is used later
    }
}

impl Future for TimerFuture<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.elapsed() {
            cx.waker().wake_by_ref();
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

There's nothing too complicated in the Future implementation, just an elapsed check and a call to the waker. It's in the Drop implementation that things get exciting.

Interruptions

An activity doesn't always finish gracefully, either because a more urgent behaviour preempts the current one or because a subactivity like go_to fails. Switching to a new activity causes the old one to be dropped along with all its state.

We can use the destructor of a Future implementor to neatly clean up following an ungraceful cancellation - let's see how the futures used in wandering make the most of this.
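The wander futures' own destructors aren't reproduced here. The following is a hypothetical sketch of the idea for the loiter timer: a `TimerFuture` that removes its entry from a simplified `Timers` registry in `Drop`. The activity is dropped part-way through its wait, as happens when it is preempted, and the timer is cleaned up without any explicit cancellation call.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical timer registry: token -> end tick.
#[derive(Default)]
struct Timers {
    current_tick: u64,
    next_token: u64,
    scheduled: HashMap<u64, u64>,
}

struct TimerFuture {
    token: u64,
    end_tick: u64,
    timers: Rc<RefCell<Timers>>,
}

fn wait(timers: &Rc<RefCell<Timers>>, ticks: u64) -> TimerFuture {
    let mut t = timers.borrow_mut();
    let token = t.next_token;
    t.next_token += 1;
    let end_tick = t.current_tick + ticks;
    t.scheduled.insert(token, end_tick);
    TimerFuture { token, end_tick, timers: Rc::clone(timers) }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // the timer system marks the task ready once the end tick is reached
        if self.timers.borrow().current_tick >= self.end_tick {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

impl Drop for TimerFuture {
    fn drop(&mut self) {
        // runs on completion *and* on interruption, so a stale timer can
        // never wake an activity that has been replaced
        self.timers.borrow_mut().scheduled.remove(&self.token);
    }
}

fn noop_waker() -> Waker {
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn interruption_demo() -> (bool, usize, usize) {
    let timers = Rc::new(RefCell::new(Timers::default()));
    let t = timers.clone();
    // the loiter step of a wander-like activity
    let mut activity: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        wait(&t, 3).await;
    });
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let pending = activity.as_mut().poll(&mut cx).is_pending();
    let while_waiting = timers.borrow().scheduled.len();
    drop(activity); // preempted: the whole future tree is dropped
    let after = timers.borrow().scheduled.len();
    (pending, while_waiting, after)
}
```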

Compare this to the old wander implementation, where the loiter timer had to manually clear the path AND cancel the timer in its on_finish method - a terrible substitute for a destructor.

Conclusion

We've seen what makes up a runtime, how tasks are allocated, used and interrupted, and how they are used to implement behaviours. I hope this breakdown helps others to make use of Rust's fantastic async features in their own games and engines.

If you'd like to see how activities more complex than wandering make use of subactivities, have a look at:


  1. This is the same concept as in devlog 4, except now we make use of Rust's future types for ready/pending instead of manually implementing the active/blocked lifecycle. The reasons for doing so are documented in devlog 6.

  2. I found while writing this post that the runtime was in fact holding strong task references. Thank you for being a rubber duck!