Using asynchronous for loops in Python

asyncio has brought support for asynchronous I/O with an event loop to Python 3. Since Python 3.5, we can use the async and await keywords:

async def get(self):
    await self.wait_for_token()
    return self.client.get()

But Python 3.5 also brought support for asynchronous context managers (async with) and asynchronous loops (async for). I believe those two also deserve to be known and used! Indeed, they add real expressive power. In this post, I'm going to focus on async for.

When I started looking for information about asynchronous loops, what I found was confusing. I typed python async for in a search engine, and the top result was PEP 492, which gives far more detail than users of the feature need. It also only covers Python 3.5, where an asynchronous iterator had to be written by hand as a class. Since Python 3.6, asynchronous generators (PEP 525) let you use yield inside an async def function, which is much easier to use.

Python 3.6+

async for loops are a natural extension of the usual loops in Python. Let's use the example of a paginated HTTP API that returns documents page after page. Here's how a usual synchronous for loop would work:

def get_docs():
    page = fetch_page()
    while page:
        for doc in page:
            yield doc
        page = fetch_page()

for doc in get_docs():
    pass  # work on doc

Now, an asynchronous for loop looks very similar. As usual with asyncio, you just need to add the async/await keywords in the right places:

async def get_docs():
    page = await fetch_page()
    while page:
        for doc in page:
            yield doc
        page = await fetch_page()

async def main():
    # async for is only valid inside an async def function
    async for doc in get_docs():
        pass  # work on doc

And that's it! Your code is now asynchronous. There's nothing magic, it's just a loop over the results yielded by get_docs. The order of execution is the same as in the synchronous case, except that during await calls, other unrelated code can execute.
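If you want to run this end to end, here is a minimal sketch. It assumes a fake fetch_page built from an in-memory list instead of a real HTTP API; make_fetch_page, PAGES and main are names I made up for the example, and get_docs takes fetch_page as a parameter only so the fake can be plugged in.

```python
import asyncio

# Fake data standing in for the paginated HTTP API (an assumption for this sketch).
PAGES = [["doc1", "doc2"], ["doc3"], ["doc4", "doc5"]]


def make_fetch_page(pages):
    """Build a hypothetical fetch_page() that returns one page per call, then []."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # give control back to the event loop, like real I/O would
        return next(remaining, [])

    return fetch_page


async def get_docs(fetch_page):
    # Same shape as the generator in the post, with fetch_page passed in.
    page = await fetch_page()
    while page:
        for doc in page:
            yield doc
        page = await fetch_page()


async def main():
    docs = []
    # async for has to live inside a coroutine function
    async for doc in get_docs(make_fetch_page(PAGES)):
        docs.append(doc)
    return docs


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The documents come out in page order, exactly as they would with the synchronous generator.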

(Note that in the real world you could have a fetch_all_pages generator that would fetch pages one by one, which would simplify the implementation of get_docs. But as an example it would be more confusing since we would have an async for loop in get_docs too.)
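For the curious, that variant could look roughly like the sketch below. fetch_all_pages is the hypothetical generator mentioned in the note; the fake make_fetch_page helper and the PAGES data are assumptions that only exist to make the example runnable.

```python
import asyncio

# Fake data standing in for the paginated HTTP API (an assumption for this sketch).
PAGES = [["doc1", "doc2"], ["doc3"]]


def make_fetch_page(pages):
    """Build a hypothetical fetch_page() that returns one page per call, then []."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # simulate waiting on the network
        return next(remaining, [])

    return fetch_page


async def fetch_all_pages(fetch_page):
    """Yield pages one by one until the API returns an empty page."""
    page = await fetch_page()
    while page:
        yield page
        page = await fetch_page()


async def get_docs(fetch_page):
    # The outer loop is asynchronous (each page is awaited),
    # the inner loop is a plain for loop over an in-memory list.
    async for page in fetch_all_pages(fetch_page):
        for doc in page:
            yield doc


async def collect():
    docs = []
    async for doc in get_docs(make_fetch_page(PAGES)):
        docs.append(doc)
    return docs
```

get_docs gets shorter, but there are now two async for loops to follow: one inside get_docs and one in the caller.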

Under the hood

But maybe you want to understand how asynchronous iterators work under the hood. To do so, let's reimplement get_docs using a class. Here's what it looks like:

import collections

class AsyncGetDocs:
    def __init__(self):
        self.buffer = collections.deque()

We need to store the results of the fetch_page code in a buffer in order to be able to return them one by one when needed. To preserve the order, we need a first-in first-out (FIFO) queue, which can easily be implemented with collections.deque in Python.
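As a quick reminder of the FIFO behaviour we rely on, here is a tiny standalone sketch (the document names are made up): items are appended on the right and taken from the left, so they come out in the order they went in.

```python
import collections

buffer = collections.deque()
for doc in ["doc1", "doc2", "doc3"]:
    buffer.append(doc)      # new items go on the right

first = buffer.popleft()    # the oldest item comes off the left
second = buffer.popleft()
```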

    def __aiter__(self):
        return self

async for loops use the __aiter__ protocol: they need an object with an __aiter__ method. In most cases, just returning self is enough as it avoids introducing yet another class. The object returned by __aiter__ should have an __anext__ method that will return the objects one by one:

    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

If the buffer is empty, we try to fill it. If it's still empty, this is the end of the loop, so we stop the iteration.
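To make the role of StopAsyncIteration concrete, here is roughly what an async for loop does for you, written out by hand. This is a simplified sketch, not the exact expansion from the language reference; Countdown and manual_loop are hypothetical names used only for illustration.

```python
import asyncio


class Countdown:
    """Hypothetical asynchronous iterator that yields n, n-1, ..., 1."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration  # tells the loop there is nothing left
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        value = self.n
        self.n -= 1
        return value


async def manual_loop(iterable):
    """Roughly equivalent to: async for item in iterable: results.append(item)"""
    results = []
    iterator = iterable.__aiter__()  # ask for the asynchronous iterator
    while True:
        try:
            item = await iterator.__anext__()  # await the next item
        except StopAsyncIteration:
            break  # the iterator signalled the end of the loop
        results.append(item)
    return results
```

Calling asyncio.run(manual_loop(Countdown(3))) collects 3, 2 and 1, which is what the equivalent async for loop would produce.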

Now, if the buffer is not empty, we can return the oldest value still in it (remember that we're treating our buffer as a FIFO queue, so popleft gives back documents in the order they were fetched). We only have the _prefetch method left:

    async def _prefetch(self):
        for doc in await fetch_page():
            self.buffer.append(doc)

Woah! A lot of boilerplate.
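For reference, here are the pieces assembled into one runnable sketch. One difference from the snippets above, which is my assumption and not part of the original class: fetch_page is passed to the constructor so that a fake, in-memory version (make_fetch_page, also hypothetical) can be plugged in.

```python
import asyncio
import collections

# Fake data standing in for the paginated HTTP API (an assumption for this sketch).
PAGES = [["doc1", "doc2"], ["doc3"]]


def make_fetch_page(pages):
    """Build a hypothetical fetch_page() that returns one page per call, then []."""
    remaining = iter(pages)

    async def fetch_page():
        await asyncio.sleep(0)  # simulate waiting on the network
        return next(remaining, [])

    return fetch_page


class AsyncGetDocs:
    def __init__(self, fetch_page):
        self.fetch_page = fetch_page  # injected here only to keep the sketch testable
        self.buffer = collections.deque()

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

    async def _prefetch(self):
        for doc in await self.fetch_page():
            self.buffer.append(doc)


async def collect():
    docs = []
    async for doc in AsyncGetDocs(make_fetch_page(PAGES)):
        docs.append(doc)
    return docs
```

From the caller's point of view, this behaves like the asynchronous generator from the Python 3.6+ section. The generator just lets Python keep track of the buffer and the end of the iteration for you.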
