python, async  -  2025-01-21

Creating a tiny async framework

Async programming has stirred the Python community in the past years. Some people adore the performance and elegant code, others hate how it requires all your code to be async. This post looks into what async does by creating a tiny async framework.

Grok visualizing async
Image by Grok

Why async

The core purpose of async is to deal with cases where the program execution has to wait for some reason. For instance when it's writing a file to disk, or downloading a file from the internet.

The classic approach is simply to ... wait. The program is then doing nothing until the file system or network has done what the program needs it to do before it can proceed. That's a waste of CPU cycles.

One approach to solve this is using callbacks. When you write the command to download a file, you also provide a function that will be called when the file has arrived. Older JavaScript is a good example:

fetch(url, ...).then(response => response.text()).then( text => console.log(text))

This works, and the then helps make this quite readable, but as applications get more complex, these things tend to become more messy and lead to something called "callback hell".

The modern way to write the above is to use async, which is much easier to follow:

response = await fetch(url, ...)
text = await response.text()
console.log(text)

That await keyword means that the program flow will wait there, except in such a way that other stuff can run in the mean time.

And this is the main point of aync: it allows you to define multiple tasks, so that when one task is waiting, the others can run.

Aync in Python

Have a look at this simple Python function:

def foo():
    do_something()
    do_someting_else()

To make it async we can add one word:

async def foo():
    do_something()
    do_someting_else()

Though the real power comes when there's waiting involved:

async def foo():
    await sleep(1)
    await download_file()

Aync is contagious

You may wonder whether it's possible to simplify the above example like this:

# FAIL: the outer function *must* be async
def foo():
    await sleep(1)
    await download_file()

But we cannot; in order for a function to await something, it must be marked as async. And therefore the parent function must too, and the function before that, etc. You can see how this forces you to make your whole application async.

A mini async framework

So what happens if you call foo()? You get a coroutine object:

coroutine = foo()

Normally you won't use the coroutine object directly; that's the purpose of the async framework, like Asyncio or Trio. But in this post we will, because we're going to build our own mini-async-framework!

A sleep function

To do so, we're first going to create our own little "awaitable" object:

class Sleeper:
    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield {"action": "sleep", "delay": self.delay}

which can be created with our own sleep() function:

async def sleep(delay):
    await Sleeper(delay)

All async frameworks have their own sleep() function. In a minute you'll see why they're not compatible.

That __await__ method on the Sleeper class is a special method that is called when an object is awaited. It must be a generator, but it is not specified what it should yield. In fact, this is the implementation detail of the async framework. E.g. Ayncio will yield different things than Trio. And we yield our own little info object.

A task object

A task (in this context) is a wrapper for a coroutine. It keeps track of the coroutine object, whether it errored, and when it should proceed:

class Task:

    def __init__(self, coro):
        self.coro = coro
        self.error = None
        self.proceed_at = 0

To actually run the coroutine, the coro.send() method is used:

return_value = self.coro.send(None)

The coroutine then takes a single step, and one of three things can happen:

(we ignore task cancellation here for the sake of simplicity.)

The below code handles these cases:

import time

class Task:

    def __init__(self, coro):
        self.coro = coro
        self.error = None
        self.proceed_at = 0

    def step(self):
        try:
            wait_info = self.coro.send(None)
        except StopIteration:
            self.coro = None
            return
        except Exception as err:
            self.error = err
            self.coro = None
            return

        action = wait_info["action"]
        if action == "sleep":
            self.proceed_at = time.perf_counter() + wait_info["delay"]
        else:
            self.error = Exception(f"Unknown async action {action!r}.")

A runner

With the task object capable of stepping the coroutine, and exposing when it should be stepped again, we can create an object that can run multiple tasks simultaneously:

class Runner:
    def __init__(self):
        self.tasks = []

    def add_task(self, coro):
        self.tasks.append(Task(coro))

    def run(self):
        proceed_at = 0

        while self.tasks:
            time.sleep(max(0, proceed_at - time.perf_counter()))

            now = time.perf_counter()
            proceed_at = now + 1
            for task in self.tasks:
                if now >= task.proceed_at:
                    task.step()
                    if task.error:
                        print("Error in task:", task.error)
                proceed_at = min(proceed_at, task.proceed_at)

            self.tasks = [task for task in self.tasks if task.coro]

Trying it out

And then we can do:

async def main():
    await sleep(1)
    print("main slept")
    await sleep(1)
    print("main slept")
    runner.add_task(sub("harry", 3))
    runner.add_task(sub("ron", 4))
    runner.add_task(fail())


async def sub(name, n):
    for _ in range(n):
        await sleep(0.1)
        print(f"{name} slept")


async def fail():
    await sleep(2)
    raise ValueError("Meh")


runner = Runner()
runner.add_task(main())
runner.run()

This produces:

main slept
main slept
harry slept
ron slept
harry slept
ron slept
harry slept
ron slept
ron slept
Error in task: Meh

The final code

It's very minimal, and it cuts many corners, but it works!

The full code for out little async framework
import time


class Sleeper:
    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield {"action": "sleep", "delay": self.delay}


async def sleep(delay):
    await Sleeper(delay)


class Task:

    def __init__(self, coro):
        self.coro = coro
        self.error = None
        self.proceed_at = 0

    def step(self):
        try:
            wait_info = self.coro.send(None)
        except StopIteration:
            self.coro = None
            return
        except Exception as err:
            self.error = err
            self.coro = None
            return

        action = wait_info["action"]
        if action == "sleep":
            self.proceed_at = time.perf_counter() + wait_info["delay"]
        else:
            self.error = Exception(f"Unknown async action {action!r}.")


class Runner:
    def __init__(self):
        self.tasks = []

    def add_task(self, coro):
        self.tasks.append(Task(coro))


    def run(self):
        proceed_at = 0

        while self.tasks:
            time.sleep(max(0, proceed_at - time.perf_counter()))

            now = time.perf_counter()
            proceed_at = now + 1
            for task in self.tasks:
                if now >= task.proceed_at:
                    task.step()
                    if task.error:
                        print("Error in task:", task.error)
                proceed_at = min(proceed_at, task.proceed_at)

            self.tasks = [task for task in self.tasks if task.coro]


async def main():
    await sleep(1)
    print("main slept")
    await sleep(1)
    print("main slept")
    runner.add_task(sub("harry", 3))
    runner.add_task(sub("ron", 4))
    runner.add_task(fail())


async def sub(name, n):
    for _ in range(n):
        await sleep(0.1)
        print(f"{name} slept")

async def fail():
    await sleep(2)
    raise ValueError("Meh")

runner = Runner()
runner.add_task(main())
runner.run()

Async adapter in rendercanvas

In rendercanvas I recently implemented a mini-framework that is just a bit more advanced that the one here. It can sleep and listen to an event.

It's called the asyncadapter, because its purpose it to allow running async code in an event loop that is not normally support async Python (i.e. coroutines), like Qt or wx.

Instead of a runner like we created here, it is driven by a call_later function. Any event-loop can provide that, so this adapter can run on any event loop.

The purpose is to allow code to call the async functions of the WebGPU API, i.e. wait for the GPU. We may make the adapter more advanced as the WebGPU's C-API (wgpu-native) evolves.

Take it to the next level

A thing that is obviously missing here is being able to wait for IO. To implement that you'd to meka use of OS-level signals, and/or threading.

Further, awaitable functions may want to return someting:

data = await download(url)

That something must be passed to the coro here:

        wait_info = self.coro.send(something)

That all sounds like a lot of fun, but out of scope for this post. There will be tons of design decisions to make, which is how Asyncio and Trio distinguish themselves.