Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or your preferred IDE
What you'll learn
- Understand thread pool fundamentals
- Apply thread pools in real projects
- Debug common concurrency issues
- Write clean, Pythonic code
Thread Pools: concurrent.futures
Introduction
Ever tried to download 100 images one by one? It's like waiting in line at the DMV - painfully slow! What if you could have multiple workers handling tasks simultaneously? That's exactly what thread pools do!
In this tutorial, we'll explore Python's concurrent.futures module - your Swiss Army knife for parallel execution. Get ready to turbocharge your programs!
Understanding Thread Pools
What's a Thread Pool?
Think of a thread pool like a team of workers at a restaurant:
- Without thread pool: one waiter serves all tables (slow service!)
- With thread pool: multiple waiters serve different tables (happy customers!)
# Restaurant analogy (serve_table stands in for any slow task)
from concurrent.futures import ThreadPoolExecutor

# Without thread pool - one waiter
def serve_all_tables():
    for table in range(10):
        serve_table(table)  # Takes forever!

# With thread pool - team of waiters
def serve_with_team():
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(serve_table, range(10))  # Much faster!
Why Use concurrent.futures?
- Simple API: easier than manual threading
- Resource management: automatic cleanup
- Future objects: track task completion (see the quick example below)
- Exception handling: graceful error management
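As a quick taste of that Future object - a minimal sketch with an illustrative greet() function, not code from a specific project - submitting work returns a Future you can poll, block on, or inspect for exceptions:

from concurrent.futures import ThreadPoolExecutor

def greet(name):
    """Illustrative task - just builds a string"""
    return f"Hello, {name}!"

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(greet, "world")  # returns a Future right away
    print(future.done())       # may still be False while the task runs
    print(future.result())     # blocks until done, then returns "Hello, world!"
    print(future.exception())  # None here, since greet() raised nothing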
Basic Syntax and Usage
Let's start with the basics of concurrent.futures:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Basic thread pool usage
def process_item(item):
    """Simulate some work"""
    time.sleep(1)  # Pretend we're doing something
    return f"Processed {item}!"

# Create and use a thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    items = ['apple', 'banana', 'cherry']
    futures = [executor.submit(process_item, item) for item in items]

    # Get results as they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)
The Two Main Executors
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ThreadPoolExecutor - for I/O-bound tasks
with ThreadPoolExecutor(max_workers=5) as executor:
    # Great for: file I/O, network requests, database queries
    results = executor.map(download_file, urls)

# ProcessPoolExecutor - for CPU-bound tasks
with ProcessPoolExecutor(max_workers=4) as executor:
    # Great for: calculations, data processing, image manipulation
    results = executor.map(process_data, datasets)
Practical Examples
Example 1: Web Scraper
Let's build a concurrent web scraper:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    """Fetch a URL and return its status"""
    try:
        response = requests.get(url, timeout=5)
        return f"OK {url}: {response.status_code}"
    except Exception as e:
        return f"FAILED {url}: {str(e)}"

# URLs to check
urls = [
    'https://python.org',
    'https://github.com',
    'https://stackoverflow.com',
    'https://reddit.com',
    'https://google.com'
]

# Wrong way - sequential (slow!)
def check_urls_sequential():
    start = time.time()
    for url in urls:
        print(fetch_url(url))
    print(f"Sequential time: {time.time() - start:.2f}s")

# Right way - concurrent (fast!)
def check_urls_concurrent():
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}

        # Process results as they complete
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                print(result)
            except Exception as e:
                print(f"FAILED {url} generated an exception: {e}")
    print(f"Concurrent time: {time.time() - start:.2f}s")

# Try both approaches!
check_urls_sequential()
print("\n" + "=" * 50 + "\n")
check_urls_concurrent()
Example 2: Image Processor
Process multiple images concurrently:
from PIL import Image
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

def resize_image(image_path):
    """Resize an image to thumbnail size"""
    try:
        # Open and resize image
        with Image.open(image_path) as img:
            thumbnail = img.resize((150, 150), Image.Resampling.LANCZOS)

            # Save thumbnail next to the original
            output_path = Path(image_path).parent / f"thumb_{Path(image_path).name}"
            thumbnail.save(output_path)

        return f"Processed: {Path(image_path).name}"
    except Exception as e:
        return f"Failed {image_path}: {str(e)}"

def process_images_batch(image_folder):
    """Process all images in a folder"""
    # Find all images
    image_files = list(Path(image_folder).glob("*.jpg"))

    # Process with ProcessPoolExecutor (CPU-intensive task)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(resize_image, image_files))

    # Report results
    for result in results:
        print(result)

    success_count = sum(1 for r in results if r.startswith("Processed"))
    print(f"\nProcessed {success_count}/{len(image_files)} images successfully!")
Example 3: API Data Aggregator
Fetch data from multiple APIs concurrently:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

class DataAggregator:
    """Aggregate data from multiple APIs"""

    def __init__(self, max_workers=5):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.results = {}

    def fetch_api_data(self, api_name, url, headers=None):
        """Fetch data from an API"""
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return {
                'api': api_name,
                'status': 'success',
                'data': response.json()
            }
        except Exception as e:
            return {
                'api': api_name,
                'status': 'error',
                'error': str(e)
            }

    def aggregate_data(self, api_configs):
        """Fetch data from multiple APIs concurrently"""
        # Submit all API requests
        futures = {}
        for config in api_configs:
            future = self.executor.submit(
                self.fetch_api_data,
                config['name'],
                config['url'],
                config.get('headers')
            )
            futures[future] = config['name']

        # Collect results as they complete
        for future in as_completed(futures):
            api_name = futures[future]
            try:
                result = future.result()
                self.results[api_name] = result
                if result['status'] == 'success':
                    print(f"{api_name}: Data received!")
                else:
                    print(f"{api_name}: {result['error']}")
            except Exception as e:
                print(f"{api_name}: Unexpected error - {e}")

        return self.results

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.executor.shutdown(wait=True)

# Usage example (placeholder URLs and keys)
api_configs = [
    {
        'name': 'Weather API',
        'url': 'https://api.weather.example.com/current',
        'headers': {'API-Key': 'your-key'}
    },
    {
        'name': 'News API',
        'url': 'https://api.news.example.com/latest'
    },
    {
        'name': 'Stock API',
        'url': 'https://api.stocks.example.com/quotes'
    }
]

with DataAggregator(max_workers=3) as aggregator:
    results = aggregator.aggregate_data(api_configs)
    print(f"\nAggregated data from {len(results)} APIs!")
Advanced Concepts
Future Objects and Callbacks
from concurrent.futures import ThreadPoolExecutor
import time

def long_task(task_id):
    """Simulate a long-running task"""
    time.sleep(2)
    return f"Task {task_id} complete!"

def task_done_callback(future):
    """Called when a task completes"""
    result = future.result()
    print(f"Callback: {result}")

# Using callbacks
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = []
    for i in range(5):
        future = executor.submit(long_task, i)
        # Add callback to be notified when done
        future.add_done_callback(task_done_callback)
        futures.append(future)

    # Do other work while tasks run
    print("Doing other work while tasks run...")
    time.sleep(1)
    print("Still working...")

    # Wait for all to complete
    for future in futures:
        future.result()  # Block until complete
Handling Timeouts and Cancellation
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task(duration):
    """Task that takes variable time"""
    time.sleep(duration)
    return f"Slept for {duration}s"

with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit tasks with different durations
    futures = [
        executor.submit(slow_task, 1),
        executor.submit(slow_task, 3),
        executor.submit(slow_task, 5)
    ]

    # Try to get results with a timeout
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=2.5)
            print(f"Task {i}: {result}")
        except TimeoutError:
            print(f"Task {i}: Timeout! Cancelling...")
            future.cancel()  # Only succeeds if the task hasn't started yet
Custom Thread Pool Configuration
from concurrent.futures import ThreadPoolExecutor
import threading
import time

class CustomThreadPoolExecutor(ThreadPoolExecutor):
    """Enhanced thread pool with monitoring"""

    def __init__(self, max_workers=None, thread_name_prefix=''):
        super().__init__(max_workers, thread_name_prefix)
        self.task_count = 0
        self.completed_count = 0
        self._lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        """Track submitted tasks"""
        with self._lock:
            self.task_count += 1

        # Wrap the function to track completion
        def wrapped_fn(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                with self._lock:
                    self.completed_count += 1

        return super().submit(wrapped_fn, *args, **kwargs)

    def get_stats(self):
        """Get pool statistics"""
        with self._lock:
            return {
                'submitted': self.task_count,
                'completed': self.completed_count,
                'pending': self.task_count - self.completed_count
            }

# Usage
with CustomThreadPoolExecutor(max_workers=3, thread_name_prefix='Worker') as executor:
    # Submit tasks
    futures = [executor.submit(time.sleep, 1) for _ in range(10)]

    # Monitor progress
    while any(not f.done() for f in futures):
        stats = executor.get_stats()
        print(f"Progress: {stats['completed']}/{stats['submitted']} tasks")
        time.sleep(0.5)

print("All tasks complete!")
Common Pitfalls and Solutions
Pitfall 1: Using a Thread Pool for CPU-bound Tasks
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Wrong - using threads for CPU-intensive work
def cpu_intensive_task(n):
    """Count prime numbers below n"""
    return sum(1 for i in range(2, n) if all(i % j != 0 for j in range(2, int(i**0.5) + 1)))

# This won't speed up due to the GIL!
with ThreadPoolExecutor() as executor:
    results = list(executor.map(cpu_intensive_task, [100000] * 4))

# Right - use ProcessPoolExecutor for CPU-bound tasks
with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_intensive_task, [100000] * 4))
Pitfall 2: Not Handling Exceptions
from concurrent.futures import ThreadPoolExecutor

# Wrong - ignoring exceptions
def risky_task(value):
    if value < 0:
        raise ValueError("Negative value!")
    return value * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in [-1, 2, 3]]
    # This will crash when accessing results!
    results = [f.result() for f in futures]

# Right - handle exceptions properly
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in [-1, 2, 3]]
    results = []
    for future in futures:
        try:
            result = future.result()
            results.append(result)
        except Exception as e:
            print(f"Task failed: {e}")
            results.append(None)
Pitfall 3: Resource Exhaustion
import os
from concurrent.futures import ThreadPoolExecutor

# Wrong - too many workers (some_task / io_task stand in for your own functions)
with ThreadPoolExecutor(max_workers=1000) as executor:
    # This can exhaust system resources!
    futures = [executor.submit(some_task, i) for i in range(10000)]

# Right - reasonable worker count
# Use CPU count for CPU-bound tasks
cpu_workers = os.cpu_count() or 1

# Use more workers for I/O-bound tasks (but be reasonable)
io_workers = min(32, (os.cpu_count() or 1) * 5)

with ThreadPoolExecutor(max_workers=io_workers) as executor:
    futures = [executor.submit(io_task, i) for i in range(10000)]
Best Practices
1. Choose the Right Executor
# ThreadPoolExecutor for I/O-bound tasks
def download_files(urls):
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(download_file, urls))

# ProcessPoolExecutor for CPU-bound tasks
def process_data_batch(data_chunks):
    with ProcessPoolExecutor() as executor:
        return list(executor.map(heavy_computation, data_chunks))
2. Use Context Managers
# Always use the with statement
with ThreadPoolExecutor() as executor:
    results = executor.map(task, items)
    # Automatic cleanup when done!

# Avoid manual management
executor = ThreadPoolExecutor()
results = executor.map(task, items)
executor.shutdown()  # Easy to forget!
3. Batch Operations Wisely
def process_large_dataset(items, batch_size=100):
    """Process items in batches"""
    results = []

    with ThreadPoolExecutor(max_workers=5) as executor:
        # Process in batches to avoid memory issues
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            batch_futures = [executor.submit(process_item, item) for item in batch]

            # Collect batch results
            for future in as_completed(batch_futures):
                results.append(future.result())

            print(f"Processed batch {i // batch_size + 1}")

    return results
4. Monitor and Log Progress
import logging
from tqdm import tqdm

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_with_progress(items):
    """Process items with a progress bar"""
    results = []

    with ThreadPoolExecutor(max_workers=5) as executor:
        # Submit all tasks
        futures = {executor.submit(process_item, item): item for item in items}

        # Track progress with tqdm
        with tqdm(total=len(futures)) as pbar:
            for future in as_completed(futures):
                item = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    logger.info(f"Processed: {item}")
                except Exception as e:
                    logger.error(f"Failed {item}: {e}")
                finally:
                    pbar.update(1)

    return results
Hands-On Exercise
Ready to put your skills to the test? Let's build a concurrent file processor!
Challenge: Create a program that:
- Scans a directory for text files
- Counts words in each file concurrently
- Finds the top 10 most common words across all files
- Handles errors gracefully
Hints
- Use ThreadPoolExecutor for file I/O
- Consider using collections.Counter for word counting
- Remember to handle encoding errors
- Use as_completed() to process results as they finish
Solution
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import Counter
from pathlib import Path
import re

class ConcurrentWordCounter:
    """Count words across multiple files concurrently"""

    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.word_counts = Counter()

    def count_words_in_file(self, file_path):
        """Count words in a single file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read().lower()

            # Extract words (alphanumeric only)
            words = re.findall(r'\b\w+\b', text)

            return {
                'file': file_path.name,
                'word_count': len(words),
                'words': Counter(words),
                'status': 'success'
            }
        except Exception as e:
            return {
                'file': file_path.name,
                'error': str(e),
                'status': 'error'
            }

    def process_directory(self, directory_path):
        """Process all text files in a directory"""
        directory = Path(directory_path)
        text_files = list(directory.glob('*.txt'))

        if not text_files:
            print("No text files found!")
            return

        print(f"Found {len(text_files)} text files to process")

        results = []
        total_words = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all file processing tasks
            future_to_file = {
                executor.submit(self.count_words_in_file, file): file
                for file in text_files
            }

            # Process results as they complete
            for future in as_completed(future_to_file):
                result = future.result()

                if result['status'] == 'success':
                    print(f"{result['file']}: {result['word_count']} words")
                    self.word_counts.update(result['words'])
                    total_words += result['word_count']
                else:
                    print(f"{result['file']}: {result['error']}")

                results.append(result)

        # Display summary
        successful = sum(1 for r in results if r['status'] == 'success')
        print(f"\nProcessed {successful}/{len(text_files)} files successfully!")
        print(f"Total words counted: {total_words:,}")

        # Show top 10 words
        print("\nTop 10 Most Common Words:")
        for word, count in self.word_counts.most_common(10):
            print(f"  {word}: {count:,}")

        return results

# Test the word counter
if __name__ == "__main__":
    # Create test files
    test_dir = Path("test_files")
    test_dir.mkdir(exist_ok=True)

    # Generate sample files
    sample_texts = [
        "Python is awesome. Python makes concurrent programming easy.",
        "Thread pools are great for I/O bound tasks. Threads rock!",
        "Concurrent futures module is powerful and simple to use."
    ]
    for i, text in enumerate(sample_texts):
        (test_dir / f"file_{i}.txt").write_text(text)

    # Run the concurrent word counter
    counter = ConcurrentWordCounter(max_workers=3)
    counter.process_directory(test_dir)
Key Takeaways
You've mastered thread pools with concurrent.futures! Here's what you learned:
- Thread pools manage worker threads efficiently
- ThreadPoolExecutor is perfect for I/O-bound tasks
- ProcessPoolExecutor handles CPU-bound work
- Future objects let you track and manage async tasks
- Context managers ensure proper cleanup
- as_completed() processes results as they finish (see the short comparison below)
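One nuance behind that last point - here is a minimal illustrative sketch (work() is just a stand-in that sleeps): executor.map() returns results in submission order, while as_completed() yields futures in the order they actually finish.

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(delay):
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() preserves input order: prints [3, 1, 2]
    print(list(executor.map(work, [3, 1, 2])))

    # as_completed() yields in finish order: prints [1, 2, 3]
    futures = [executor.submit(work, d) for d in [3, 1, 2]]
    print([f.result() for f in as_completed(futures)])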
Remember:
- Choose the right executor for your task type
- Handle exceptions gracefully
- Use reasonable worker counts
- Monitor progress for long-running operations
Next Steps
Congratulations on completing this tutorial! You're now ready to:
- Practice: Build a concurrent web crawler
- Explore: Try asyncio for async/await patterns
- Advance: Learn about multiprocessing queues
- Optimize: Profile your concurrent code
Next up: Tutorial #316 - Process Pools: Multiprocessing
Keep coding, keep learning, and remember - with great concurrency comes great performance!