Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand Celery's core concepts
- Apply Celery in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to the world of distributed computing with Celery! In this guide, we'll explore how to build scalable applications that can handle massive workloads by distributing tasks across multiple workers.
You'll discover how Celery can transform your Python applications from single-threaded bottlenecks into powerful distributed systems! Whether you're processing images, sending emails, or running complex calculations, understanding Celery is essential for building production-ready applications.
By the end of this tutorial, you'll be orchestrating distributed tasks like a maestro! Let's dive in!
Understanding Celery
What is Celery?
Celery is like having a team of workers in different offices, all working on tasks from a shared to-do list. Think of it as a post office where you drop off packages (tasks) that get delivered and processed by mail carriers (workers) working independently.
In Python terms, Celery is a distributed task queue that lets you run Python functions asynchronously across multiple processes or machines. This means you can:
- Process tasks in the background without blocking your main application (see the short sketch after this list)
- Scale horizontally by adding more workers
- Handle failures gracefully with automatic retries
- Monitor and track task execution in real-time
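To make "background and non-blocking" concrete, here is a minimal sketch. It assumes a task called add_numbers (defined later in this guide) and a worker already running: calling the function directly blocks your program, while .delay() hands the work to a worker and returns immediately.
# Calling the function directly does the work in your own process and blocks:
total = add_numbers(2, 3)                 # returns 5, but only after the work is done here

# Calling .delay() sends the task to the broker and returns immediately:
async_result = add_numbers.delay(2, 3)    # an AsyncResult handle, not the value
print(async_result.get(timeout=10))       # 5 - blocks only if and when you ask for the result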
Why Use Celery?
Here's why developers love Celery:
- Asynchronous Processing: Don't make users wait for slow operations
- Scalability: Add workers as your workload grows
- Reliability: Built-in retry mechanisms and error handling
- Flexibility: Support for multiple message brokers and result backends
Real-world example: Imagine an e-commerce site. When a customer places an order, you need to process payment, update inventory, send confirmation emails, and generate invoices. With Celery, each of these can be a separate task handled by different workers!
Basic Syntax and Usage
Installation and Setup
Let's start with a friendly example:
# First, install Celery and Redis!
# pip install celery redis

# Create a celery_app.py file
from celery import Celery

# Initialize Celery with Redis as the broker
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',   # Message broker
    backend='redis://localhost:6379/0'   # Result storage
)

# Configure Celery settings
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
Explanation: Redis acts as our message broker - think of it as the post office where tasks are dropped off and picked up by workers!
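With the app configured, at least one worker process has to be running to pick tasks up from the broker. Assuming the module above is saved as celery_app.py (and Redis is running locally), a worker is started from the project directory like this:
# Start Redis first (e.g. redis-server), then launch a worker:
# celery -A celery_app worker --loglevel=info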
Creating Your First Task
Here are the patterns you'll use daily:
# tasks.py - Define your tasks
from celery_app import app
import time

# Simple task decorator
@app.task
def add_numbers(x, y):
    """Add two numbers together."""
    return x + y

# Task with progress tracking
@app.task(bind=True)
def long_running_task(self, duration):
    """Simulate a long-running task."""
    for i in range(duration):
        # Report task progress to the result backend
        self.update_state(
            state='PROGRESS',
            meta={'current': i, 'total': duration}
        )
        time.sleep(1)
    return f"Task completed in {duration} seconds!"

# Task with error handling
@app.task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def send_email(recipient, subject, body):
    """Send an email with automatic retries."""
    # Simulate email sending
    print(f"Sending email to {recipient}...")
    # Could raise an exception; Celery will retry automatically
    return "Email sent successfully!"
Practical Examples
Example 1: E-commerce Order Processing
Let's build something real:
# E-commerce order processing system
from celery import group, chain, chord
from celery_app import app
import random
import time

# Process payment
@app.task
def process_payment(order_id, amount):
    """Process a customer payment."""
    print(f"Processing ${amount} for order {order_id}")
    # Simulate payment processing
    time.sleep(2)
    return {
        'order_id': order_id,
        'payment_id': f"PAY-{random.randint(1000, 9999)}",
        'status': 'completed'
    }

# Update inventory
@app.task
def update_inventory(payment_result, items):
    """Reduce inventory for purchased items."""
    order_id = payment_result['order_id']
    print(f"Updating inventory for order {order_id}")
    for item in items:
        print(f" - Reducing stock for {item['name']}")
    return {'order_id': order_id, 'inventory_updated': True}

# Send confirmation email
@app.task
def send_confirmation(inventory_result, customer_email):
    """Send the order confirmation."""
    order_id = inventory_result['order_id']
    print(f"Sending confirmation to {customer_email}")
    return {'order_id': order_id, 'email_sent': True}

# Process a complete order using a workflow
def process_order(order_id, amount, items, customer_email):
    """Complete order processing workflow."""
    # Chain tasks: payment -> inventory -> email
    workflow = chain(
        process_payment.s(order_id, amount),
        update_inventory.s(items),
        send_confirmation.s(customer_email)
    )
    # Execute the workflow asynchronously
    result = workflow.apply_async()
    return result

# Let's use it!
if __name__ == "__main__":
    # Sample order
    order = {
        'id': 'ORD-123',
        'amount': 99.99,
        'items': [
            {'name': 'Python Book', 'quantity': 1},
            {'name': 'Coffee Mug', 'quantity': 2}
        ],
        'email': '[email protected]'
    }
    result = process_order(
        order['id'],
        order['amount'],
        order['items'],
        order['email']
    )
    print(f"Order processing started! Task ID: {result.id}")
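Because a chain's AsyncResult resolves to the return value of its final task, the caller can also wait for the confirmation step if it needs the outcome; a small sketch:
# Later, in the process that started the workflow:
final = result.get(timeout=60)   # the chain's result is the last task's return value
print(final)                     # {'order_id': 'ORD-123', 'email_sent': True}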
Try it yourself: Add a task for generating PDF invoices and include it in the workflow!
Example 2: Image Processing Pipeline
Let's make it fun with image processing:
# Image processing pipeline
from celery import group
from celery_app import app
from PIL import Image   # would be used for real image manipulation
import io
import base64

# Download image from URL
@app.task
def download_image(image_url):
    """Download an image from a URL."""
    print(f"Downloading image from {image_url}")
    # Simulate the download
    return {
        'url': image_url,
        'data': 'base64_image_data_here',
        'size': (1920, 1080)
    }

# Generate thumbnail
@app.task
def create_thumbnail(image_data, size=(128, 128)):
    """Create a thumbnail from an image."""
    print(f"Creating {size[0]}x{size[1]} thumbnail")
    return {
        'original': image_data,
        'thumbnail': f'thumb_{size[0]}x{size[1]}.jpg',
        'size': size
    }

# Apply filters
@app.task
def apply_filters(thumbnail_data, filters=['blur', 'sharpen']):
    """Apply image filters."""
    print(f"Applying filters: {', '.join(filters)}")
    return {
        **thumbnail_data,
        'filters_applied': filters,
        'processed': True
    }

# Save to storage
@app.task
def save_to_storage(processed_data):
    """Save the processed image to storage."""
    print("Saving processed image...")
    return {
        'status': 'saved',
        'location': f'/images/processed/{processed_data["thumbnail"]}',
        'message': 'Image processing complete!'
    }

# Parallel processing for multiple sizes
def process_image_batch(image_url):
    """Process an image into multiple sizes.

    Defined as a plain function (not a task) so the AsyncResult it returns
    never has to be serialized as a task result.
    """
    # Download once, then process in parallel
    download_task = download_image.s(image_url)
    # Create thumbnails in different sizes
    thumbnail_tasks = group([
        create_thumbnail.s(size=(128, 128)),
        create_thumbnail.s(size=(256, 256)),
        create_thumbnail.s(size=(512, 512))
    ])
    # Apply filters to all thumbnails
    filter_tasks = apply_filters.s(['blur', 'enhance'])
    # Save all results
    save_task = save_to_storage.s()
    # Combine into a workflow; a group piped into another signature runs as a chord
    workflow = download_task | thumbnail_tasks | filter_tasks | save_task
    return workflow.apply_async()
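A rough usage sketch for the pipeline above (the URL is only a placeholder). Because a group piped into another signature is executed as a chord, the result backend configured earlier is required:
# Kick off the whole pipeline and keep the handle for status checks
async_result = process_image_batch('https://example.com/photos/cat.jpg')  # placeholder URL
print(async_result.id)      # id of the final save_to_storage step
print(async_result.state)   # PENDING -> ... -> SUCCESS once the chord completes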
Advanced Concepts
Task Routing and Queues
When you're ready to level up, try advanced routing:
# Configure task routing
app.conf.task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.process_payment': {'queue': 'priority'},
    'tasks.generate_report': {'queue': 'reports'}
}

# Priority task with custom options
from celery.exceptions import SoftTimeLimitExceeded

@app.task(
    queue='priority',
    rate_limit='100/m',    # 100 tasks per minute
    time_limit=300,        # 5 minute hard timeout
    soft_time_limit=240    # 4 minute soft limit
)
def critical_task(data):
    """High-priority task with rate limiting."""
    try:
        # Process critical data
        result = process_critical_data(data)
        return {'status': 'success', 'result': result}
    except SoftTimeLimitExceeded:
        # Graceful wrap-up when the soft limit is hit
        return {'status': 'partial', 'message': 'Time limit reached'}
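Routing only has an effect if workers actually consume those queues; by default a worker listens on the queue named celery. A sketch of how dedicated workers could be started for the queues configured above:
# One worker per queue (the -Q/--queues flag limits what a worker consumes)
# celery -A celery_app worker -Q email --loglevel=info
# celery -A celery_app worker -Q priority --loglevel=info
# celery -A celery_app worker -Q reports --loglevel=info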
Periodic Tasks with Beat
For the brave developers - scheduled tasks:
# Configure periodic tasks
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Generate daily reports at 2 AM
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=2, minute=0),
        'args': (),
    },
    # Clean up old files every hour
    'hourly-cleanup': {
        'task': 'tasks.cleanup_temp_files',
        'schedule': crontab(minute=0),
        'kwargs': {'days_old': 7},
    },
    # Send the newsletter every Monday at 9 AM
    'weekly-newsletter': {
        'task': 'tasks.send_newsletter',
        'schedule': crontab(hour=9, minute=0, day_of_week=1),
    },
}

@app.task
def generate_daily_report():
    """Generate the daily analytics report."""
    print("Generating daily report...")
    # Your report logic here
    return "Daily report generated!"
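Beat is a separate scheduler process: it only puts these tasks onto the queue at the configured times, so a worker still has to be running to execute them. Assuming the same celery_app module:
# Start the scheduler (usually alongside one or more workers):
# celery -A celery_app beat --loglevel=info
# For local development you can embed beat in a worker with -B:
# celery -A celery_app worker -B --loglevel=info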
Common Pitfalls and Solutions
Pitfall 1: Task Serialization Issues
# Wrong way - passing non-serializable objects!
class User:
    def __init__(self, name):
        self.name = name

@app.task
def process_user(user):  # Can't serialize custom objects with the JSON serializer!
    return f"Processing {user.name}"

# Correct way - pass simple data types!
@app.task
def process_user(user_id):
    # Fetch the user inside the task instead
    user = get_user_by_id(user_id)
    return f"Processing {user.name}"
Pitfall 2: Memory Leaks in Workers
# Dangerous - accumulating data in global worker state!
results = []  # This grows for the lifetime of the worker process!

@app.task
def accumulate_data(data):
    results.append(data)
    return len(results)

# Safe - use proper storage!
import json
import redis

redis_client = redis.Redis()  # or reuse an existing connection pool

@app.task
def store_data(data):
    # Store in a database or cache, not in worker memory
    redis_client.lpush('results', json.dumps(data))
    return redis_client.llen('results')
Best Practices
- Keep Tasks Simple: One task, one responsibility
- Use Meaningful Names: send_welcome_email, not task1
- Handle Failures Gracefully: Always plan for retries
- Monitor Everything: Use Flower for real-time monitoring
- Test Locally: Set task_always_eager = True (a.k.a. CELERY_TASK_ALWAYS_EAGER) so tasks run synchronously in tests (see the sketch after this list)
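For the testing tip, here is a minimal sketch of an eager-mode test: with task_always_eager enabled, .delay() runs the task synchronously in the calling process, so no broker or worker is needed. For monitoring, Flower is a separate package (pip install flower) and is typically started with celery -A celery_app flower.
# test_tasks.py - run tasks eagerly so tests need no broker or worker
from celery_app import app
from tasks import add_numbers

app.conf.task_always_eager = True  # same switch as CELERY_TASK_ALWAYS_EAGER

def test_add_numbers():
    result = add_numbers.delay(2, 3)
    assert result.get() == 5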
Hands-On Exercise
Challenge: Build a Video Processing Pipeline
Create a distributed video processing system:
Requirements:
- Upload video files to a processing queue
- Extract metadata (duration, resolution, codec)
- Generate multiple quality versions (480p, 720p, 1080p)
- Create thumbnail previews at different timestamps
- Apply a watermark to all versions
Bonus Points:
- Add progress tracking for long videos
- Implement priority queues for premium users
- Create a dashboard showing processing status
Solution
# Video processing pipeline!
from celery import group, chain, chord
from celery_app import app
import ffmpeg

# Extract video metadata
@app.task
def extract_metadata(video_path):
    """Extract video information."""
    print(f"Extracting metadata from {video_path}")
    probe = ffmpeg.probe(video_path)
    video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
    return {
        'path': video_path,
        'duration': float(probe['format']['duration']),
        'width': video_info['width'],
        'height': video_info['height'],
        'codec': video_info['codec_name'],
        'bitrate': probe['format']['bit_rate']
    }

# Generate quality versions
@app.task(bind=True)
def transcode_video(self, metadata, quality):
    """Transcode video to a different quality."""
    resolutions = {
        '480p': (854, 480),
        '720p': (1280, 720),
        '1080p': (1920, 1080)
    }
    width, height = resolutions[quality]
    output_path = f"output_{quality}.mp4"
    # Report progress to the result backend
    self.update_state(
        state='TRANSCODING',
        meta={'quality': quality, 'progress': 0}
    )
    print(f"Transcoding to {quality}...")
    # Actual transcoding would happen here
    return {
        **metadata,
        'transcoded': {
            'quality': quality,
            'path': output_path,
            'resolution': f"{width}x{height}"
        }
    }

# Generate thumbnails
@app.task
def generate_thumbnails(metadata, count=5):
    """Generate video thumbnails."""
    duration = metadata['duration']
    interval = duration / (count + 1)
    thumbnails = []
    for i in range(1, count + 1):
        timestamp = interval * i
        thumb_path = f"thumb_{i}.jpg"
        print(f"Generating thumbnail at {timestamp:.1f}s")
        thumbnails.append({
            'timestamp': timestamp,
            'path': thumb_path
        })
    return {**metadata, 'thumbnails': thumbnails}

# Apply watermark
@app.task
def apply_watermark(transcoded_data, watermark_path='watermark.png'):
    """Add a watermark to a transcoded video."""
    print(f"Applying watermark to {transcoded_data['transcoded']['quality']}")
    output_path = transcoded_data['transcoded']['path'].replace('.mp4', '_watermarked.mp4')
    return {
        **transcoded_data,
        'watermarked': True,
        'final_path': output_path
    }

# Complete pipeline (a plain function, so the workflow's AsyncResult is
# returned directly instead of being stored as a task result)
def process_video_pipeline(video_path, qualities=['480p', '720p', '1080p']):
    """Complete video processing workflow."""
    # Extract metadata first
    metadata_task = extract_metadata.s(video_path)
    # Transcode to multiple qualities in parallel
    transcode_group = group([
        transcode_video.s(quality) for quality in qualities
    ])
    # Apply a watermark to each transcoded version
    watermark_group = group([
        apply_watermark.s() for _ in qualities
    ])
    # Generate thumbnails (once for all versions)
    thumbnail_task = generate_thumbnails.s(count=5)
    # Combine the workflows
    workflow = (
        metadata_task |
        group(transcode_group, thumbnail_task) |
        watermark_group
    )
    return workflow.apply_async()

# Test it out!
if __name__ == "__main__":
    result = process_video_pipeline('sample_video.mp4')
    print(f"Video processing started! Task ID: {result.id}")
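For the progress-tracking bonus, the custom TRANSCODING state reported via update_state can be read back from any process that knows the task id; a rough sketch (the id below is a placeholder):
# progress_check.py - poll the custom state of a single transcode task
from celery.result import AsyncResult
from celery_app import app

task_id = 'paste-a-transcode-task-id-here'  # placeholder
res = AsyncResult(task_id, app=app)
if res.state == 'TRANSCODING':
    print(f"Transcoding {res.info['quality']}: progress {res.info['progress']}")
else:
    print(f"State: {res.state}")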
Key Takeaways
You've learned so much! Here's what you can now do:
- Create distributed tasks with Celery
- Build complex workflows using chains, groups, and chords
- Handle failures gracefully with retries and error handling
- Monitor task execution in real-time
- Scale applications horizontally with multiple workers!
Remember: Celery is your friend for building scalable Python applications! It's here to help you handle any workload.
Next Steps
Congratulations! You've mastered the Celery basics!
Here's what to do next:
- Set up Redis and try the examples above
- Build a real application using Celery for background tasks
- Explore advanced features like task signatures and canvas
- Learn about Celery monitoring with Flower
Remember: Every distributed systems expert started with their first task. Keep experimenting, keep learning, and most importantly, have fun building scalable applications!
Happy distributed computing!