Using asynchronous for loops in Python

asyncio brought support for asynchronous I/O with an event loop to Python 3. Initially, in Python 3.4, it was just a module built on top of generators and the yield from syntax. This changed with the release of Python 3.5, which introduced the async and await keywords and made it possible to turn this Python 3.4 code:

@asyncio.coroutine
def get(self, *args, **kwargs):
    yield from self.wait_for_token()
    return self.client.get(*args, **kwargs)

into this Python 3.5 code:

async def get(self, *args, **kwargs):
    await self.wait_for_token()
    return self.client.get(*args, **kwargs)

So that's a nice incremental improvement! But Python 3.5 also brought support for asynchronous context managers (async with) and asynchronous loops (async for). I believe those two also deserve to be known and used! Indeed, they add real expressive power that is difficult to achieve in Python 3.4. In this post, I'm going to focus on async for.

When I started looking for information about asynchronous loops, what I found was confusing. I typed python async for in a search engine, and the top result was PEP 492, which gives far too much detail for users of the feature and only covers Python 3.5, which complicates the situation as we will see below. But if you can use Python 3.6 or above, you're in luck! Asynchronous loops are much easier to use there.

Python 3.6+

async for loops are a natural extension of the usual loops in Python. Let's use the example of a paginated HTTP API that returns documents page after page. Here's how a usual synchronous for loop would work:

def get_docs():
    page = fetch_page()
    while page:
        for doc in page:
            yield doc
        page = fetch_page()

for doc in get_docs():
    pass  # work on doc

Now, an asynchronous for loop will be really similar. As usual with asyncio, you just need to add the async/await keywords in the right places:

async def get_docs():
    page = await fetch_page()
    while page:
        for doc in page:
            yield doc
        page = await fetch_page()

async for doc in get_docs():
    pass  # work on doc

And that's it! Your code is now asynchronous. There's nothing magic, it's just a loop over the results yielded by get_docs. The order of execution is the same as in the synchronous case, except that during await calls, other unrelated code can execute. (Like await, async for can only be used inside an async def function; the enclosing function is omitted here for brevity.)
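To try this end to end, here is a minimal, self-contained sketch. The fetch_page used here is a hypothetical stand-in, not a real API client: it serves three hard-coded pages and uses asyncio.sleep(0) to simulate network I/O. The loop lives in a main coroutine, which is run with asyncio.run; that call assumes Python 3.7 or later.

```python
import asyncio

# Hypothetical data standing in for a paginated HTTP API.
_PAGES = [["doc1", "doc2"], ["doc3"], ["doc4", "doc5"]]


def make_fetch_page(pages):
    """Build a fake fetch_page coroutine that serves `pages` one at a time."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # simulate I/O; other tasks may run here
        return next(remaining, [])  # an empty page signals the end

    return fetch_page


async def get_docs(fetch_page):
    page = await fetch_page()
    while page:
        for doc in page:
            yield doc
        page = await fetch_page()


async def main():
    docs = []
    async for doc in get_docs(make_fetch_page(_PAGES)):
        docs.append(doc)  # work on doc
    return docs


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing fetch_page as a parameter is only a convenience for this sketch; it keeps the fake data out of get_docs, whose body is otherwise the same as above.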

(Note that in the real world you could have a fetch_all_pages generator that would fetch pages one by one, which would simplify the implementation of get_docs. But as an example it would be more confusing since we would have an async for loop in get_docs too.)
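For the curious, that variant could look roughly like the sketch below. fetch_all_pages is the generator mentioned in the note; make_fetch_page and collect are hypothetical helpers added only so the example runs on its own.

```python
import asyncio


async def fetch_all_pages(fetch_page):
    """Yield pages one by one until an empty page is returned."""
    page = await fetch_page()
    while page:
        yield page
        page = await fetch_page()


async def get_docs(fetch_page):
    # The pagination logic lives in fetch_all_pages; this only flattens pages.
    async for page in fetch_all_pages(fetch_page):
        for doc in page:
            yield doc


def make_fetch_page(pages):
    """Hypothetical fetch_page serving hard-coded pages, for illustration."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # stand-in for a real network call
        return next(remaining, [])

    return fetch_page


async def collect(pages):
    docs = []
    async for doc in get_docs(make_fetch_page(pages)):
        docs.append(doc)
    return docs
```

Note how get_docs now mixes an async for (over pages) with a regular for (over the documents of one page).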

Python 3.5

However, you may be stuck with Python 3.5, for example if you only use the Python versions provided by Ubuntu 16.04 LTS. Or you want to understand how asynchronous iterators work under the hood.

In Python 3.5, you cannot use yield in an async function. Indeed, coroutines were initially implemented on top of generators in Python 3.4, and this limitation was only removed in Python 3.6. So the above code is not going to work. Instead, you need to reimplement get_docs using a class, which is more low level, more complicated and more verbose. Here's what it looks like:

import collections

class AsyncGetDocs:
    def __init__(self):
        self.buffer = collections.deque()

We need to store the results of the fetch_page call in a buffer in order to be able to return them one by one when needed. To preserve the order, we need a first-in first-out (FIFO) queue, which can easily be implemented with collections.deque in Python: append adds on the right and popleft removes from the left.
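As a standalone aside (this snippet is not part of the class), here is how a deque keeps documents in arrival order when you append on one side and pop from the other. The document names are made up.

```python
import collections

buffer = collections.deque()

# Documents arrive in page order and are appended on the right...
for doc in ["doc1", "doc2", "doc3"]:
    buffer.append(doc)

# ...and are taken from the left, so the oldest document comes out first.
first = buffer.popleft()
second = buffer.popleft()
```

Back to the class: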

    def __aiter__(self):
        return self

async for loops use the __aiter__ protocol: they need an object with an __aiter__ method. The object returned by __aiter__ should have an __anext__ coroutine method that returns the objects one by one:

    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

If the buffer is empty, we try to fill it. If it's still empty, this is the end of the loop, so we stop the iteration.

Now, if the buffer is not empty, we can take the oldest value it holds (remember that we're treating our buffer as a queue). We only have the _prefetch method left:

    async def _prefetch(self):
        for doc in await fetch_page():
            self.buffer.append(doc)

Woah! A lot of boilerplate.
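Here is the whole class in one piece, as a sketch you can run. Two things are my additions for the sake of a self-contained example: fetch_page is passed to the constructor instead of being a global, and make_fetch_page is a hypothetical fake that serves hard-coded pages. The by_hand function shows, roughly, the calls that async for makes on our behalf.

```python
import asyncio
import collections


class AsyncGetDocs:
    def __init__(self, fetch_page):
        self.fetch_page = fetch_page  # injected so the sketch is testable
        self.buffer = collections.deque()

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

    async def _prefetch(self):
        for doc in await self.fetch_page():
            self.buffer.append(doc)


def make_fetch_page(pages):
    """Hypothetical fetch_page serving hard-coded pages, for illustration."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # stand-in for a real network call
        return next(remaining, [])  # an empty page signals the end

    return fetch_page


async def with_async_for(pages):
    docs = []
    async for doc in AsyncGetDocs(make_fetch_page(pages)):
        docs.append(doc)
    return docs


async def by_hand(pages):
    """Roughly what `async for` does behind the scenes."""
    docs = []
    iterator = AsyncGetDocs(make_fetch_page(pages)).__aiter__()
    while True:
        try:
            doc = await iterator.__anext__()
        except StopAsyncIteration:
            break  # the iterator is exhausted
        docs.append(doc)
    return docs
```

On Python 3.5 itself you would drive these coroutines with loop.run_until_complete(...), since asyncio.run only appeared in Python 3.7.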

Python 3.5 + async_generator

Alternatively, you can have something close to the Python 3.6 version using the third-party async_generator library, which allows you to write:

from async_generator import async_generator, yield_

@async_generator
async def get_docs():
    page = await fetch_page()
    while page:
        for doc in page:
            await yield_(doc)
        page = await fetch_page()

async for doc in get_docs():
    pass  # work on doc

This is much closer to the Python 3.6 version! Thanks to Nathaniel J. Smith for having 1/ written the library and 2/ mentioned it in the comments!

Finally, if you're still using Python 3.4, well, what are you waiting for to upgrade?
