Prerequisites
- Basic understanding of programming concepts 📝
- Python installation (3.8+) 🐍
- VS Code or preferred IDE 💻
What you'll learn
- Understand asyncio gather and wait fundamentals 🎯
- Apply concurrent patterns in real projects 🏗️
- Debug common asyncio issues 🐛
- Write clean, efficient async code ✨
🎯 Introduction
Welcome to this exciting tutorial on asyncio patterns! 🎉 In this guide, we'll explore the powerful gather and wait functions that make concurrent programming in Python a breeze.
Have you ever waited for multiple web requests to complete? Or needed to run several database queries at once? That's where asyncio's concurrent execution patterns shine! 🚀 Instead of waiting for tasks one by one (boring! 😴), we'll learn how to run them concurrently and handle their results elegantly.
By the end of this tutorial, you'll feel confident using asyncio.gather() and asyncio.wait() to turbocharge your Python applications! Let's dive in! 🏊‍♂️
📚 Understanding Asyncio Patterns
🤔 What are Gather and Wait?
Think of gather and wait as your async task coordinators 🎭. They're like a restaurant manager who can handle multiple orders at once instead of making customers wait in a single line!
- asyncio.gather() is like a careful waiter who collects all the dishes and serves them together in order 🍽️
- asyncio.wait() is like a flexible manager who tells you which tasks are done, letting you handle them as they complete 🎯
In Python terms, these functions help you run multiple coroutines concurrently and manage their results. This means you can:
- ✨ Run multiple I/O-bound operations concurrently
- 🚀 Significantly reduce total waiting time
- 🛡️ Handle errors gracefully
- 📊 Process results as they arrive
💡 Why Use Concurrent Patterns?
Here's why developers love these patterns:
- Speed Boost ⚡: Overlap waiting time instead of running operations one after another
- Resource Efficiency 💻: A single thread can keep many network or database operations in flight (asyncio does not spread Python code across CPU cores)
- Responsive Applications 🎮: Keep your app responsive while processing
- Scalability 📈: Handle more operations without blocking
Real-world example: Imagine fetching data from 5 different APIs 🌐, each taking about 1 second to respond. The sequential approach takes about 5 seconds. The concurrent approach with gather takes about 1 second, because the waits overlap. That's the power we're unlocking today! 💪
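You can check the sequential-versus-concurrent claim by timing both approaches with simulated calls. This minimal sketch assumes each "API call" is just `asyncio.sleep(0.1)`, a stand-in for network latency rather than a real request. `fake_api_call` and `compare` are hypothetical names.

```python
import asyncio
import time

async def fake_api_call(i: int) -> int:
    # Hypothetical stand-in for a network request that takes ~0.1 s
    await asyncio.sleep(0.1)
    return i

async def compare(n: int = 5) -> tuple[float, float]:
    # Sequential: each call starts only after the previous one finishes
    start = time.perf_counter()
    for i in range(n):
        await fake_api_call(i)
    sequential = time.perf_counter() - start

    # Concurrent: gather() lets all n waits overlap
    start = time.perf_counter()
    await asyncio.gather(*(fake_api_call(i) for i in range(n)))
    concurrent = time.perf_counter() - start
    return sequential, concurrent

sequential, concurrent = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The first number should come out roughly five times the second. Exact values vary by machine.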
🔧 Basic Syntax and Usage
📝 asyncio.gather() - The Orderly Approach
Let's start with gather, your go-to for running tasks and collecting results in order:
```python
import asyncio
import time

# 🎯 Simulating async operations
async def fetch_user(user_id: int) -> dict:
    print(f"🔍 Fetching user {user_id}...")
    await asyncio.sleep(1)  # Simulate network delay
    return {"id": user_id, "name": f"User {user_id}", "emoji": "😊"}

async def fetch_posts(user_id: int) -> list:
    print(f"📝 Fetching posts for user {user_id}...")
    await asyncio.sleep(1.5)  # Simulate slower operation
    return [{"title": f"Post {i}", "emoji": "📄"} for i in range(3)]

async def fetch_comments(post_id: int) -> list:
    print(f"💬 Fetching comments for post {post_id}...")
    await asyncio.sleep(0.5)
    return [{"text": "Great post! 👍"}, {"text": "Thanks for sharing! 🙏"}]

# 🚀 Using gather to run everything concurrently
async def main():
    start_time = time.time()
    # Run all operations at once and get results in order
    user, posts, comments = await asyncio.gather(
        fetch_user(1),
        fetch_posts(1),
        fetch_comments(1)
    )
    elapsed = time.time() - start_time
    print(f"\n✨ All done in {elapsed:.2f} seconds!")
    print(f"👤 User: {user['name']} {user['emoji']}")
    print(f"📝 Posts: {len(posts)} posts found")
    print(f"💬 Comments: {len(comments)} comments")

# Run it!
asyncio.run(main())
```
💡 Explanation: Notice how all three operations start right away! Instead of waiting 3 seconds (1 + 1.5 + 0.5), we only wait about 1.5 seconds (the longest operation). The results come back in the order we passed the coroutines to gather(), not the order in which they finished, which makes them easy to unpack.
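This small sketch shows that gather() orders results by argument position rather than completion time. The delays are made up. `tracked` is a hypothetical helper that also records the moment each coroutine finishes.

```python
import asyncio

async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []

    async def tracked(value: str, delay: float) -> str:
        # Hypothetical helper: sleep, then record the completion order
        await asyncio.sleep(delay)
        finished.append(value)
        return value

    results = await asyncio.gather(
        tracked("slow", 0.03),
        tracked("medium", 0.02),
        tracked("fast", 0.01),
    )
    return results, finished

results, finished = asyncio.run(main())
print(results)   # ['slow', 'medium', 'fast']  (argument order)
print(finished)  # ['fast', 'medium', 'slow']  (completion order)
```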
🎯 asyncio.wait() - The Flexible Approach
Now let's explore wait, which gives you more control over task completion:
```python
import asyncio

# 🎛️ Using wait for flexible task handling
async def process_data(data_id: int, delay: float) -> str:
    print(f"🔄 Processing data {data_id}...")
    await asyncio.sleep(delay)
    if data_id == 3:  # Simulate an error
        raise ValueError(f"Oops! Data {data_id} is corrupted 😱")
    return f"✅ Data {data_id} processed!"

async def main_with_wait():
    # Create tasks (asyncio.wait() expects tasks, not bare coroutines)
    tasks = [
        asyncio.create_task(process_data(1, 2.0)),
        asyncio.create_task(process_data(2, 1.0)),
        asyncio.create_task(process_data(3, 1.5)),  # This will error!
        asyncio.create_task(process_data(4, 0.5)),
    ]
    # Return as soon as any one task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print("\n🏁 First task completed!")
    for task in done:
        try:
            result = task.result()
            print(f"   {result}")
        except Exception as e:
            print(f"   ❌ Error: {e}")
    print(f"\n⏳ Still pending: {len(pending)} tasks")
    # Clean up: cancel the tasks we no longer need and let them finish cancelling
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main_with_wait())
```
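asyncio.wait() accepts three `return_when` constants: `asyncio.FIRST_COMPLETED`, `asyncio.FIRST_EXCEPTION`, and `asyncio.ALL_COMPLETED` (the default). The sketch below counts how many tasks land in `done` versus `pending` under each one. It uses a hypothetical `job()` coroutine with made-up delays.

```python
import asyncio

async def job(name: str, delay: float, fail: bool = False) -> str:
    # Hypothetical unit of work: sleep, then succeed or raise
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return name

async def demo(return_when: str) -> tuple[int, int]:
    tasks = [
        asyncio.create_task(job("quick", 0.01)),
        asyncio.create_task(job("broken", 0.02, fail=True)),
        asyncio.create_task(job("slow", 0.2)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=return_when)
    # Cancel whatever is still running and wait for the cancellations
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Retrieve stored exceptions so asyncio doesn't warn about them
    for task in done:
        task.exception()
    return len(done), len(pending)

for mode in (asyncio.FIRST_COMPLETED, asyncio.FIRST_EXCEPTION, asyncio.ALL_COMPLETED):
    print(mode, asyncio.run(demo(mode)))
# FIRST_COMPLETED (1, 2)
# FIRST_EXCEPTION (2, 1)
# ALL_COMPLETED (3, 0)
```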
💡 Practical Examples
🛒 Example 1: E-commerce Price Checker
Let's build a real-world price comparison tool:
```python
import asyncio
import random
import time

# 🛍️ Checking prices across multiple stores
async def check_price(store: str, product: str) -> dict:
    delays = {"Amazon": 1.2, "eBay": 0.8, "Walmart": 1.5, "Target": 0.9}
    price_ranges = {"Amazon": (50, 60), "eBay": (45, 55), "Walmart": (48, 58), "Target": (52, 62)}
    print(f"🔍 Checking {store} for {product}...")
    await asyncio.sleep(delays.get(store, 1.0))
    # Simulate price calculation
    min_price, max_price = price_ranges.get(store, (50, 60))
    price = random.uniform(min_price, max_price)
    return {
        "store": store,
        "product": product,
        "price": round(price, 2),
        "emoji": "🏪",
        "in_stock": random.choice([True, True, False])  # ~67% chance in stock
    }

async def find_best_deal(product: str):
    stores = ["Amazon", "eBay", "Walmart", "Target"]
    print(f"🔎 Finding best price for: {product}\n")
    start_time = time.time()
    # Check all stores concurrently
    results = await asyncio.gather(
        *[check_price(store, product) for store in stores],
        return_exceptions=True  # Don't fail if one store is down
    )
    # Process results
    valid_results = []
    for result in results:
        if isinstance(result, Exception):
            print(f"❌ Error checking store: {result}")
        elif result["in_stock"]:
            valid_results.append(result)
            print(f"✅ {result['store']}: ${result['price']} {result['emoji']}")
        else:
            print(f"⚠️ {result['store']}: Out of stock!")
    # Find best deal
    if valid_results:
        best_deal = min(valid_results, key=lambda x: x["price"])
        print(f"\n🏆 Best deal: {best_deal['store']} at ${best_deal['price']}!")
    else:
        print("\n😞 No stores have the product in stock!")
    print(f"⏱️ Search completed in {time.time() - start_time:.2f} seconds")

# 🎮 Let's shop!
asyncio.run(find_best_deal("Gaming Laptop"))
```
🎮 Example 2: Game Server Health Monitor
Monitor multiple game servers concurrently:
```python
import asyncio
import random
import time

# 🏥 Health monitoring system
async def ping_server(server_name: str, ip: str) -> dict:
    # Simulate different server response times
    response_times = {
        "US-East": 0.05,
        "US-West": 0.08,
        "Europe": 0.15,
        "Asia": 0.25,
        "Australia": 0.30
    }
    print(f"📡 Pinging {server_name} ({ip})...")
    # Simulate network latency
    base_time = response_times.get(server_name, 0.1)
    jitter = random.uniform(-0.02, 0.02)
    await asyncio.sleep(base_time + jitter)
    # Simulate server health
    health_score = random.randint(85, 100)
    status = "🟢" if health_score > 95 else "🟡" if health_score > 85 else "🔴"
    return {
        "server": server_name,
        "ip": ip,
        "ping": int((base_time + jitter) * 1000),  # Convert to ms
        "health": health_score,
        "status": status,
        "players": random.randint(100, 1000)
    }

async def monitor_game_servers():
    servers = {
        "US-East": "192.168.1.10",
        "US-West": "192.168.1.20",
        "Europe": "192.168.1.30",
        "Asia": "192.168.1.40",
        "Australia": "192.168.1.50"
    }
    print("🎮 Game Server Health Monitor\n")
    while True:
        print("=" * 50)
        print(f"🔄 Checking servers at {time.strftime('%H:%M:%S')}")
        # Create tasks for all servers
        tasks = [
            asyncio.create_task(ping_server(name, ip))
            for name, ip in servers.items()
        ]
        # Wait for all, but stop waiting after 1 second
        try:
            done, pending = await asyncio.wait(tasks, timeout=1.0)
            # Process completed pings
            results = []
            for task in done:
                result = task.result()
                results.append(result)
                print(f"{result['status']} {result['server']}: "
                      f"{result['ping']}ms | "
                      f"Health: {result['health']}% | "
                      f"Players: {result['players']}")
            # Handle timeouts: cancel pings that didn't answer in time
            for task in pending:
                task.cancel()
                print("⚠️ Timeout waiting for response")
            # Summary
            if results:
                avg_ping = sum(r['ping'] for r in results) / len(results)
                total_players = sum(r['players'] for r in results)
                print(f"\n📊 Average ping: {avg_ping:.1f}ms | "
                      f"Total players: {total_players}")
        except Exception as e:
            print(f"❌ Monitor error: {e}")
        print("\n💤 Waiting 5 seconds before next check...")
        await asyncio.sleep(5)

# Run for a bit then stop (in production, this would run forever)
async def run_monitor():
    monitor_task = asyncio.create_task(monitor_game_servers())
    await asyncio.sleep(15)  # Run for 15 seconds
    monitor_task.cancel()
    try:
        await monitor_task  # Let the cancellation finish cleanly
    except asyncio.CancelledError:
        pass

# asyncio.run(run_monitor())  # Uncomment to run!
```
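When asyncio.wait() is given a `timeout`, it never raises on timeout. It simply returns what has finished in `done` and leaves the rest in `pending`. That is why the monitor cancels its pending tasks itself. Here is a minimal sketch of that split, using a hypothetical `ping()` coroutine whose delay stands in for network latency:

```python
import asyncio

async def ping(name: str, delay: float) -> str:
    # Hypothetical server ping; the delay simulates latency
    await asyncio.sleep(delay)
    return name

async def check(timeout: float) -> tuple[list[str], list[str]]:
    tasks = {
        asyncio.create_task(ping("fast-server", 0.01), name="fast-server"),
        asyncio.create_task(ping("slow-server", 1.0), name="slow-server"),
    }
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()  # stop waiting on servers that missed the deadline
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), sorted(t.get_name() for t in pending)

answered, timed_out = asyncio.run(check(timeout=0.2))
print("answered:", answered)    # answered: ['fast-server']
print("timed out:", timed_out)  # timed out: ['slow-server']
```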
🚀 Advanced Concepts
🧙‍♂️ Advanced Pattern: Task Groups and Exception Handling
When you need fine-grained control over concurrent tasks:
```python
import asyncio
import random

# 🎯 Collecting successes and failures without letting one error stop the rest
async def risky_operation(task_id: int) -> str:
    await asyncio.sleep(random.uniform(0.5, 2.0))
    # Randomly fail some tasks
    if random.random() < 0.3:  # 30% failure rate
        raise RuntimeError(f"Task {task_id} failed! 💥")
    return f"Task {task_id} succeeded! ✨"

async def robust_gather(*coroutines):
    """
    🛡️ A more robust version of gather that handles failures gracefully
    """
    results = []
    # Create tasks
    tasks = [asyncio.create_task(coro) for coro in coroutines]
    # Wait for all to complete (including failures)
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    # Collect results in the original order
    for task in tasks:
        try:
            results.append(("success", task.result()))
        except Exception as e:
            results.append(("error", str(e)))
    return results

async def advanced_example():
    print("🚀 Running advanced concurrent operations\n")
    # Create many coroutines
    operations = [risky_operation(i) for i in range(10)]
    # Method 1: Using our robust gather
    print("📋 Method 1: Robust Gather")
    results = await robust_gather(*operations)
    success_count = sum(1 for status, _ in results if status == "success")
    print(f"✅ Succeeded: {success_count}/10")
    print(f"❌ Failed: {10 - success_count}/10\n")
    # Method 2: Using as_completed to handle each result as soon as it is ready
    print("📋 Method 2: Process as completed")
    tasks = [asyncio.create_task(risky_operation(i)) for i in range(10, 20)]
    for next_done in asyncio.as_completed(tasks):
        try:
            result = await next_done
            print(f"   ✅ {result}")
        except Exception as e:
            print(f"   ❌ {e}")

# asyncio.run(advanced_example())
```
🏗️ Advanced Pattern: Semaphore-Limited Concurrency
Control how many operations run at the same time:
```python
import asyncio
import random
import time

# 🚦 Limiting concurrency with a semaphore
async def fetch_api_data(session_id: int, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:  # Waits here until one of the slots is free
        print(f"🌐 Session {session_id} started...")
        # Simulate API call
        await asyncio.sleep(random.uniform(1, 3))
        data = {
            "session": session_id,
            "data": f"Result from session {session_id}",
            "timestamp": time.time()
        }
        print(f"✅ Session {session_id} completed!")
        return data

async def rate_limited_gathering():
    # 🚦 Limit to 3 concurrent operations
    semaphore = asyncio.Semaphore(3)
    print("🎯 Starting rate-limited operations (max 3 concurrent)\n")
    # Create 10 coroutines that share the same semaphore
    tasks = [
        fetch_api_data(i, semaphore)
        for i in range(10)
    ]
    # Gather all results
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    print(f"\n⏱️ All operations completed in {time.time() - start_time:.2f} seconds")
    print(f"📊 Processed {len(results)} items with max 3 concurrent operations")

# asyncio.run(rate_limited_gathering())
```
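To confirm that the semaphore really caps concurrency, count how many workers are inside the `async with` block at once. This sketch uses hypothetical `worker()` coroutines, with a tiny sleep in place of an API call. Note that a semaphore limits how many calls are in flight, not how many start per second. A true requests-per-second limit would need extra timing logic.

```python
import asyncio

async def main(limit: int = 3, jobs: int = 10) -> int:
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker(i: int) -> int:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)  # highest number of workers inside at once
            await asyncio.sleep(0.01)  # stand-in for the real API call
            active -= 1
        return i

    await asyncio.gather(*(worker(i) for i in range(jobs)))
    return peak

print("peak concurrency:", asyncio.run(main()))  # peak concurrency: 3
```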
⚠️ Common Pitfalls and Solutions
😱 Pitfall 1: Not Handling Exceptions in gather()
```python
import asyncio

# ❌ Wrong way - one failure crashes everything!
async def fragile_gather():
    async def good_task():
        await asyncio.sleep(1)
        return "Success! 🎉"

    async def bad_task():
        await asyncio.sleep(0.5)
        raise ValueError("I'm a troublemaker! 😈")

    try:
        # Raises ValueError as soon as bad_task fails; good_task keeps
        # running in the background, but its result is never collected
        results = await asyncio.gather(good_task(), bad_task())
    except ValueError:
        print("Lost all results! 😭")

# ✅ Correct way - handle exceptions gracefully!
async def resilient_gather():
    async def good_task():
        await asyncio.sleep(1)
        return "Success! 🎉"

    async def bad_task():
        await asyncio.sleep(0.5)
        raise ValueError("I'm a troublemaker! 😈")

    # Use return_exceptions=True to get all results
    results = await asyncio.gather(
        good_task(),
        bad_task(),
        return_exceptions=True  # 🛡️ This is the key!
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result} ❌")
        else:
            print(f"Task {i} succeeded: {result} ✅")
```
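One more detail: when gather() raises without `return_exceptions=True`, it does not cancel the other awaitables. This sketch uses hypothetical `slow_ok()` and `fast_fail()` coroutines. It shows the surviving task still finishing, with its result still available as long as you kept a reference to it.

```python
import asyncio

async def main() -> tuple[str, bool, str]:
    async def slow_ok() -> str:
        await asyncio.sleep(0.05)
        return "ok"

    async def fast_fail() -> str:
        await asyncio.sleep(0.01)
        raise ValueError("boom")

    slow_task = asyncio.create_task(slow_ok())
    error = ""
    try:
        await asyncio.gather(slow_task, fast_fail())
    except ValueError as e:
        error = str(e)
    # gather() propagated the first error but did NOT cancel slow_task
    still_running = not slow_task.done()
    late_result = await slow_task  # the result is still there via our reference
    return error, still_running, late_result

print(asyncio.run(main()))  # ('boom', True, 'ok')
```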
🤯 Pitfall 2: Creating Tasks Wrong
```python
# Reuses fetch_user() from the gather example above

# ❌ Misleading - calling a coroutine function does not start it!
async def wrong_task_creation():
    # These are only coroutine objects; nothing is running yet
    coro1 = fetch_user(1)
    coro2 = fetch_user(2)
    # Some other code...
    await asyncio.sleep(5)  # fetch_user() has NOT started during these 5 seconds
    # The work only begins here, so the overlap we hoped for never happened
    results = await asyncio.gather(coro1, coro2)

# ✅ Better - create_task() schedules the coroutines right away
async def correct_task_creation():
    # Both fetches start as soon as the event loop gets control
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    # Keep these references: the event loop holds only weak references to tasks
    await asyncio.sleep(5)  # the fetches run during this sleep
    # Safe to gather; both tasks are already finished, so this returns immediately
    results = await asyncio.gather(task1, task2)
```
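You can check the difference between a bare coroutine and a task directly by logging when each one starts. In this sketch, the `job()` coroutine is hypothetical. `asyncio.sleep(0)` yields control once. That is enough for the scheduled task to run, but not the bare coroutine.

```python
import asyncio

async def main() -> list[str]:
    log: list[str] = []

    async def job(name: str) -> None:
        log.append(f"{name} started")

    coro = job("bare coroutine")             # creates a coroutine object; nothing runs yet
    task = asyncio.create_task(job("task"))  # schedules the job on the event loop
    await asyncio.sleep(0)                   # yield once so scheduled tasks can run
    log.append("after sleep")
    await coro                               # the bare coroutine only starts here
    await task
    return log

print(asyncio.run(main()))
# ['task started', 'after sleep', 'bare coroutine started']
```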
🛠️ Best Practices
- 🎯 Choose the Right Tool: Use gather() when you need results in order, wait() when you need flexibility
- 🛡️ Always Handle Exceptions: Use return_exceptions=True or wrap in try/except
- 🔒 Limit Concurrency: Use semaphores to prevent overwhelming resources
- 🧹 Clean Up Tasks: Cancel pending tasks when done
- ⏱️ Set Timeouts: Protect against hanging operations
- 📝 Log Progress: Help debugging with clear status messages
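For the "Set Timeouts" practice, asyncio.wait_for() wraps a single awaitable with a deadline. Here is a minimal sketch, assuming a hypothetical `hangs()` coroutine that never finishes on its own. On timeout, wait_for() cancels the inner work and raises asyncio.TimeoutError, which has been an alias of the built-in TimeoutError since Python 3.11.

```python
import asyncio

async def main() -> tuple[str, bool]:
    cancelled = False

    async def hangs() -> str:
        # Hypothetical operation that would take far too long
        nonlocal cancelled
        try:
            await asyncio.sleep(10)
            return "finished"
        except asyncio.CancelledError:
            cancelled = True  # wait_for() cancelled us at the deadline
            raise

    try:
        result = await asyncio.wait_for(hangs(), timeout=0.05)
    except asyncio.TimeoutError:
        result = "timed out"
    return result, cancelled

print(asyncio.run(main()))  # ('timed out', True)
```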
🧪 Hands-On Exercise
🎯 Challenge: Build a Weather Dashboard
Create a concurrent weather fetching system:
📋 Requirements:
- ✅ Fetch weather for multiple cities concurrently
- 🏷️ Handle API failures gracefully
- ⏱️ Implement timeout for slow responses
- 📊 Calculate average temperature across all cities
- 🎨 Display results with weather emojis!
🚀 Bonus Points:
- Rate limit to 3 concurrent API calls
- Retry failed requests once
- Cache results for 5 minutes
💡 Solution
```python
# 🌤️ Weather Dashboard Solution
import asyncio
import random
import time
from datetime import datetime
from typing import Optional


class WeatherAPI:
    """🌍 Simulated weather API with a simple time-based cache"""

    def __init__(self):
        self.cache = {}
        self.cache_duration = 300  # 5 minutes

    def get_cached(self, city: str) -> Optional[dict]:
        """Return cached data if it is still fresh, otherwise None"""
        if city in self.cache:
            cached_time, cached_data = self.cache[city]
            if time.time() - cached_time < self.cache_duration:
                return cached_data
        return None

    async def fetch_weather(self, city: str,
                            semaphore: Optional[asyncio.Semaphore] = None,
                            timeout: float = 3.0) -> dict:
        # Check cache first (cached answers don't need an API slot)
        cached = self.get_cached(city)
        if cached is not None:
            print(f"📦 Using cached data for {city}")
            return cached
        if semaphore:
            async with semaphore:
                # Time out only the request itself, not the wait for a free slot
                return await asyncio.wait_for(self._actual_fetch(city), timeout)
        return await asyncio.wait_for(self._actual_fetch(city), timeout)

    async def _actual_fetch(self, city: str) -> dict:
        print(f"🌐 Fetching weather for {city}...")
        # Simulate API delay
        delay = random.uniform(0.5, 2.0)
        await asyncio.sleep(delay)
        # Simulate occasional failures
        if random.random() < 0.2:  # 20% failure rate
            raise ConnectionError(f"Failed to fetch weather for {city} 😢")
        # Generate weather data
        temp = random.uniform(-10, 35)
        conditions = [
            ("Sunny", "☀️", (25, 35)),
            ("Cloudy", "☁️", (15, 25)),
            ("Rainy", "🌧️", (10, 20)),
            ("Snowy", "❄️", (-10, 5)),
            ("Stormy", "⛈️", (5, 15))
        ]
        # Pick the first condition whose range contains the temperature
        for condition, emoji, (min_temp, max_temp) in conditions:
            if min_temp <= temp <= max_temp:
                weather_condition = condition
                weather_emoji = emoji
                break
        else:
            weather_condition = "Cloudy"
            weather_emoji = "☁️"
        data = {
            "city": city,
            "temperature": round(temp, 1),
            "condition": weather_condition,
            "emoji": weather_emoji,
            "humidity": random.randint(30, 90),
            "wind_speed": random.randint(5, 30),
            "timestamp": datetime.now().strftime("%H:%M:%S")
        }
        # Cache the result
        self.cache[city] = (time.time(), data)
        return data


async def fetch_with_retry(api: WeatherAPI, city: str, semaphore: asyncio.Semaphore) -> dict:
    """🔄 Fetch with one retry on failure or timeout"""
    for attempt in range(2):
        try:
            return await api.fetch_weather(city, semaphore)
        except (ConnectionError, asyncio.TimeoutError) as e:
            if attempt == 0:
                print(f"⚠️ Retrying {city}...")
                await asyncio.sleep(0.5)
            else:
                # A TimeoutError has an empty message, so supply one
                return {"city": city, "error": str(e) or "Request timed out", "emoji": "❌"}


async def weather_dashboard(api: WeatherAPI, cities: list):
    """📊 Main weather dashboard"""
    semaphore = asyncio.Semaphore(3)  # At most 3 concurrent requests
    print("🌤️ Weather Dashboard")
    print("=" * 50)
    print(f"📍 Checking weather for {len(cities)} cities...\n")
    start_time = time.time()
    # Fetch every city concurrently; the semaphore keeps at most 3 requests in flight
    results = await asyncio.gather(
        *(fetch_with_retry(api, city, semaphore) for city in cities),
        return_exceptions=True
    )
    # Process results (gather keeps them in the same order as cities)
    successful_results = []
    failed_cities = []
    print("\n📋 Weather Report:")
    print("-" * 50)
    for city, result in zip(cities, results):
        if isinstance(result, Exception):
            failed_cities.append(city)
            print(f"❌ {city}: unexpected error: {result}")
        elif "error" in result:
            failed_cities.append(city)
            print(f"{result['emoji']} {city}: {result['error']}")
        else:
            successful_results.append(result)
            print(f"{result['emoji']} {city}: "
                  f"{result['temperature']}°C, {result['condition']}, "
                  f"💨 {result['wind_speed']}km/h")
    # Calculate statistics
    if successful_results:
        avg_temp = sum(r['temperature'] for r in successful_results) / len(successful_results)
        print(f"\n📈 Average temperature: {avg_temp:.1f}°C")
        print(f"✅ Successfully fetched: {len(successful_results)} cities")
    if failed_cities:
        print(f"❌ Failed to fetch: {len(failed_cities)} cities")
    elapsed = time.time() - start_time
    print(f"\n⏱️ Dashboard updated in {elapsed:.2f} seconds")


# Test the dashboard
async def main():
    api = WeatherAPI()  # One shared instance, so the cache survives between runs
    cities = [
        "London", "Paris", "Tokyo", "New York", "Sydney",
        "Moscow", "Beijing", "Mumbai", "Cairo", "Rio"
    ]
    await weather_dashboard(api, cities)
    # Test cache by re-fetching (only successful fetches are cached)
    print("\n🔄 Testing cache (re-fetching London)...")
    if api.get_cached("London") is not None:
        await api.fetch_weather("London")
        print("✅ Cache works! Got instant result for London")
    else:
        print("⚠️ London failed earlier, so there is nothing cached for it")
    # Run again to see caching in action
    print("\n" + "=" * 50)
    print("🔄 Running again to demonstrate caching...\n")
    await weather_dashboard(api, cities[:5])  # Just first 5 cities

# asyncio.run(main())
```
🎓 Key Takeaways
You've mastered concurrent programming patterns! Here's what you can now do:
- ✅ Use asyncio.gather() to run multiple tasks and collect ordered results 💪
- ✅ Apply asyncio.wait() for flexible task completion handling 🛡️
- ✅ Handle exceptions gracefully in concurrent operations 🎯
- ✅ Limit concurrency with semaphores 🚦
- ✅ Build robust async applications with proper error handling! 🚀
Remember: Concurrency is about overlapping many waiting operations, not about making any single operation faster. It's a great fit for I/O-bound work like network requests and database queries. Ordinary file reads still block the event loop unless you offload them to a thread! 🤖
🤝 Next Steps
Congratulations! 🎉 You've unlocked the power of concurrent Python programming!
Here's what to do next:
- 💻 Practice with the weather dashboard exercise
- 🏗️ Apply these patterns to your own projects (API aggregators, web scrapers, etc.)
- 📚 Explore more asyncio features like streams and queues
- 🌟 Share your concurrent creations with the Python community!
Your next adventure awaits with more advanced asyncio patterns. Keep coding, keep learning, and remember: with great concurrency comes great performance! 🚀
Happy async coding! 🎉🐍✨