Asyncio Patterns in Python

Skyler Lewis
Published in Level Up Coding, Feb 25, 2024


Update: I wrote a part 2, which talks about adding resiliency to the pattern.

Recently I needed to make millions of API calls to an internal service. Made synchronously, API calls are IO blocking: when my service calls the API, it does nothing until it gets a response.

```python
import time

def mock_api_request(i):
    print(f"API request started {i}")
    time.sleep(1)
    print(f"API request completed {i}")

def main():
    for i in range(1_000_000):
        mock_api_request(i)

main()
```

So if I have 1 million API calls to make, each one taking 1 second, then it will take 1 million seconds (1 week 4 days 13 hours 46 min 40s) for my work to complete. This will not do.
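The breakdown in parentheses is just repeated division by 60, 60, 24 and 7. A quick check with Python's built-in divmod():

```python
total_seconds = 1_000_000  # 1 million calls x 1 second each

# Peel off each unit in turn: divmod returns (quotient, remainder)
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)

print(f"{weeks} week {days} days {hours} hours {minutes} min {seconds}s")
# 1 week 4 days 13 hours 46 min 40s
```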

Note on threading:
We could use threads to give us some concurrency, but managing multiple
threads can be complicated. You have to handle synchronization issues,
like race conditions, where threads compete to use the same resource.
Threads do help with I/O-bound work, since Python releases the GIL while
a thread waits on I/O, but each OS thread carries its own stack and
scheduling overhead, so you can't cheaply run thousands of them at once.
Asyncio lets a single thread juggle many waiting requests instead.

Let’s take a look at this same code written for asyncio.

Asyncio scaffolding

```python
import asyncio

async def mock_api_request(i):
    print(f"API request started {i}")
    await asyncio.sleep(1)  # this could be an API call, or some other IO bound task
    print(f"API request completed {i}")

async def run():
    for i in range(1_000_000):
        await mock_api_request(i)

asyncio.run(run())
```

This example uses Python's asyncio library for asynchronous programming. Defining functions with async def turns them into coroutines, and await asyncio.sleep(1) suspends the current coroutine so the event loop can run other work instead of waiting idly. This is particularly useful for IO-bound tasks like API calls, where a single event loop can keep many requests in flight without blocking. Note, though, that run() still awaits each request before starting the next one, so this version is just as sequential as the synchronous one and still takes about a million seconds. So far we only have the scaffolding.

We call asyncio.run(run()), which creates an event loop, runs our primary coroutine run() on it until it finishes, and then closes the loop. Once we start scheduling many coroutines at the same time, this event loop is what interleaves them.
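To see that this version is still sequential, here is a scaled-down timing check. The sizes (5 requests, a 0.1-second sleep) and the timing code are assumptions for the demo, not part of the original example:

```python
import asyncio
import time

async def mock_api_request(i: int) -> None:
    await asyncio.sleep(0.1)  # scaled down from 1 second

async def run(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        # Each call finishes completely before the next one starts
        await mock_api_request(i)
    return time.perf_counter() - start

elapsed = asyncio.run(run(5))
print(f"{elapsed:.2f}s")  # roughly 0.5s: 5 x 0.1s with no overlap
```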

Note:
I am not using "main" as my primary function name. This is because you
might want to call into asyncio from existing synchronous code (with
asyncio.run()), or from the `if __name__ == "__main__":` block, which is
often where a synchronous main() already lives.

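A minimal sketch of the situation the note describes, where existing_sync_function() is a hypothetical stand-in for your synchronous code. One caveat worth knowing: asyncio.run() raises a RuntimeError if an event loop is already running in the same thread, so this works from plain synchronous code, not from inside another coroutine.

```python
import asyncio

async def run() -> str:
    await asyncio.sleep(0.01)  # stand-in for the real async work
    return "done"

def existing_sync_function() -> str:
    # asyncio.run() creates a new event loop, runs run() to completion,
    # closes the loop, and hands back run()'s return value
    return asyncio.run(run())

if __name__ == "__main__":
    print(existing_sync_function())
```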
Let’s try a common pattern in asyncio. Let’s create a bunch of jobs, put them on the event loop and wait for them to finish.

Memory-Hungry Event Loop

```python
# ... mock_api_request

async def run():
    tasks = []
    for i in range(1_000_000):
        tasks.append(asyncio.create_task(mock_api_request(i)))
    await asyncio.wait(tasks)

# ... everything else
```

This doesn’t feel right, does it? We just created a list of 1 million tasks for our event loop to manage. Oof. Big oof. What is happening here?
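To put numbers on the oof, here is a scaled-down sketch (10,000 requests with a 0.1-second sleep; the sizes are arbitrary). asyncio.all_tasks() shows that every task object exists at the same moment, while the timing shows why the approach is tempting anyway: all the sleeps overlap.

```python
import asyncio
import time

async def mock_api_request(i: int) -> None:
    await asyncio.sleep(0.1)  # scaled down from 1 second

async def run(n: int) -> tuple[int, float]:
    tasks = [asyncio.create_task(mock_api_request(i)) for i in range(n)]
    # Every task object is alive at once (+1 for the task running run() itself)
    alive = len(asyncio.all_tasks())
    start = time.perf_counter()
    await asyncio.wait(tasks)
    return alive, time.perf_counter() - start

alive, elapsed = asyncio.run(run(10_000))
print(alive)              # 10001
print(f"{elapsed:.2f}s")  # a fraction of a second, not 10,000 x 0.1s
```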

Under the Hood

(running 1M tasks at the same time)
  1. For each i, we create a task from our mock_api_request coroutine. asyncio.create_task() schedules it on the event loop right away, but it does not start running yet.
  2. We add every task to the list tasks. (With a million items, you may run out of memory.)
  3. If you don’t run out of memory, run() reaches await asyncio.wait(tasks), suspends itself, and hands control to the event loop. Only now do the tasks start running.
  4. Each task is sitting in the event loop’s queue of ready work. The event loop nabs a task and begins work on it.
  5. Python runs the task synchronously until it reaches an await that actually has to wait (here, asyncio.sleep(1)).
  6. At that await, the task yields control back to the event loop, which switches to the next ready task.
  7. Steps 5 and 6 repeat. Between passes, the event loop checks whether any timers or I/O operations the tasks are waiting on have finished.
  8. When a task’s awaited I/O completes, the event loop resumes that task where it left off, and it runs until the next await or the end of the function.
  9. This goes on until every item in the list is done.
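The first few steps can be observed with a small trace. In this sketch, worker and log are hypothetical names for the demo. The order of the log entries shows that tasks only start once run() yields at its await, and that both tasks start before either one finishes:

```python
import asyncio

log: list[str] = []

async def worker(name: str) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # yields to the event loop here
    log.append(f"{name} end")

async def run() -> None:
    tasks = [asyncio.create_task(worker(n)) for n in ("a", "b")]
    # Tasks are scheduled but have not run yet
    log.append("tasks created")
    await asyncio.wait(tasks)  # run() suspends; the tasks start now

asyncio.run(run())
print(log)  # ['tasks created', 'a start', 'b start', 'a end', 'b end']
```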

Cool, those are the broad strokes of what the event loop is doing.

So how do we speed up our synchronous-asynchronous program without scheduling all million tasks at once?

Batching

```python
# ... mock_api_request

async def run():
    tasks = []
    batch_size = 100
    for i in range(1_000_000):
        tasks.append(asyncio.create_task(mock_api_request(i)))
        if len(tasks) >= batch_size:
            await asyncio.wait(tasks)
            tasks = []

    if tasks:  # If there are any remaining tasks, wait for them to complete
        await asyncio.wait(tasks)

# ... everything else
```

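That’s much better, at least in terms of memory usage: only 100 tasks exist at a time. It won’t be as fast as the memory-hungry version, though. With a batch size of 100 and 1-second requests, 1 million calls take at least 10,000 batches x 1 second, or roughly 2 hours 47 minutes. That beats 11.5 days by a mile, but it is slower than firing everything at once.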

You will also notice that, when running in batches, every await asyncio.wait(tasks) waits for every last task in the batch to complete before the next batch can start. A single slow request holds up the whole batch while the other slots sit idle. That won’t do.
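A scaled-down sketch of the straggler problem, assuming request latencies vary (the durations here are made up). It uses asyncio.gather() in place of asyncio.wait(); both wait for the entire batch before returning.

```python
import asyncio
import time

async def job(duration: float) -> None:
    await asyncio.sleep(duration)  # stand-in for an API call with variable latency

async def run_in_batches(durations: list[float], batch_size: int) -> float:
    start = time.perf_counter()
    for offset in range(0, len(durations), batch_size):
        batch = durations[offset:offset + batch_size]
        # gather() only returns once the slowest job in the batch is done
        await asyncio.gather(*(job(d) for d in batch))
    return time.perf_counter() - start

# Two batches of three, each with one slow straggler
durations = [0.05, 0.05, 0.2, 0.05, 0.05, 0.2]
elapsed = asyncio.run(run_in_batches(durations, batch_size=3))
print(f"{elapsed:.2f}s")  # roughly 0.4s: each batch lasts as long as its straggler
```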

Producer/Consumer Pattern

We are going to introduce asyncio.Queue() in this next iteration.

```python
# ... mock_api_request

async def producer(queue: asyncio.Queue):
    for i in range(1_000_000):
        await queue.put(i)

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        await mock_api_request(item)

async def run():
    queue = asyncio.Queue(maxsize=100)
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.wait([producer_task, consumer_task])

# ... everything else
```

When we run this, we will notice that things are more or less synchronous again. The producer fills the queue (up to its maxsize of 100 items), but there is only one consumer, and it grabs just one item at a time, waiting for each request to finish before taking the next.

IMPORTANT NOTE:
You should _always_ set `maxsize`, e.g. asyncio.Queue(maxsize=100). An
unbounded queue lets a fast producer pile up items faster than the
consumers drain them, which opens you up to OOM (Out of Memory) issues.
With maxsize set, `await queue.put()` pauses the producer whenever the
queue is full, until a consumer pulls an item off the queue.
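A small sketch of that back-pressure, with a deliberately tiny queue (maxsize=2) and a consumer that is slower than the producer; the sizes and the sleep are arbitrary choices for the demo. Recording qsize() after each put shows the queue never grows past its limit, because put() suspends the producer while the queue is full.

```python
import asyncio

async def demo() -> list[int]:
    queue = asyncio.Queue(maxsize=2)
    sizes = []  # queue size recorded after each put

    async def producer() -> None:
        for i in range(5):
            await queue.put(i)  # suspends here while the queue is full
            sizes.append(queue.qsize())

    async def slow_consumer() -> None:
        for _ in range(5):
            await asyncio.sleep(0.01)  # slower than the producer
            await queue.get()

    await asyncio.gather(producer(), slow_consumer())
    return sizes

sizes = asyncio.run(demo())
print(sizes)  # the queue never holds more than 2 items
```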

Let’s get this producer/consumer pattern up to speed with the batching version we did earlier.

Multiple Consumers

Let’s modify our run function again.

```python
async def run():
    queue = asyncio.Queue(maxsize=100)
    number_of_consumers = 10
    producer_task = asyncio.create_task(producer(queue))
    consumers = []
    for _ in range(number_of_consumers):
        consumer_task = asyncio.create_task(consumer(queue))
        consumers.append(consumer_task)
    await asyncio.wait([producer_task, *consumers])
```

Now we create 10 consumer tasks, collect them in the list consumers, and wait for the producer and all 10 consumers to finish. If you run it, each consumer pulls from the queue whenever it is free, and you get a constant stream of work with up to 10 requests in flight at once. To match the 100-wide batches from earlier, you would raise number_of_consumers to about 100.
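To see the effect of adding consumers, here is a scaled-down timing sketch (20 requests of 0.05 seconds each; the numbers are arbitrary). Because these consumers loop forever, a run() like the one above never returns on its own. To make the demo terminate, this version borrows a different shutdown technique: queue.task_done() plus await queue.join(), then cancelling the consumers, as in the standard library's asyncio.Queue documentation example, rather than a stop signal.

```python
import asyncio
import time

async def mock_api_request(i: int) -> None:
    await asyncio.sleep(0.05)  # scaled-down API call

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        await mock_api_request(item)
        queue.task_done()  # lets queue.join() know this item is finished

async def run(n: int, number_of_consumers: int) -> float:
    queue = asyncio.Queue(maxsize=100)
    start = time.perf_counter()
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(number_of_consumers)]
    for i in range(n):  # the producer, inlined
        await queue.put(i)
    await queue.join()  # wait until every item has been marked done
    for task in consumers:
        task.cancel()  # consumers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return time.perf_counter() - start

one = asyncio.run(run(20, 1))
ten = asyncio.run(run(20, 10))
print(f"1 consumer:   {one:.2f}s")  # roughly 20 x 0.05s = 1s
print(f"10 consumers: {ten:.2f}s")  # roughly 2 x 0.05s = 0.1s
```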

When the producer is done and the queue is empty, the consumers hang, waiting for items that will never come, so the process never quits. That may be fine if your producer is connected to a source that never ends. Otherwise, we need some way to tell our consumers to quit.

Signaling with `None`

Let’s start with the simplest signal: None.

```python
async def consumer(queue: asyncio.Queue):
    while True:
        # get() suspends this consumer (not the whole event loop) until an item arrives
        item = await queue.get()
        if item is None:
            break  # the producer's stop signal
        await mock_api_request(item)
```

We add a conditional to the consumer that checks whether None came off the queue. We can use this as a signal to quit, since a real work item normally shouldn’t be None.

NOTE:
You don't need to check queue.empty() before calling `await queue.get()`.
On an empty queue, get() suspends only this consumer and hands control back
to the event loop, so the producer and the other consumers keep running.
Because the producer puts its None signals after all the real items, each
consumer eventually receives one. Polling with queue.empty() and
`await asyncio.sleep(0)` instead would just keep the CPU busy while waiting.

We should also update the producer to send this signal.

```python
async def producer(queue: asyncio.Queue, number_of_consumers: int):
    for i in range(1_000_000):
        await queue.put(i)
    for _ in range(number_of_consumers):
        await queue.put(None)
```

Notice how we add number_of_consumers to the producer’s function signature. We need to put as many None values in the queue as we have consumers, since each of them needs its own signal. Just remember to update the producer task in the run function.

```python
producer_task = asyncio.create_task(producer(queue, number_of_consumers))
```
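Putting the pieces together, here is a minimal runnable version of the None-signal pipeline, scaled down to 50 items with a 0.01-second sleep so it finishes quickly. The extra n and results parameters are hypothetical additions for the demo; results simply collects processed items so you can check that nothing was dropped and that the program exits.

```python
import asyncio

async def mock_api_request(i: int, results: list[int]) -> None:
    await asyncio.sleep(0.01)  # short stand-in for the 1-second API call
    results.append(i)

async def producer(queue: asyncio.Queue, number_of_consumers: int, n: int) -> None:
    for i in range(n):
        await queue.put(i)
    for _ in range(number_of_consumers):
        await queue.put(None)  # one stop signal per consumer

async def consumer(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()  # suspends this consumer, not the event loop
        if item is None:
            break  # stop signal: no more work is coming
        await mock_api_request(item, results)

async def run(n: int = 50, number_of_consumers: int = 10) -> list[int]:
    queue = asyncio.Queue(maxsize=100)
    results: list[int] = []
    producer_task = asyncio.create_task(producer(queue, number_of_consumers, n))
    consumers = [
        asyncio.create_task(consumer(queue, results))
        for _ in range(number_of_consumers)
    ]
    await asyncio.wait([producer_task, *consumers])
    return results

results = asyncio.run(run())
print(len(results))  # 50
```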

Sometimes None is a valid value, or you want a more sophisticated signal. Asyncio has you covered.

Asyncio Event

Let’s add an asyncio.Event() variable that can be used as our signal.

```python
async def run():
    queue = asyncio.Queue(maxsize=100)
    stop_event = asyncio.Event()
    producer_task = asyncio.create_task(producer(queue, stop_event))
    consumers = []
    number_of_consumers = 10
    for _ in range(number_of_consumers):
        consumer_task = asyncio.create_task(consumer(queue, stop_event))
        consumers.append(consumer_task)
    await asyncio.wait([producer_task, *consumers])
```

We also need to update the consumer and producer.

```python
async def producer(queue: asyncio.Queue, stop_event: asyncio.Event):
    for i in range(1_000_000):
        await queue.put(i)
    stop_event.set()


async def consumer(queue: asyncio.Queue, stop_event: asyncio.Event):
    while True:
        if queue.empty() and stop_event.is_set():
            break
        item = await queue.get()
        await mock_api_request(item)
```

If you followed the previous section and passed number_of_consumers to the producer to send None values, remove that parameter (and the None-sending loop) from the producer.

In the consumer, the important bit is if queue.empty() and stop_event.is_set():. A consumer only quits once the producer has finished and the queue has been drained, so no queued work is dropped. This may vary depending on your needs. For example, you might want a separate asyncio.Event for aborting the function without draining the queue.
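One caveat with this consumer: a consumer that is already suspended in await queue.get() when the producer sets stop_event is never woken by the event. If the producer is slower than the consumers, so the queue runs empty before the end, those consumers hang. A minimal sketch of one workaround, assuming a short polling timeout is acceptable, wraps get() in asyncio.wait_for() so each consumer periodically re-checks the stop condition. The 0.1-second timeout, the slow producer, and the n and results parameters are hypothetical choices for the demo.

```python
import asyncio

async def mock_api_request(i: int, results: list[int]) -> None:
    await asyncio.sleep(0.01)  # short stand-in for the API call
    results.append(i)

async def producer(queue: asyncio.Queue, stop_event: asyncio.Event, n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.005)  # deliberately slower than the consumers
        await queue.put(i)
    stop_event.set()

async def consumer(queue: asyncio.Queue, stop_event: asyncio.Event, results: list[int]) -> None:
    while True:
        if queue.empty() and stop_event.is_set():
            break
        try:
            # Hypothetical poll interval: give up after 0.1s and re-check the stop condition
            item = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        await mock_api_request(item, results)

async def run(n: int = 20, number_of_consumers: int = 5) -> list[int]:
    queue = asyncio.Queue(maxsize=100)
    stop_event = asyncio.Event()
    results: list[int] = []
    producer_task = asyncio.create_task(producer(queue, stop_event, n))
    consumers = [
        asyncio.create_task(consumer(queue, stop_event, results))
        for _ in range(number_of_consumers)
    ]
    await asyncio.wait([producer_task, *consumers])
    return results

results = asyncio.run(run())
print(len(results))  # 20, and the program exits instead of hanging
```

A cancelled queue.get() does not remove an item from the queue, so a timeout never loses work; the trade-off is that idle consumers wake up every 0.1 seconds to check.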

Summary

You should now have a progression of patterns for increasing the throughput of IO-bound calls in Python.

There are still things you can do to improve the speed. You can tune the number of consumers, for instance. You could also move the `queue` out of the process and into a dedicated message broker, like RabbitMQ or NATS, allowing you to scale and/or separate your producers and consumers.

Postscript

Hey, you made it this far! Kudos! Give this a clap, a save, a like, or whatever. Follow me too. K thx bye!

— Skyler Lewis (@alairock)
