Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or your preferred IDE

What you'll learn
- Understand the fundamentals of asyncio
- Apply asyncio in real projects
- Debug common issues
- Write clean, Pythonic async code
Asyncio: Event Loop and Tasks

Introduction
Ever felt like your Python program is stuck waiting for one thing to finish before moving on to the next? Imagine you're a chef in a restaurant: would you wait for the soup to boil before starting to chop vegetables? Of course not! You'd multitask! That's exactly what asyncio lets us do in Python.

In this tutorial, we'll explore asyncio, Python's built-in library for writing concurrent code. You'll learn how to make your programs make progress on multiple things at once, like a master chef juggling multiple dishes!
Understanding Asyncio: The Event Loop and Tasks
Think of asyncio like a restaurant kitchen:
- Event Loop = the head chef coordinating everything
- Tasks = individual cooking activities (boiling pasta, grilling steak, etc.)
- Await = waiting for something to finish (like the oven timer)

The event loop is the heart of asyncio: a super-efficient manager that keeps track of all your tasks and switches between them whenever one is waiting on something.
The Magic of Concurrency

```python
# ❌ Without asyncio (synchronous)
import time

def make_coffee():
    print("Starting coffee...")
    time.sleep(3)  # Blocks for 3 seconds
    print("Coffee ready!")

def make_toast():
    print("Starting toast...")
    time.sleep(2)  # Blocks for 2 seconds
    print("Toast ready!")

# This takes 5 seconds total!
make_coffee()
make_toast()
```

```python
# ✅ With asyncio (asynchronous)
import asyncio

async def make_coffee():
    print("Starting coffee...")
    await asyncio.sleep(3)  # Non-blocking wait!
    print("Coffee ready!")

async def make_toast():
    print("Starting toast...")
    await asyncio.sleep(2)  # Non-blocking wait!
    print("Toast ready!")

# This takes only 3 seconds!
async def make_breakfast():
    await asyncio.gather(make_coffee(), make_toast())

asyncio.run(make_breakfast())
```
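You can verify those numbers yourself. Here's a minimal timing sketch, assuming the async `make_coffee` and `make_toast` defined above:

```python
import asyncio
import time

async def timed_breakfast():
    start = time.perf_counter()
    await asyncio.gather(make_coffee(), make_toast())
    elapsed = time.perf_counter() - start
    print(f"Breakfast took {elapsed:.1f}s")  # ~3.0s, not 5.0s

asyncio.run(timed_breakfast())
```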
Basic Syntax and Usage
Let's break down the essential asyncio components.

1. Creating Async Functions (Coroutines)

```python
# ✅ Async function (coroutine)
async def greet_user(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # Simulate some async work
    return f"Nice to meet you, {name}!"

# ❌ Common mistake - forgetting 'async'
def greet_user_wrong(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # SyntaxError: 'await' outside async function
    return f"Nice to meet you, {name}!"
```
2. Running the Event Loop

```python
import asyncio

async def main():
    # Your async code goes here!
    result = await greet_user("Python Learner")
    print(result)

# ✅ Modern way (Python 3.7+)
asyncio.run(main())

# Old way - still works, but deprecated in newer Python versions;
# prefer asyncio.run() unless you need manual loop control
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
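If you need to run several top-level coroutines against the same loop, Python 3.11 added `asyncio.Runner`. A small sketch, reusing `main` and `greet_user` from above:

```python
import asyncio

# Python 3.11+: one loop, several top-level entry points
with asyncio.Runner() as runner:
    runner.run(main())
    runner.run(greet_user("again"))  # reuses the same event loop
```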
3. Creating and Managing Tasks

```python
import asyncio

async def fetch_data(url, delay):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Simulate network delay
    return f"Data from {url}"

async def main():
    # Create tasks - they start running immediately!
    task1 = asyncio.create_task(fetch_data("api.com", 2))
    task2 = asyncio.create_task(fetch_data("database.com", 3))

    # Wait for both tasks to complete (total ~3s, not 5s,
    # because both were already running)
    result1 = await task1
    result2 = await task2
    print(f"Got: {result1}")
    print(f"Got: {result2}")

asyncio.run(main())
```
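If you'd rather handle each result as soon as it's ready instead of in submission order, `asyncio.as_completed` yields awaitables in completion order. A sketch reusing `fetch_data` from above:

```python
import asyncio

async def main():
    tasks = [
        asyncio.create_task(fetch_data("api.com", 2)),
        asyncio.create_task(fetch_data("database.com", 3)),
    ]
    # Iterate in completion order: api.com finishes first here
    for earliest in asyncio.as_completed(tasks):
        result = await earliest
        print(f"Got: {result}")

asyncio.run(main())
```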
Practical Examples

Example 1: Web Scraper with Rate Limiting

```python
import asyncio
import random

class AsyncWebScraper:
    def __init__(self, max_concurrent=3):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def fetch_page(self, url):
        async with self.semaphore:  # Limit concurrent requests
            print(f"Fetching {url}...")
            # Simulate network delay
            delay = random.uniform(0.5, 2.0)
            await asyncio.sleep(delay)
            # Simulate page content
            content = f"Content from {url} (took {delay:.2f}s)"
            print(f"Done: {url}")
            return content

    async def scrape_all(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        self.results = await asyncio.gather(*tasks)
        return self.results

# Let's scrape some "websites"!
async def main():
    urls = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com",
        "https://example4.com",
        "https://example5.com",
    ]
    scraper = AsyncWebScraper(max_concurrent=2)
    results = await scraper.scrape_all(urls)
    print("\nResults:")
    for result in results:
        print(f"  • {result}")

asyncio.run(main())
```
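In a real scraper, the simulated sleep would be an actual HTTP request. A hedged sketch of what `fetch_page` and `scrape_all` might look like with aiohttp (`pip install aiohttp`; the `session` parameter is an addition here so one connection pool is shared across requests):

```python
import asyncio
import aiohttp

async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> str:
    async with self.semaphore:  # same rate limiting as above
        async with session.get(url) as response:
            response.raise_for_status()  # surface HTTP errors
            return await response.text()

async def scrape_all(self, urls):
    async with aiohttp.ClientSession() as session:  # one shared session
        tasks = [self.fetch_page(session, url) for url in urls]
        self.results = await asyncio.gather(*tasks)
    return self.results
```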
Example 2: Real-Time Chat Server

```python
import asyncio
from datetime import datetime

class ChatRoom:
    def __init__(self):
        self.messages = []
        self.users = {}

    async def add_user(self, user_id, writer):
        self.users[user_id] = writer
        await self.broadcast(f"{user_id} joined the chat!")

    async def remove_user(self, user_id):
        if user_id in self.users:
            del self.users[user_id]
            await self.broadcast(f"{user_id} left the chat!")

    async def broadcast(self, message):
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_msg = f"[{timestamp}] {message}"
        self.messages.append(formatted_msg)
        # Send to all connected users
        tasks = []
        for user_id, writer in self.users.items():
            tasks.append(self.send_to_user(writer, formatted_msg))
        await asyncio.gather(*tasks, return_exceptions=True)

    async def send_to_user(self, writer, message):
        try:
            writer.write(f"{message}\n".encode())
            await writer.drain()
        except (ConnectionError, OSError):
            pass  # The user disconnected; drop the message silently

# Simulate a chat session!
async def simulate_chat():
    chat = ChatRoom()
    print("Chat Room Opened!")

    # Mock writers (in a real app, these would be network connections)
    class MockWriter:
        def write(self, data):
            print(f"Sending: {data.decode().strip()}")
        async def drain(self):
            pass

    # Simulate chat activity
    await chat.add_user("Alice", MockWriter())
    await asyncio.sleep(0.5)
    await chat.add_user("Bob", MockWriter())
    await asyncio.sleep(0.5)
    await chat.broadcast("Alice: Hey everyone!")
    await asyncio.sleep(0.5)
    await chat.broadcast("Bob: Hi Alice! How are you?")
    await asyncio.sleep(0.5)
    await chat.remove_user("Alice")

asyncio.run(simulate_chat())
```
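To turn this into an actual network server, asyncio's streams API does the heavy lifting. A sketch under simplifying assumptions (the first line a client sends is taken as their name; no authentication or input validation):

```python
import asyncio

chat = ChatRoom()  # assumes the ChatRoom class from above

async def handle_client(reader, writer):
    user_id = (await reader.readline()).decode().strip()
    await chat.add_user(user_id, writer)
    try:
        while data := await reader.readline():  # b"" on disconnect
            await chat.broadcast(f"{user_id}: {data.decode().strip()}")
    finally:
        await chat.remove_user(user_id)

async def run_server():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()

# asyncio.run(run_server())
```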
Example 3: Task Queue with Worker Pool

```python
import asyncio

class TaskQueue:
    def __init__(self, num_workers=3):
        self.queue = asyncio.Queue()
        self.num_workers = num_workers
        self.results = []

    async def worker(self, worker_id):
        """Worker coroutine that processes tasks"""
        while True:
            # Get a task from the queue
            task = await self.queue.get()
            if task is None:  # Poison pill to stop the worker
                self.queue.task_done()
                break
            print(f"Worker {worker_id}: Processing {task['name']}...")
            # Simulate work
            await asyncio.sleep(task['duration'])
            result = f"Task {task['name']} completed by Worker {worker_id}"
            self.results.append(result)
            print(f"Worker {worker_id}: Finished {task['name']}")
            self.queue.task_done()

    async def add_task(self, name, duration):
        """Add a task to the queue"""
        await self.queue.put({'name': name, 'duration': duration})

    async def process_all(self, tasks):
        """Process all tasks with a worker pool"""
        # Start workers
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]
        # Add all tasks to the queue
        for task in tasks:
            await self.add_task(task['name'], task['duration'])
        # Wait for all tasks to be processed
        await self.queue.join()
        # Stop workers
        for _ in range(self.num_workers):
            await self.queue.put(None)
        # Wait for workers to finish
        await asyncio.gather(*workers)
        return self.results

# Let's process some tasks!
async def main():
    tasks = [
        {'name': 'Download File A', 'duration': 2},
        {'name': 'Process Image B', 'duration': 1.5},
        {'name': 'Send Email C', 'duration': 0.5},
        {'name': 'Generate Report D', 'duration': 3},
        {'name': 'Backup Database E', 'duration': 2.5},
    ]
    print("Starting task processing...")
    queue = TaskQueue(num_workers=2)
    results = await queue.process_all(tasks)
    print("\nAll tasks completed!")
    for result in results:
        print(f"  • {result}")

asyncio.run(main())
```
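One refinement worth knowing: `asyncio.Queue` accepts a `maxsize`, which gives you backpressure for free - `put()` suspends until a consumer frees a slot. A small sketch:

```python
import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)  # suspends while the queue is full
        print(f"Queued item {i}")

async def consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.5)  # slow consumer forces backpressure
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)  # at most 2 items buffered
    worker = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()
    worker.cancel()

asyncio.run(main())
```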
Advanced Concepts

1. Task Cancellation and Timeouts

```python
import asyncio

async def long_running_task():
    try:
        print("Starting long task...")
        await asyncio.sleep(10)
        print("Task completed!")
        return "Success"
    except asyncio.CancelledError:
        print("Task was cancelled!")
        # Cleanup code here
        raise  # Re-raise to properly signal cancellation

async def main():
    # Create a task
    task = asyncio.create_task(long_running_task())
    # Cancel after 2 seconds
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Successfully cancelled the task")

    # Using a timeout
    print("\nTrying with a timeout...")
    try:
        async with asyncio.timeout(2):  # Python 3.11+
            await long_running_task()
    except asyncio.TimeoutError:
        print("Task timed out!")

# For Python < 3.11, use wait_for instead of asyncio.timeout
async def main_legacy():
    try:
        await asyncio.wait_for(long_running_task(), timeout=2)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())
```
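Sometimes you want the opposite: stop waiting but let the work finish. `asyncio.shield` protects a task from the cancellation that `wait_for` issues on timeout. A sketch:

```python
import asyncio

async def critical_save():
    await asyncio.sleep(2)  # e.g. flushing data to disk
    print("Save finished!")

async def main():
    task = asyncio.create_task(critical_save())
    try:
        # Only the shield wrapper is cancelled on timeout;
        # the underlying task keeps running
        await asyncio.wait_for(asyncio.shield(task), timeout=1)
    except asyncio.TimeoutError:
        print("Stopped waiting, but the save continues...")
        await task  # it still completes normally

asyncio.run(main())
```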
2. Task Groups and Exception Handling

```python
import asyncio

async def risky_task(task_id, should_fail=False):
    print(f"Task {task_id} starting...")
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"Task {task_id} failed!")
    return f"Task {task_id} succeeded!"

async def main():
    # Using TaskGroup (Python 3.11+)
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(risky_task(1))
            task2 = tg.create_task(risky_task(2, should_fail=True))
            task3 = tg.create_task(risky_task(3))
    except* ValueError as e:
        print(f"Some tasks failed: {e.exceptions}")

    # For Python < 3.11, use gather with return_exceptions
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2, should_fail=True),
        risky_task(3),
        return_exceptions=True
    )
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} error: {result}")
        else:
            print(f"Task {i} result: {result}")

# Note: because except* is new syntax in Python 3.11, this file won't even
# parse on older interpreters - a runtime hasattr() check can't help here.
# On Python < 3.11, keep the gather() version in its own module instead.
asyncio.run(main())
```
3. Event Loop Customization

```python
import asyncio
import time

# Note: subclassing an event loop is rarely needed in practice; this is an
# illustrative sketch. SelectorEventLoop is a concrete loop you can actually
# instantiate (unlike the abstract BaseEventLoop).
class TimedEventLoop(asyncio.SelectorEventLoop):
    """Custom event loop that tracks task execution time"""

    def __init__(self):
        super().__init__()
        self.task_times = {}

    def create_task(self, coro, *, name=None):
        task = super().create_task(coro, name=name)
        self.task_times[task] = time.time()
        # Add a callback to measure completion time
        task.add_done_callback(self._task_done)
        return task

    def _task_done(self, task):
        if task in self.task_times:
            duration = time.time() - self.task_times[task]
            print(f"Task {task.get_name()} took {duration:.2f}s")

# More practical: the built-in debug mode
async def debug_example():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)

    async def slow_operation():
        # This triggers a warning in debug mode!
        time.sleep(0.1)  # ❌ Blocking call!

    async def fast_operation():
        # This is correct ✅
        await asyncio.sleep(0.1)

    await fast_operation()
    # await slow_operation()  # Uncomment to see the debug warning

# Run with debug mode
asyncio.run(debug_example(), debug=True)
```
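Debug mode's threshold for "too slow" is configurable via the loop's `slow_callback_duration` attribute (0.1 seconds by default). A sketch:

```python
import asyncio
import time

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # flag anything blocking >50 ms
    time.sleep(0.08)  # deliberately blocks; debug mode logs a warning

asyncio.run(main(), debug=True)
```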
Common Pitfalls and Solutions

1. Forgetting to await

```python
# ❌ Wrong - coroutine not awaited
async def get_data():
    return "Important data!"

async def main_wrong():
    data = get_data()  # This returns a coroutine object!
    print(data)  # Prints: <coroutine object get_data at ...>
    # Python also warns: "coroutine 'get_data' was never awaited"

# ✅ Correct - properly awaited
async def main_correct():
    data = await get_data()  # Now we get the actual data!
    print(data)  # Prints: Important data!
```
2. Blocking the Event Loop

```python
import time
import asyncio

# ❌ Wrong - blocks the event loop
async def bad_sleep():
    print("Sleeping (blocking)...")
    time.sleep(2)  # This blocks everything!
    print("Awake!")

# ✅ Correct - non-blocking sleep
async def good_sleep():
    print("Sleeping (async)...")
    await asyncio.sleep(2)  # Other tasks can run!
    print("Awake!")

# ✅ For CPU-bound work, hand it off to an executor
async def cpu_intensive_task():
    loop = asyncio.get_running_loop()
    # The default executor runs this in a separate thread, so the
    # event loop stays responsive while the sum is computed
    result = await loop.run_in_executor(
        None,  # Use the default (thread pool) executor
        lambda: sum(i**2 for i in range(1000000))  # CPU-bound work
    )
    return result
```
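One caveat: the default executor is a thread pool, which keeps the event loop responsive but (because of the GIL) doesn't parallelize pure-Python CPU work. For heavy computation, a `ProcessPoolExecutor` is usually the better fit. A sketch:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    return sum(i**2 for i in range(n))  # top-level function, so it pickles

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Separate processes sidestep the GIL entirely
        result = await loop.run_in_executor(pool, crunch, 10_000_000)
        print(result)

if __name__ == "__main__":  # required for multiprocessing on Windows/macOS
    asyncio.run(main())
```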
3. Creating Tasks Incorrectly

```python
# ❌ Wrong - awaits run one after another
async def fetch_all_wrong(urls):
    results = []
    for url in urls:
        result = await fetch_data(url)  # Sequential!
        results.append(result)
    return results

# ✅ Correct - coroutines run concurrently
async def fetch_all_correct(urls):
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)  # Concurrent!

# ✅ Even better - with create_task, the tasks start running
# immediately, before gather() is even awaited
async def fetch_all_better(urls):
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    return await asyncio.gather(*tasks)
```
Best Practices

1. Use Context Managers

```python
import asyncio
import aiofiles  # pip install aiofiles

# ✅ Good - automatic cleanup
async def read_file_async(filename):
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
    return content

# ✅ The same pattern for network connections
import aiohttp  # pip install aiohttp

async def fetch_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
```
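You can also write your own async context managers without defining `__aenter__`/`__aexit__` by hand: `contextlib.asynccontextmanager` works just like its sync counterpart. A sketch (`managed_resource` and the resource name are made up for illustration):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name):
    print(f"Acquiring {name}...")
    await asyncio.sleep(0.1)  # simulate async setup
    try:
        yield name
    finally:
        print(f"Releasing {name}")  # runs even on errors or cancellation

async def main():
    async with managed_resource("db-connection") as res:
        print(f"Using {res}")

asyncio.run(main())
```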
2. Structure Your Async Code

```python
import asyncio

class AsyncService:
    """A well-structured async service"""

    def __init__(self):
        self.is_running = False
        self.tasks = set()

    async def start(self):
        """Start the service"""
        self.is_running = True
        print("Service started!")

    async def stop(self):
        """Gracefully stop the service"""
        self.is_running = False
        # Cancel all pending tasks
        for task in self.tasks:
            task.cancel()
        # Wait for all tasks to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()
        print("Service stopped!")

    async def add_background_task(self, coro):
        """Add a managed background task"""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()

# Usage (some_background_work stands in for any coroutine you define)
async def some_background_work():
    await asyncio.sleep(10)

async def main():
    async with AsyncService() as service:
        # Service is running!
        await service.add_background_task(some_background_work())
        # Do other stuff...
    # Service automatically stopped!
```
3. Handle Exceptions Properly

```python
import asyncio
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def resilient_task(task_id):
    """Task with proper error handling"""
    try:
        logger.info(f"Task {task_id} starting...")
        # Simulate work that might fail
        if task_id == 2:
            raise ValueError("Simulated error!")
        await asyncio.sleep(1)
        return f"Task {task_id} completed!"
    except asyncio.CancelledError:
        logger.warning(f"Task {task_id} cancelled!")
        # Cleanup if needed
        raise  # Always re-raise CancelledError
    except Exception as e:
        logger.error(f"Task {task_id} failed: {e}")
        # Could retry, log to monitoring, etc.
        return f"Task {task_id} failed: {e}"

async def main():
    # Create tasks with proper exception handling
    tasks = [resilient_task(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Process results
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"Task {i} exception: {result}")
        else:
            logger.info(f"Task {i} result: {result}")

asyncio.run(main())
```
Hands-On Exercise
Ready to put your asyncio skills to the test? Let's build a mini web crawler!

Challenge: Create an async web crawler that:
- Fetches multiple "web pages" concurrently
- Extracts "links" from each page
- Follows links up to a certain depth
- Respects rate limiting (max 3 concurrent requests)
- Handles errors gracefully

```python
import asyncio
import random
from typing import List

# Your task: complete this web crawler!
class AsyncWebCrawler:
    def __init__(self, max_concurrent=3, max_depth=2):
        # TODO: Initialize the crawler
        pass

    async def fetch_page(self, url: str) -> dict:
        """Simulate fetching a web page"""
        # TODO: Implement rate-limited page fetching
        # Should return {'url': url, 'links': [...], 'content': '...'}
        pass

    async def crawl(self, start_url: str) -> List[dict]:
        """Crawl starting from the given URL"""
        # TODO: Implement the crawling logic
        # Should handle multiple depths and avoid duplicate URLs
        pass

# Test your crawler!
async def main():
    crawler = AsyncWebCrawler(max_concurrent=3, max_depth=2)
    results = await crawler.crawl("https://example.com")
    print(f"Crawled {len(results)} pages!")
    for page in results:
        print(f"  • {page['url']} ({len(page['links'])} links)")

# asyncio.run(main())
```
Click here for the solution

```python
import asyncio
import random
from typing import Set, List, Dict

class AsyncWebCrawler:
    def __init__(self, max_concurrent=3, max_depth=2):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_depth = max_depth
        self.visited_urls: Set[str] = set()
        self.results: List[Dict] = []

    async def fetch_page(self, url: str) -> dict:
        """Simulate fetching a web page"""
        async with self.semaphore:  # Rate limiting!
            print(f"Fetching {url}...")
            # Simulate network delay
            await asyncio.sleep(random.uniform(0.5, 1.5))
            # Simulate page content with random links
            num_links = random.randint(2, 5)
            links = [f"{url}/page{i}" for i in range(num_links)]
            # Simulate occasional errors
            if random.random() < 0.1:  # 10% chance of error
                raise Exception(f"Failed to fetch {url}!")
            page_data = {
                'url': url,
                'links': links,
                'content': f'Content of {url}'
            }
            print(f"Fetched {url}")
            return page_data

    async def crawl_url(self, url: str, depth: int):
        """Crawl a single URL at the given depth"""
        if depth > self.max_depth or url in self.visited_urls:
            return
        self.visited_urls.add(url)
        try:
            page = await self.fetch_page(url)
            self.results.append(page)
            # Crawl links if not at max depth
            if depth < self.max_depth:
                tasks = [
                    self.crawl_url(link, depth + 1)
                    for link in page['links']
                    if link not in self.visited_urls
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
        except Exception as e:
            print(f"Error crawling {url}: {e}")

    async def crawl(self, start_url: str) -> List[dict]:
        """Start crawling from the given URL"""
        print(f"Starting crawl from {start_url}")
        print(f"Max depth: {self.max_depth}, Max concurrent: {self.max_concurrent}")
        await self.crawl_url(start_url, depth=0)
        print(f"\nCrawl complete! Visited {len(self.visited_urls)} URLs")
        return self.results

# Test the crawler!
async def main():
    crawler = AsyncWebCrawler(max_concurrent=3, max_depth=2)
    results = await crawler.crawl("https://example.com")
    print(f"\nCrawled {len(results)} pages successfully!")
    for page in results:
        print(f"  • {page['url']} ({len(page['links'])} links found)")

asyncio.run(main())
```
Fantastic work! You've built a concurrent web crawler! The key concepts demonstrated:
- Semaphore for rate limiting
- Recursive async crawling with depth control
- Error handling for resilient operation
- A visited-URL set to avoid duplicate visits
Key Takeaways
Congratulations! You've mastered the asyncio fundamentals! Here's what you've learned:
- Event Loop - the conductor orchestrating your async code
- Coroutines & Tasks - your building blocks for concurrent execution
- Concurrency Patterns - running multiple operations without blocking
- Error Handling - gracefully managing failures in async code
- Best Practices - writing clean, efficient, and maintainable async code

Remember:
- Use async/await for I/O-bound operations
- Don't block the event loop with synchronous code
- Always handle exceptions properly
- Use context managers for resource cleanup
Next Steps
You're now ready to build amazing concurrent applications! Here's what to explore next:
- AsyncIO Streams - network programming with asyncio
- Synchronization Primitives - locks, events, and conditions
- Async Web Frameworks - FastAPI, aiohttp, Quart
- Async Database Operations - asyncpg, motor, aioredis
- Real-time Applications - WebSockets, chat servers, game servers

Keep practicing, and remember: concurrency is like juggling - it gets easier with practice. You've got this!

Happy async coding!