Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of concurrent.futures
- Apply concurrent execution in real projects
- Debug common concurrency issues
- Write clean, Pythonic code
Introduction
Welcome to this exciting tutorial on concurrent.futures! Have you ever waited for your Python program to download multiple files, one after another, feeling like time is crawling? What if I told you there's a simple way to do many things at once?
You'll discover how concurrent.futures provides a simple, unified interface for running tasks concurrently. Whether you're processing images, scraping websites, or crunching numbers, understanding concurrent.futures is essential for writing fast, efficient Python programs.
By the end of this tutorial, you'll feel confident using concurrent execution in your own projects! Let's dive in!
Understanding concurrent.futures
What is concurrent.futures?
concurrent.futures is like having a team of helpers instead of doing everything yourself. Think of it as a restaurant kitchen where multiple chefs can prepare different dishes simultaneously, rather than one chef cooking everything sequentially.
In Python terms, concurrent.futures provides high-level interfaces for asynchronously executing functions using threads or processes. This means you can:
- Run multiple tasks simultaneously
- Speed up I/O-bound operations with threads
- Leverage multiple CPU cores with processes
- Use the same interface for both approaches
Why Use concurrent.futures?
Here's why developers love concurrent.futures:
- Unified Interface: Same API for threads and processes
- Simple to Use: Easier than manual thread/process management
- Future Objects: Track and manage async results elegantly
- Built-in Features: Timeouts, callbacks, and exception handling
Real-world example: Imagine downloading 100 images. With concurrent.futures, you can download them all at once instead of waiting for each one to finish!
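To see the "unified interface" point concretely, here's a minimal sketch: the same map() call works whether you pick threads or processes. The download_image function and URL list are hypothetical placeholders, not a finished downloader.
# The same Executor interface for threads and processes (download_image is hypothetical)
import concurrent.futures

def download_image(url):
    ...  # fetch the image and save it to disk

urls = [f"https://example.com/img_{i}.jpg" for i in range(100)]

# Swap ThreadPoolExecutor for ProcessPoolExecutor and the rest of the code stays the same
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(download_image, urls))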
Basic Syntax and Usage
Simple Example with ThreadPoolExecutor
Let's start with a friendly example:
# Hello, concurrent.futures!
import concurrent.futures
import time

# A simple task that takes time
def greet_slowly(name):
    time.sleep(1)  # Simulate a slow operation
    return f"Hello, {name}!"

# Using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit a single task
    future = executor.submit(greet_slowly, "Python")

    # Get the result
    result = future.result()
    print(result)  # Hello, Python!
Explanation: Notice how we use a context manager (the with statement) to handle the executor lifecycle automatically! The submit() method returns a Future object immediately.
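Because submit() hands back the Future right away, you can inspect it or set a timeout before blocking on the result. A small sketch of the most useful Future methods (slow_square is just an illustrative helper):
# A few Future methods worth knowing (slow_square is an illustrative helper)
import concurrent.futures
import time

def slow_square(n):
    time.sleep(0.5)
    return n * n

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(slow_square, 7)
    print(future.done())             # likely False - the task is still running
    print(future.result(timeout=2))  # 49 - blocks; raises TimeoutError if it takes too long
    print(future.done())             # True - the result is now available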
Common Patterns
Here are patterns you'll use daily:
# Pattern 1: Multiple tasks with map
import concurrent.futures

def process_item(item):
    # Do something with the item
    return f"Processed: {item}"

items = ["apple", "banana", "cherry"]

# Process all items concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_item, items))

print(results)

# Pattern 2: Submit multiple tasks
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks and collect futures
    futures = [executor.submit(process_item, item) for item in items]

    # Get results as they complete
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

# Pattern 3: ProcessPoolExecutor for CPU-bound tasks
def cpu_intensive_task(n):
    # Simulate CPU-intensive work
    total = sum(i * i for i in range(n))
    return f"Sum of squares up to {n}: {total}"

with concurrent.futures.ProcessPoolExecutor() as executor:
    future = executor.submit(cpu_intensive_task, 1000000)
    print(future.result())
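One caveat with ProcessPoolExecutor: on platforms that start worker processes with the "spawn" method (Windows, and macOS since Python 3.8), a script must create the pool inside an if __name__ == "__main__": guard so the workers can safely re-import the module. A minimal sketch of that pattern:
# Guard the entry point so spawned worker processes can re-import this module
# without re-running the pool setup
import concurrent.futures

def cpu_intensive_task(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(executor.submit(cpu_intensive_task, 1_000_000).result())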
Practical Examples
Example 1: Web Scraper
Let's build something real - a concurrent web scraper:
# Concurrent web scraper
import concurrent.futures
import requests
import time

def fetch_url(url):
    """Fetch content from a URL"""
    try:
        response = requests.get(url, timeout=5)
        return {
            "url": url,
            "status": response.status_code,
            "size": len(response.content),
            "emoji": "✅" if response.status_code == 200 else "❌"
        }
    except Exception as e:
        return {
            "url": url,
            "status": "error",
            "error": str(e),
            "emoji": "💥"
        }

# URLs to scrape
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500"
]

# Sequential approach (slow)
print("Sequential scraping...")
start_time = time.time()
sequential_results = [fetch_url(url) for url in urls]
sequential_time = time.time() - start_time

# Concurrent approach (fast!)
print("\nConcurrent scraping...")
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    concurrent_results = list(executor.map(fetch_url, urls))
concurrent_time = time.time() - start_time

# Compare results
print("\nResults:")
print(f"Sequential time: {sequential_time:.2f}s")
print(f"Concurrent time: {concurrent_time:.2f}s")
print(f"Speed up: {sequential_time/concurrent_time:.2f}x faster!")

# Show results
for result in concurrent_results:
    print(f"{result['emoji']} {result['url']} - Status: {result['status']}")
Try it yourself: Add retry logic for failed requests and progress tracking!
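As a starting point for the retry part of that challenge, here's one possible approach: a small wrapper that calls fetch_url again when a request fails. The attempt count and delay are arbitrary choices, not part of the original example.
# A rough retry wrapper around fetch_url (max_attempts and delay are arbitrary)
import time

def fetch_with_retry(url, max_attempts=3, delay=1.0):
    for attempt in range(1, max_attempts + 1):
        result = fetch_url(url)
        if result.get("status") == 200:
            return result
        if attempt < max_attempts:
            time.sleep(delay)  # back off briefly before retrying
    return result  # give up and return the last failed attempt

# Use it exactly like fetch_url:
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
#     results = list(executor.map(fetch_with_retry, urls))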
Example 2: Image Processor
Let's make a concurrent image processor:
# Concurrent image processor
import concurrent.futures
from PIL import Image
import os
import time

class ImageProcessor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.processed_count = 0

    def resize_image(self, image_path, size=(300, 300)):
        """Resize a single image"""
        try:
            # Open and resize
            with Image.open(image_path) as img:
                filename = os.path.basename(image_path)

                # Create thumbnail
                img.thumbnail(size)

                # Save with new name
                output_path = f"thumbnail_{filename}"
                img.save(output_path)

            # Note: with ProcessPoolExecutor this runs in a worker process,
            # so the increment is not visible on the parent's instance
            self.processed_count += 1

            return {
                "status": "success",
                "input": image_path,
                "output": output_path,
                "emoji": "✅"
            }
        except Exception as e:
            return {
                "status": "error",
                "input": image_path,
                "error": str(e),
                "emoji": "❌"
            }

    def process_batch(self, image_paths):
        """Process multiple images concurrently"""
        print(f"Processing {len(image_paths)} images...")

        results = []
        start_time = time.time()

        # Use ProcessPoolExecutor for CPU-intensive image processing
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(self.resize_image, path): path
                for path in image_paths
            }

            # Process results as they complete
            for future in concurrent.futures.as_completed(future_to_path):
                path = future_to_path[future]
                try:
                    result = future.result()
                    results.append(result)
                    print(f"{result['emoji']} Processed: {os.path.basename(path)}")
                except Exception as e:
                    print(f"❌ Failed: {path} - {e}")

        # Summary
        elapsed = time.time() - start_time
        success_count = sum(1 for r in results if r["status"] == "success")

        print("\nProcessing complete!")
        print(f"✅ Success: {success_count}/{len(image_paths)}")
        print(f"Time: {elapsed:.2f}s")
        print(f"Speed: {len(image_paths)/elapsed:.2f} images/second")

        return results

# Usage example
processor = ImageProcessor(max_workers=4)
# image_paths = ["photo1.jpg", "photo2.jpg", "photo3.jpg"]  # Your images
# results = processor.process_batch(image_paths)
Advanced Concepts
Advanced Topic 1: Future Callbacks and Chaining
When you're ready to level up, try this advanced pattern:
# Advanced future handling
import concurrent.futures
import time

def fetch_data(item_id):
    """Simulate fetching data"""
    time.sleep(1)
    return {"id": item_id, "data": f"Item {item_id} data"}

def process_data(data):
    """Process the fetched data"""
    return {"processed": data["data"].upper(), "emoji": "✨"}

def save_result(result):
    """Save the processed result"""
    print(f"Saving: {result['processed']}")
    return {"saved": True, "emoji": "✅"}

# Future callback chaining
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit the initial task
    future1 = executor.submit(fetch_data, 42)

    # Chain operations using callbacks
    def on_fetch_complete(future):
        try:
            data = future.result()
            print(f"Fetched: {data}")
            # Submit the next task
            future2 = executor.submit(process_data, data)
            future2.add_done_callback(on_process_complete)
        except Exception as e:
            print(f"Fetch failed: {e}")

    def on_process_complete(future):
        try:
            result = future.result()
            print(f"Processed: {result}")
            # Submit the final task
            future3 = executor.submit(save_result, result)
            future3.add_done_callback(lambda f: print("Pipeline complete!"))
        except Exception as e:
            print(f"Process failed: {e}")

    # Start the chain
    future1.add_done_callback(on_fetch_complete)

    # Crude wait so the chained tasks are submitted before the executor shuts down
    time.sleep(3)
Advanced Topic 2: Custom Executor with Progress Tracking
For the brave developers:
# Custom executor with progress tracking
import concurrent.futures
from typing import Callable, List, Any, Optional
import threading
import time

class ProgressExecutor:
    """Executor with built-in progress tracking"""

    def __init__(self, max_workers=4, executor_class=concurrent.futures.ThreadPoolExecutor):
        self.max_workers = max_workers
        self.executor_class = executor_class
        self.total_tasks = 0
        self.completed_tasks = 0
        self.lock = threading.Lock()

    def map_with_progress(self, func: Callable, items: List[Any], desc: str = "Processing"):
        """Map a function over items with a simple progress bar"""
        self.total_tasks = len(items)
        self.completed_tasks = 0
        results = []

        def wrapped_func(item):
            # Execute the function
            result = func(item)

            # Update progress (the shared counter and lock assume a thread pool;
            # with ProcessPoolExecutor the updates would happen in worker processes)
            with self.lock:
                self.completed_tasks += 1
                progress = self.completed_tasks / self.total_tasks * 100
                print(f"\r{desc}: {self.completed_tasks}/{self.total_tasks} "
                      f"[{'=' * int(progress/5):<20}] {progress:.1f}% ", end="")
            return result

        # Execute with progress tracking
        with self.executor_class(max_workers=self.max_workers) as executor:
            results = list(executor.map(wrapped_func, items))

        print(f"\n{desc} complete!")
        return results

    def submit_batch(self, tasks: List[tuple], timeout: Optional[float] = None):
        """Submit multiple (func, args) tasks with timeout support"""
        futures = []
        results = []

        with self.executor_class(max_workers=self.max_workers) as executor:
            # Submit all tasks
            for func, args in tasks:
                future = executor.submit(func, *args)
                futures.append(future)

            # Wait with timeout
            done, not_done = concurrent.futures.wait(
                futures,
                timeout=timeout,
                return_when=concurrent.futures.ALL_COMPLETED
            )

            # Collect results
            for future in done:
                try:
                    results.append(future.result())
                except Exception as e:
                    results.append({"error": str(e), "emoji": "❌"})

            # Handle timeouts
            for future in not_done:
                future.cancel()
                results.append({"error": "Timeout", "emoji": "⏱️"})

        return results

# Usage example
def slow_task(n):
    time.sleep(0.1)
    return n * n

# Create progress executor
progress_exec = ProgressExecutor(max_workers=10)

# Run with progress
numbers = list(range(50))
squares = progress_exec.map_with_progress(slow_task, numbers, "Computing squares")
print(f"Results: {squares[:5]}... (showing first 5)")
Common Pitfalls and Solutions
Pitfall 1: Not Handling Exceptions
# Wrong way - exceptions stay hidden until you call result()!
import concurrent.futures

def risky_operation(x):
    if x == 0:
        raise ValueError("Cannot process zero!")
    return 10 / x

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_operation, i) for i in range(-2, 3)]
    # This will crash when accessing results!
    # results = [f.result() for f in futures]

# Correct way - handle exceptions properly!
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_operation, i) for i in range(-2, 3)]

    results = []
    for future in futures:
        try:
            result = future.result()
            results.append({"value": result, "status": "✅"})
        except Exception as e:
            results.append({"error": str(e), "status": "❌"})

# Show results
for i, result in enumerate(results):
    print(f"Task {i}: {result}")
Pitfall 2: Wrong Executor Choice
# Dangerous - using threads for CPU-bound tasks!
import concurrent.futures
import time

def cpu_intensive(n):
    # CPU-bound operation
    total = 0
    for i in range(n):
        total += i * i
    return total

# Slow with threads (the GIL blocks true parallelism)
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(cpu_intensive, 10000000) for _ in range(4)]
    thread_results = [f.result() for f in futures]
thread_time = time.time() - start

# Fast with processes (true parallelism!)
# (wrap this in an `if __name__ == "__main__":` guard when running as a script)
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(cpu_intensive, 10000000) for _ in range(4)]
    process_results = [f.result() for f in futures]
process_time = time.time() - start

print(f"Thread time: {thread_time:.2f}s")
print(f"Process time: {process_time:.2f}s")
print(f"Speed improvement: {thread_time/process_time:.2f}x")
Best Practices
- Choose the Right Executor: ThreadPoolExecutor for I/O, ProcessPoolExecutor for CPU
- Use Context Managers: Always use with statements for automatic cleanup
- Handle Exceptions: Always wrap future.result() in try-except
- Set Max Workers: Don't create too many threads/processes
- Use as_completed(): Process results as they finish (see the sketch below)
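Here's a small sketch that puts several of these practices together: an explicit max_workers, a context manager, as_completed(), and per-future exception handling. The fetch_page helper and the URLs are placeholders, not part of the earlier examples.
# Putting the best practices together (fetch_page and the URLs are hypothetical)
import concurrent.futures
import urllib.request

def fetch_page(url):
    with urllib.request.urlopen(url, timeout=5) as resp:
        return len(resp.read())

urls = ["https://example.com", "https://example.org", "https://example.net"]

# Explicit worker count + context manager for automatic cleanup
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_url = {executor.submit(fetch_page, url): url for url in urls}

    # Process results as they finish, handling each failure individually
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            print(f"{url}: {future.result()} bytes")
        except Exception as e:
            print(f"{url}: failed ({e})")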
Hands-On Exercise
Challenge: Build a Concurrent File Processor
Create a concurrent file processing system:
Requirements:
- Process multiple text files concurrently
- Count words, lines, and characters in each file
- Support different processing modes (analyze, transform, validate)
- Add timeout support for long-running operations
- Include progress tracking and statistics
Bonus Points:
- Add file filtering by extension
- Implement retry logic for failed files
- Create a summary report with status indicators
Solution
# Concurrent file processor system
import concurrent.futures
import os
import time
from pathlib import Path
from typing import Dict, List, Optional
import threading

class FileProcessor:
    """Concurrent file processing system"""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self.processed_count = 0
        self.lock = threading.Lock()

    def analyze_file(self, file_path: str) -> Dict:
        """Analyze a single file"""
        try:
            path = Path(file_path)

            # Read file content
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Calculate statistics
            stats = {
                "file": path.name,
                "path": str(path),
                "size_bytes": path.stat().st_size,
                "lines": len(content.splitlines()),
                "words": len(content.split()),
                "characters": len(content)
            }

            # Update progress
            with self.lock:
                self.processed_count += 1

            return {"status": "success", "stats": stats, "emoji": "✅"}
        except Exception as e:
            return {
                "status": "error",
                "file": file_path,
                "error": str(e),
                "emoji": "❌"
            }

    def transform_file(self, file_path: str, operation: str = "uppercase") -> Dict:
        """Transform file content"""
        try:
            # Read content
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Apply transformation
            if operation == "uppercase":
                transformed = content.upper()
            elif operation == "lowercase":
                transformed = content.lower()
            elif operation == "reverse":
                transformed = content[::-1]
            else:
                transformed = content

            # Save transformed file
            output_path = f"transformed_{os.path.basename(file_path)}"
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(transformed)

            return {
                "status": "success",
                "input": file_path,
                "output": output_path,
                "operation": operation,
                "emoji": "✅"
            }
        except Exception as e:
            return {
                "status": "error",
                "file": file_path,
                "error": str(e),
                "emoji": "❌"
            }

    def process_files(self, file_paths: List[str], mode: str = "analyze",
                      timeout: Optional[float] = None) -> List[Dict]:
        """Process multiple files concurrently"""
        print(f"Processing {len(file_paths)} files in {mode} mode...")
        self.processed_count = 0

        # Choose processing function
        if mode == "analyze":
            process_func = self.analyze_file
        elif mode == "transform":
            process_func = self.transform_file
        else:
            raise ValueError(f"Unknown mode: {mode}")

        results = []
        start_time = time.time()

        # Use ThreadPoolExecutor for I/O-bound file operations
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(process_func, path): path
                for path in file_paths
            }

            # Process with timeout support
            done, not_done = concurrent.futures.wait(
                future_to_file.keys(),
                timeout=timeout,
                return_when=concurrent.futures.ALL_COMPLETED
            )

            # Collect completed results
            for future in done:
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)
                    print(f"{result['emoji']} Processed: {os.path.basename(file_path)}")
                except Exception as e:
                    results.append({
                        "status": "error",
                        "file": file_path,
                        "error": str(e),
                        "emoji": "💥"
                    })

            # Handle timeouts
            for future in not_done:
                file_path = future_to_file[future]
                future.cancel()
                results.append({
                    "status": "timeout",
                    "file": file_path,
                    "emoji": "⏱️"
                })

        # Generate summary
        elapsed = time.time() - start_time
        success_count = sum(1 for r in results if r["status"] == "success")
        error_count = sum(1 for r in results if r["status"] == "error")
        timeout_count = sum(1 for r in results if r["status"] == "timeout")

        print("\nProcessing Summary:")
        print(f"✅ Success: {success_count}")
        print(f"❌ Errors: {error_count}")
        print(f"⏱️ Timeouts: {timeout_count}")
        print(f"Time: {elapsed:.2f}s")
        print(f"Speed: {len(file_paths)/elapsed:.2f} files/second")

        # Show file statistics if analyzing
        if mode == "analyze" and success_count > 0:
            total_lines = sum(r["stats"]["lines"] for r in results if r["status"] == "success")
            total_words = sum(r["stats"]["words"] for r in results if r["status"] == "success")
            print("\nContent Statistics:")
            print(f"Total lines: {total_lines:,}")
            print(f"Total words: {total_words:,}")

        return results

    def process_directory(self, directory: str, extension: str = ".txt") -> List[Dict]:
        """Process all files in a directory"""
        # Find all matching files
        file_paths = [
            str(p) for p in Path(directory).rglob(f"*{extension}")
            if p.is_file()
        ]

        if not file_paths:
            print(f"No {extension} files found in {directory}")
            return []

        print(f"Found {len(file_paths)} {extension} files")
        return self.process_files(file_paths)

# Test it out!
processor = FileProcessor(max_workers=4)

# Create test files
test_files = []
for i in range(5):
    filename = f"test_file_{i}.txt"
    with open(filename, 'w') as f:
        f.write(f"This is test file {i}.\n" * (i + 1))
        f.write("It contains some sample text!\n")
    test_files.append(filename)

# Process files
results = processor.process_files(test_files, mode="analyze", timeout=10)

# Clean up test files
for file in test_files:
    os.remove(file)
Key Takeaways
You've learned so much! Here's what you can now do:
- Use concurrent.futures for parallel execution
- Choose between threads and processes wisely
- Handle futures and exceptions properly
- Track progress in concurrent operations
- Build fast, scalable Python applications
Remember: concurrent.futures makes concurrency accessible and manageable. Start simple, handle errors, and watch your programs fly!
Next Steps
Congratulations! You've mastered concurrent.futures!
Here's what to do next:
- Practice with the file processor exercise
- Add concurrency to an existing project
- Explore asyncio for coroutine-based concurrency
- Share your concurrent creations with others
Remember: Every parallel programming expert started with a single thread. Keep experimenting, keep learning, and most importantly, have fun making Python faster!
Happy concurrent coding!