`asyncio` has brought support for asynchronous I/O with an event loop to Python 3. Since Python 3.5, we can use the `async` and `await` keywords:
```python
async def get(self):
    await self.wait_for_token()
    return self.client.get()
```
But Python 3.5 also brought support for asynchronous context managers (`async with`) and asynchronous loops (`async for`). I believe those two also deserve to be known and used! Indeed, they add real expressive power. In this post, I'm going to focus on `async for`.
When I started looking for information about asynchronous loops, what I found was confusing. I typed "python async for" in a search engine, and the top result was PEP 492, which gives way too much detail for users of the feature and only covers Python 3.5, which complicates the picture. Nowadays you can use the modern Python 3.6+ syntax, which is much easier to use.
Python 3.6+
`async for` loops are a natural extension of the usual loops in Python. Let's use the example of a paginated HTTP API that returns documents page after page. Here's how a usual synchronous for loop would work:
```python
def get_docs():
    page = fetch_page()
    while page:
        for doc in page:
            yield doc
        page = fetch_page()

for doc in get_docs():
    pass  # work on doc
```
Now, an asynchronous for loop will be really similar. As usual with asyncio, you just need to add the `async`/`await` keywords in the right places:
```python
async def get_docs():
    page = await fetch_page()
    while page:
        for doc in page:
            yield doc
        page = await fetch_page()

async for doc in get_docs():
    pass  # work on doc
```
And that's it! Your code is now asynchronous. There's nothing magic: it's just a loop over the results yielded by `get_docs`. The order of execution is the same as in the synchronous case, except that during `await` calls, other unrelated code can execute.
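To see it run, here's a minimal, self-contained sketch. The `fetch_page` stub, the stand-in page data, and the extra `pages` parameter (passed explicitly so the example is self-contained) are all invented for illustration; in real code `fetch_page` would perform an HTTP request.

```python
import asyncio

async def fetch_page(pages):
    # Stub standing in for a real HTTP call; awaiting yields to the event loop.
    await asyncio.sleep(0)
    return next(pages, [])  # an empty page ends the stream

async def get_docs(pages):
    page = await fetch_page(pages)
    while page:
        for doc in page:
            yield doc
        page = await fetch_page(pages)

async def main():
    pages = iter([["doc1", "doc2"], ["doc3"]])  # stand-in data
    docs = []
    async for doc in get_docs(pages):
        docs.append(doc)
    return docs

print(asyncio.run(main()))  # ['doc1', 'doc2', 'doc3']
```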
(Note that in the real world you could have a `fetch_all_pages` generator that would fetch pages one by one, which would simplify the implementation of `get_docs`. But as an example it would be more confusing, since we would have an `async for` loop in `get_docs` too.)
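For the curious, such a `fetch_all_pages` helper might look like this sketch. It is hypothetical: the `fetch_page` stub, the stand-in data, and the `pages` parameter are invented so the example can run on its own.

```python
import asyncio

async def fetch_page(pages):  # stub standing in for a real HTTP call
    await asyncio.sleep(0)
    return next(pages, [])  # an empty page ends the stream

async def fetch_all_pages(pages):
    # Hypothetical helper: an async generator yielding whole pages until exhausted.
    page = await fetch_page(pages)
    while page:
        yield page
        page = await fetch_page(pages)

async def get_docs(pages):
    # get_docs now only flattens pages into documents...
    async for page in fetch_all_pages(pages):  # ...but needs its own async for
        for doc in page:
            yield doc

async def main():
    pages = iter([["doc1", "doc2"], ["doc3"]])  # stand-in data
    return [doc async for doc in get_docs(pages)]

print(asyncio.run(main()))  # ['doc1', 'doc2', 'doc3']
```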
Under the hood
But maybe you want to understand how asynchronous iterators work under the hood. To do so, let's reimplement `get_docs` using a class. Here's what it looks like:
```python
import collections

class AsyncGetDocs:
    def __init__(self):
        self.buffer = collections.deque()
```
We need to store the results of the `fetch_page` call in a buffer, in order to be able to return them one by one when needed. To preserve the order, we need a first-in first-out (FIFO) queue, which can easily be implemented with `collections.deque` in Python.
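A quick illustration of the FIFO behavior we rely on: `append` enqueues on the right, `popleft` dequeues from the left, so documents come back out in the order they went in.

```python
import collections

buffer = collections.deque()
buffer.append("doc1")  # enqueue on the right
buffer.append("doc2")
first = buffer.popleft()  # dequeue from the left: oldest element first
print(first)  # doc1
```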
```python
    def __aiter__(self):
        return self
```
`async for` loops use the `__aiter__` protocol: they need an object with an `__aiter__` method. In most cases, just returning `self` is enough, as it avoids introducing yet another class. The object returned by `__aiter__` should have an `__anext__` method that will return the objects one by one:
```python
    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
        if not self.buffer:
            raise StopAsyncIteration
        return self.buffer.popleft()
```
If the buffer is empty, we try to fill it. If it's still empty, this is the end of the loop, so we stop the iteration.
Now, if the buffer is not empty, we fetch the first value that was added to it (remember that we're treating our buffer as a FIFO queue, so `popleft` returns the oldest element). We only have the `_prefetch` method left:
```python
    async def _prefetch(self):
        for doc in await fetch_page():
            self.buffer.append(doc)
```
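Putting the pieces together, here's the complete class in a runnable form. As before, the `fetch_page` stub, the stand-in page data, and the `pages` parameter threaded through the constructor are invented so the sketch is self-contained; the article's version simply calls a module-level `fetch_page()`.

```python
import asyncio
import collections

async def fetch_page(pages):  # stub standing in for a real HTTP call
    await asyncio.sleep(0)
    return next(pages, [])  # an empty page ends the stream

class AsyncGetDocs:
    def __init__(self, pages):
        self.pages = pages
        self.buffer = collections.deque()

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            await self._prefetch()
        if not self.buffer:
            raise StopAsyncIteration  # ends the async for loop
        return self.buffer.popleft()

    async def _prefetch(self):
        for doc in await fetch_page(self.pages):
            self.buffer.append(doc)

async def main():
    pages = iter([["doc1", "doc2"], ["doc3"]])  # stand-in data
    return [doc async for doc in AsyncGetDocs(pages)]

print(asyncio.run(main()))  # ['doc1', 'doc2', 'doc3']
```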
Whoa! That's a lot of boilerplate compared to the async generator version.