‘asyncio’ tutorial for the programmer in a hurry

Introduction

In the evolving world of Python programming, understanding the asyncio library is becoming increasingly important. asyncio enables asynchronous, concurrent programming in a language that was traditionally synchronous. Note that this is concurrency on a single thread, not parallelism: tasks take turns on one event loop rather than running at the same time.

As Python applications grow in complexity and depend on I/O-bound tasks like web requests or database calls, it’s crucial to manage these tasks efficiently. That’s where asyncio shines. However, getting started with asyncio can be daunting, especially if you’re not familiar with coroutines, tasks, futures, and event loops. This tutorial aims to guide you through the basics of asyncio, complete with functional examples.

By the end of this tutorial, you’ll have a solid grasp of asyncio and be ready to implement it in your projects, whether you’re building a web scraper, microservice, or GUI application.

So let’s dive into the world of Python asyncio!

Coroutines

A coroutine is a generalization of a subroutine that can be paused and resumed, allowing other code to run during the pauses. Coroutines are a key part of asyncio and allow us to write asynchronous code in a procedural style.

In Python, coroutines are defined with the async def syntax:

async def my_coroutine():
    print("Hello, Coroutine!")

To run the coroutine, you can’t just call it like a normal function: calling it only creates a coroutine object. You must await it from inside another coroutine, or hand it to asyncio.run():

await my_coroutine()
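To make the difference between calling and running concrete, here is a minimal sketch. It assumes a variant of my_coroutine that returns its greeting instead of printing it, so the result can be inspected:

```python
import asyncio
import inspect

async def my_coroutine():
    return "Hello, Coroutine!"

# Calling the function only builds a coroutine object; the body has not run yet
coro = my_coroutine()
is_coroutine = inspect.iscoroutine(coro)

# asyncio.run() starts an event loop and drives the coroutine to completion
result = asyncio.run(coro)

if __name__ == "__main__":
    print(is_coroutine, result)
```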

Let’s simulate a long-running task with asyncio’s sleep function:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)  # sleep for 1 second
    print("Hello, Coroutine!")

async def main():
    await my_coroutine()

if __name__ == "__main__":
    asyncio.run(main())

Async Context Managers and Async Iterators

In real-world applications, you often need to manage resources like file streams, network connections, or database connections. Using context managers and iterators asynchronously can be very useful for such tasks. Luckily, Python has native syntax for async context managers and async iterators (async with and async for), and asyncio-based libraries build on it, which makes your code not only more efficient but also cleaner and more maintainable.

Using async with

The async with statement is used to invoke an asynchronous context manager. For example, when working with asynchronous file operations, you can read a file without blocking the event loop like this:

import aiofiles
import chardet
import asyncio

async def read_large_file():
    async with aiofiles.open('large_file.txt', mode='rb') as f:
        raw_data = await f.read()

    # chardet may report None as the encoding, so fall back to UTF-8
    detected = chardet.detect(raw_data)
    encoding_type = detected['encoding'] or 'utf-8'
    contents = raw_data.decode(encoding_type)
    print(contents)

async def main():
    await read_large_file()

if __name__ == "__main__":
    asyncio.run(main())

One more example:

import aiohttp

async def fetch_json():
    # Async context managers close the session and response on exit
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            return await resp.json()

Best Practice: Always prefer using async with for acquiring and releasing resources.
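You can also write your own async context manager. The sketch below uses contextlib.asynccontextmanager from the standard library; open_connection and the log list are hypothetical names invented for illustration, standing in for a real resource such as a database connection:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_connection(name, log):
    # Hypothetical resource: setup runs before the yield
    log.append(f"open {name}")
    await asyncio.sleep(0)  # stand-in for an awaitable connect step
    try:
        yield name
    finally:
        # Cleanup runs even if the body of the async with block raises
        log.append(f"close {name}")

async def main():
    log = []
    async with open_connection("db", log) as conn:
        log.append(f"use {conn}")
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The try/finally around the yield is what guarantees the release step, which is the main reason to prefer async with over manual open and close calls.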

Using async for

The async for statement iterates over an asynchronous iterator, awaiting each item in turn. For instance, this asynchronous generator simulates fetching multiple URLs one after another:

import asyncio

# Define an asynchronous generator simulating fetching data from URLs
async def fetch_data_from_urls():
    urls = ["https://www.google.com", "https://www.apple.com", "https://www.microsoft.com"]
    for url in urls:
        await asyncio.sleep(2)  # Simulate network delay
        yield f"Fetched data from {url}"

# Define a coroutine that uses the asynchronous generator
async def process_fetched_data():
    async for data in fetch_data_from_urls():
        print(f"Processing: {data}")

# Run the asynchronous event loop
asyncio.run(process_fetched_data())

Both for and async for loops in Python have similar purposes: they are used for iterating over elements. However, they are not interchangeable and are designed for different types of iterables.

  • The for loop is used for iterating over synchronous iterables, such as lists, or synchronous generators.
for item in [1, 2, 3]:
    print(item)
  • The async for loop is designed for iterating over asynchronous iterables or asynchronous generators, and is only valid inside an async def function.
async for item in async_generator():
    print(item)

The key difference is that you can’t use a standard for loop to iterate over asynchronous iterables or asynchronous generators. Likewise, you can’t use an async for loop to iterate over traditional, synchronous iterables or generators.

If you try to mix them up, Python will raise a TypeError.
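A quick way to see both failures is to trigger them on purpose. This is only a sketch; async_numbers is a made-up generator used for the demonstration:

```python
import asyncio

async def async_numbers():
    for n in (1, 2, 3):
        yield n

async def main():
    errors = []
    try:
        for n in async_numbers():   # plain for over an async generator
            pass
    except TypeError as exc:
        errors.append(type(exc).__name__)
    try:
        async for n in [1, 2, 3]:   # async for over a plain list
            pass
    except TypeError as exc:
        errors.append(type(exc).__name__)
    return errors

if __name__ == "__main__":
    print(asyncio.run(main()))
```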

Best Practice: Use async for when working with asynchronous iterators to make your code more readable and maintainable.
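Asynchronous generators are the shortest way to get an async iterator, but any object that implements __aiter__ and __anext__ works with async for. Here is a minimal sketch; the Countdown class is a hypothetical example, not part of asyncio:

```python
import asyncio

class Countdown:
    """Hypothetical async iterator that counts down from start to 1."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            # Signals the end of iteration to async for
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        value = self.current
        self.current -= 1
        return value

async def main():
    return [n async for n in Countdown(3)]

if __name__ == "__main__":
    print(asyncio.run(main()))
```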

Tasks and concurrency

Tasks are used to schedule coroutines concurrently. When a coroutine is wrapped into a Task with functions like asyncio.create_task(), the coroutine is automatically scheduled to run soon.

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    print("Hello, Coroutine!")

async def main():
    # Create a task out of a coroutine
    task = asyncio.create_task(my_coroutine())
    # wait for the task to finish
    await task

# Run the main coroutine
asyncio.run(main())
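With a single task the benefit is hard to see. The following sketch (the work function and its delays are made up for illustration) starts two tasks before awaiting either, so their sleeps overlap and the total time is close to one delay rather than the sum of both:

```python
import asyncio
import time

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Both tasks are scheduled immediately and run concurrently
    first = asyncio.create_task(work("a", 0.3))
    second = asyncio.create_task(work("b", 0.3))
    results = [await first, await second]
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```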

IMPORTANT! You cannot call asyncio.create_task() when no event loop is running, for example at module level:

task = asyncio.create_task(my_coroutine())

If you do this outside of an asynchronous function, you will receive the following error:

RuntimeError: no running event loop

In Python’s asyncio, the “event loop” is like a manager for tasks that should run concurrently (it is covered in more detail later in this post). This error means you asked for something that needs this manager, such as scheduling a task, while no loop was running.

Commonly, this error happens because you’re running asyncio code outside of an async function. To fix it, start an event loop and let it run the coroutine:

asyncio.run(my_coroutine())
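The failure and the fix can be put side by side in a short sketch. It assumes a version of my_coroutine that returns a value so the outcome is visible:

```python
import asyncio

async def my_coroutine():
    return "done"

# Outside any event loop, create_task() has nothing to schedule onto
coro = my_coroutine()
try:
    asyncio.create_task(coro)
except RuntimeError as exc:
    error_message = str(exc)
    coro.close()  # avoids a "never awaited" warning for the unused coroutine

# Inside asyncio.run() a loop is running, so create_task() works
async def main():
    task = asyncio.create_task(my_coroutine())
    return await task

result = asyncio.run(main())

if __name__ == "__main__":
    print(error_message, "/", result)
```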

Tasks can be canceled if they’re no longer needed. This is done with the cancel method on the Task object. You can also set timeouts for tasks by using the asyncio.wait_for function:

import asyncio

async def my_coroutine():
    await asyncio.sleep(10)
    print("Hello, Coroutine!")

async def main():
    # Create a task out of a coroutine
    task = asyncio.create_task(my_coroutine())

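    # Cancel the task, then await it to observe the cancellation
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

    # Set a timeout; wait_for cancels the coroutine when the timeout expires
    try:
        await asyncio.wait_for(my_coroutine(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Task took too long!")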

if __name__ == "__main__":
    asyncio.run(main())

Tasks, coroutines, and futures are all types of awaitables. An awaitable is something you can use in an await expression. Futures are lower-level awaitables that represent an eventual result of a computation.
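You rarely create futures by hand, but a small sketch shows what “an eventual result” means. Here a future is created on the running loop and resolved a moment later by a scheduled callback; the value "ready" and the 0.1 second delay are arbitrary choices for the demonstration:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Ask the loop to set the result 0.1 seconds from now
    loop.call_later(0.1, future.set_result, "ready")
    # Awaiting suspends main() until the future has a result
    return await future

if __name__ == "__main__":
    print(asyncio.run(main()))
```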

Measure Execution Time in Coroutines with Decorators

You can use decorators to measure the execution time of your coroutines. Here’s an example:

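import asyncio
import functools
import time

def timer(func):
    @functools.wraps(func)  # keep the wrapped coroutine's name and docstring
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()  # monotonic clock suited to timing
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"Time taken: {end_time - start_time:.3f} seconds")
        return result
    return wrapper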

@timer
async def my_coroutine():
    await asyncio.sleep(1)
    print("Hello, Coroutine!")

async def main():
    await my_coroutine()

if __name__ == "__main__":
    asyncio.run(main())

Why measure execution time?

It’s worth understanding why measuring the execution time of your coroutines is beneficial in the first place. Here are some reasons:

  1. Performance Optimization: One of the key benefits is to identify bottlenecks or performance issues in your code. If a particular coroutine is taking too long to execute, it might become a candidate for optimization.
  2. Resource Allocation: Understanding how long your tasks run can help in better resource allocation. For example, you might decide to offload a time-consuming task to a worker thread or another machine.
  3. Debugging: Occasionally, tasks may hang or enter into an infinite loop. Measuring execution time can serve as a debugging tool to identify such issues.
  4. User Experience: For user-facing applications, response time is critical. Knowing how long tasks take can help you make data-driven decisions to improve user experience. For example, you may introduce a loading screen for tasks that take more than a few seconds.
  5. Scalability: When building a system that needs to scale, it’s essential to understand how individual components perform. Measuring the execution time of your coroutines gives you insights into how your application will perform under different loads.

By keeping an eye on the time it takes for each coroutine to execute, you’re not just writing code, but also ensuring that it’s efficient, scalable, and provides a smooth user experience.

Problems with Coroutines and Tasks

One potential issue with coroutines and tasks is that they can be difficult to debug. Errors might not be raised until the coroutine is awaited, which can make it hard to track down the source of the problem.

Deferred Error Handling in Coroutines

It’s important to note that errors in a coroutine won’t be raised until the coroutine is awaited. This is because coroutines are essentially ‘lazy’: calling one only creates a coroutine object, and its body doesn’t run until you await it or schedule it as a Task. This can be a double-edged sword: on the one hand, it provides more control over when the code is executed, but on the other, it can make debugging a bit tricky since errors are deferred.
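The deferral is easy to observe in a short sketch. The failing coroutine below is a made-up example that raises immediately, yet nothing is raised at the point where it is called:

```python
import asyncio

async def failing():
    raise ValueError("boom")

async def main():
    events = []
    coro = failing()           # no exception here: the body has not started
    events.append("created")
    try:
        await coro             # the ValueError surfaces only now
    except ValueError as exc:
        events.append(f"raised on await: {exc}")
    return events

if __name__ == "__main__":
    print(asyncio.run(main()))
```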

Solution and Workaround:
  • Using gather with return_exceptions: If you’re running multiple coroutines concurrently and want to capture all exceptions, consider using asyncio.gather with the return_exceptions=True parameter. This way, exceptions will be returned instead of being raised, allowing you to handle them gracefully:
import asyncio

async def problematic_coroutine():
    raise ValueError("Some error")

async def another_problematic_coroutine():
    raise KeyError("Another error")

async def main():
    results = await asyncio.gather(
        problematic_coroutine(),
        another_problematic_coroutine(),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Caught an error: {result}")

if __name__ == "__main__":
    asyncio.run(main())

The Event Loop in asyncio

The event loop is the core of every asyncio application. You can access and manage the event loop manually, although in many cases, asyncio’s high-level APIs are all you need.

Here’s how to access the event loop and use it to run a coroutine:

import asyncio

async def my_coroutine(n):
    print(f"Coroutine {n} starting")
    await asyncio.sleep(n)  # simulate IO-bound task with sleep
    print(f"Coroutine {n} completed")

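# Create an event loop explicitly; calling asyncio.get_event_loop()
# when no loop is running is deprecated in recent Python versions
loop = asyncio.new_event_loop()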

# Create multiple tasks to run
tasks = [loop.create_task(my_coroutine(i)) for i in range(1, 4)]

# Gather tasks and run them
loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

In this script, we define an asynchronous function my_coroutine that simulates a time-consuming IO-bound task by sleeping for a number of seconds given by n. We then obtain an event loop and use it to create a number of tasks. Each task wraps a call to my_coroutine with a different argument for n.

Finally, we use asyncio.gather to combine these tasks into a single awaitable object, and we use run_until_complete to run this object. The script prints a message when each coroutine starts and completes, so you can see that the coroutines run concurrently.
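For comparison, here is a sketch of the same idea using only the high-level API. asyncio.run() creates the loop, runs the coroutine, and closes the loop for you. The delays are shortened and the coroutine returns n instead of printing, both changes made only to keep the example quick and checkable:

```python
import asyncio

async def my_coroutine(n):
    await asyncio.sleep(n / 10)  # shortened delay for the demonstration
    return n

async def main():
    # gather() runs the coroutines concurrently and keeps the input order
    return await asyncio.gather(*(my_coroutine(i) for i in range(1, 4)))

results = asyncio.run(main())

if __name__ == "__main__":
    print(results)
```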

Remember, asyncio is designed for IO-bound tasks and not for CPU-bound tasks. If you try to use asyncio with CPU-bound tasks, you won’t get true parallelism, because asyncio is built on coroutines and an event loop that run in a single thread. For CPU-bound work, consider multiprocessing (for example, a process pool driven through loop.run_in_executor()). Threads help mainly with blocking I/O, since the global interpreter lock in standard CPython builds keeps pure-Python code from running in parallel across threads.

Also, when we’re finished with a loop, it’s good practice to close() it.

Debug Mode

Asyncio’s debug mode can provide more detailed information about your asyncio code and can help you diagnose issues. To enable debug mode, you can set the PYTHONASYNCIODEBUG environment variable to 1, or you can enable it in your code.

asyncio checks PYTHONASYNCIODEBUG early, possibly as soon as the module is imported, so set the environment variable before the very first asyncio import:

import os
os.environ['PYTHONASYNCIODEBUG'] = '1'
import logging

logging.basicConfig(level=logging.DEBUG)
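If you start your program with asyncio.run(), enabling debug mode in code can be as simple as passing debug=True. The sketch below only confirms that the running loop reports debug mode as active:

```python
import asyncio

async def main():
    # get_debug() reports whether the running loop is in debug mode
    return asyncio.get_running_loop().get_debug()

debug_enabled = asyncio.run(main(), debug=True)

if __name__ == "__main__":
    print(debug_enabled)
```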

When debug mode is enabled, asyncio logs more events and checks for common mistakes, such as blocking the event loop.

import os
os.environ['PYTHONASYNCIODEBUG'] = '1'
import asyncio
import time

async def my_coroutine(n):
    print(f"Coroutine {n} starting")
    await asyncio.sleep(n)
    print(f"Coroutine {n} completed")

async def blocking_coroutine():
    print("Blocking coroutine starting")
    time.sleep(2)  # Blocking call
    print("Blocking coroutine completed")

async def main():
    tasks = [
        asyncio.create_task(my_coroutine(1)),
        asyncio.create_task(my_coroutine(2)),
        asyncio.create_task(blocking_coroutine())
    ]
    await asyncio.gather(*tasks)

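# Create an event loop explicitly; calling asyncio.get_event_loop()
# when no loop is running is deprecated in recent Python versions
loop = asyncio.new_event_loop()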

# Enable debug mode
loop.set_debug(True)

# Run the main coroutine
try:
    loop.run_until_complete(main())
finally:
    loop.close()

In this example, we define two async functions: my_coroutine and blocking_coroutine. The my_coroutine function is similar to the one in the previous example, while blocking_coroutine is designed to make a blocking call with time.sleep().

We also create a main coroutine that creates tasks for both my_coroutine and blocking_coroutine. We then run the main coroutine using loop.run_until_complete().

In this case, the blocking_coroutine function will block the event loop, which is a common mistake when using asyncio. When running this script in debug mode, asyncio logs a warning for any step that holds the loop longer than loop.slow_callback_duration (0.1 seconds by default), so the time.sleep(2) call in blocking_coroutine is reported.
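One possible fix, sketched here under the assumption that you are on Python 3.9 or newer, is to move the blocking call into a worker thread with asyncio.to_thread(). The names non_blocking_coroutine and ticker are invented for this example, and the delays are shortened:

```python
import asyncio
import time

async def non_blocking_coroutine():
    # The blocking sleep runs in a worker thread, so the loop stays free
    await asyncio.to_thread(time.sleep, 0.2)
    return "blocking work done"

async def ticker():
    # Can make progress while the worker thread is sleeping
    await asyncio.sleep(0.05)
    return "tick"

async def main():
    return await asyncio.gather(non_blocking_coroutine(), ticker())

if __name__ == "__main__":
    print(asyncio.run(main()))
```

This suits blocking I/O and library calls that have no async equivalent. It does not make CPU-bound Python code run in parallel.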

Enable debug mode in production with caution, as the additional checks can slow your application down. It is better suited to development and testing, where it can catch potential issues in your asyncio code early.

Conclusion

Well done on wrapping up this tutorial on asyncio basics! You now have a firm grasp of core asyncio principles, from coroutines and tasks to debugging and managing the event loop.

But there’s more to asyncio! Its true potential unfolds when managing complex workflows and coordinating between different tasks. This is where advanced topics like synchronization primitives, Queues, Semaphores, and Transports and Protocols come into play.

In our next article, we’ll delve deeper into these advanced asyncio concepts, equipping you with a comprehensive skill set for handling complex asynchronous programming challenges in Python. Continue to practice, explore, and we’ll see you in our next tutorial for more learning adventures!
