When you first switch away from synchronous sequential code, you realize that you can send hundreds of HTTP requests per second because you don't wait for one request to finish before sending the next one. These requests can be anything, but I focus on HTTP requests in this post because they are ubiquitous: you'll use them whether you're using an API, crawling the web or communicating between microservices. If the receiving endpoint is prepared, sending many requests per second might be OK, but in many cases it's not.
In this post, I want to focus on limiting the number of requests per
second. This is related but different from limiting the number of
requests going on at the same time, which I already
covered
and which is actually available in aiohttp via
limit_per_host
.
However, the limits set by a given server are based on the number of
requests sent during a specific interval. That's what
Twitter,
GitHub,
Salesforce
and so on do. This post is going to explain how to do this with
aiohttp, which does not plan to support this
feature, as noted by
mcarans in comments.
Token bucket
We're going to use the token bucket algorithm for rate limiting. The bucket contains tokens, and you consume one token to perform one call. If the bucket is empty, you cannot perform more calls: you need to wait for at least one new token. Before sending requests, the bucket starts with a number of tokens, and you add a new token at fixed intervals unless the bucket is full. If you add one token every 100ms, in the long run you will not make more than ten requests per second, even if you may have short bursts where you send more than this.
You may want to read this again before continuing: it's nothing complicated, but a single read might not be enough!
Annotating the code
Okay, let's do this. The idea is general, but let's see how to limit
calls done from the aiohttp library. This is going to be a class named
RateLimiter
that will intercept HTTP GET requests. This class will
decorate the
aiohttp ClientSession class and will be used like this:
async with aiohttp.ClientSession(loop=loop) as client:
client = RateLimiter(client)
async with await client.get(...) as resp:
...
So that's a lot like usual aiohttp code, except that we're using
await client.get()
instead of client.get()
because it simplifies
our code below.
Let's see the code and comment it as we go, as if we were using literate programming.
class RateLimiter:
RATE = 10
MAX_TOKENS = 10
We allow 10 requests per second and start with 10 available requests.
For a web crawler, say, we don't want to allow MAX_TOKENS
to be too
high because you either have a lot of requests to send or nothing to
do, so this would only result in a high burst at the beginning for no
good reason. For other applications, it may make sense to use other
values.
def __init__(self, client):
self.client = client
self.tokens = self.MAX_TOKENS
self.updated_at = time.monotonic()
As we've seen in our example code above, we receive the client object
and will use it to perform calls. Next, some bookkeeping: we want to
store the current number of tokens (the bucket starts full) and how
much time has elapsed between two buckets fills. We use
time.monotonic()
to ensure the time always goes forward, which is a
nice property you don't get with time.time()
.
async def get(self, *args, **kwargs):
await self.wait_for_token()
return self.client.get(*args, **kwargs)
When we call client.get()
, we first wait for a token to be free,
then go on to perform the actual call. This is remarkably similar to
synchronous code, except that we won't block the whole process during
the wait_for_token
call: other code may run during the waits. I only
specify get()
here, but in the real world you would probably want to
cover other methods such as post()
.
How do we wait for new tokens?
async def wait_for_token(self):
while self.tokens <= 1:
self.add_new_tokens()
await asyncio.sleep(1)
self.tokens -= 1
If we have one token ready, we use it and return to the get()
call.
Note that if we were using threads, "using a token" would need a lock
and would be error-prone. Here, it's easy, you only need to decrement
it because you know that no other code will be executing at the same
time: all other coroutines will be either waiting their turn or
waiting for I/O.
Now, if there are not enough tokens, we need to see if new ones became available since the last update. If not, just sleep. I'm using a one-second sleep here, but in real code you would start with a smaller delay and use exponential back-off.
The only part of the puzzle left is adding new tokens.
def add_new_tokens(self):
now = time.monotonic()
time_since_update = now - self.updated_at
new_tokens = time_since_update * self.RATE
if self.tokens + new_tokens >= 1:
self.tokens = min(self.tokens + new_tokens, self.MAX_TOKENS)
self.updated_at = now
We simply look at the number of tokens we should add since the last
update and make sure that we do not add more than MAX_TOKENS
.
The only trick here is that we only change self.tokens
if it would
allow for a new request, that is, make self.tokens
go above 1.
Indeed, adding and multiplicating small numbers might lead to
incorrect results due to low clock resolution or underflows. I am not
certain this is actually a problem here, but being sure it never
becomes one is nice!
If you want to use that code, here's a full example so you don't have to copy/paste my code above!
Benefits of asynchronous programming
We don't have to worry about tokens
or updated_at
being updated by
something else. The code looks a lot like the synchronous code we're
used to, and we can reason about it the same way. But we still get
great throughput! The only downside is that you need to sprinkle your
code with async
and await
which prevents you to use it in a
synchronous
context.
Any comments?
If you liked this post, you might like:
Thanks to Kyrean and Miquel for their insightful comments which led to corrections in this blog post.
I'm on Mastodon!
Comments