Prerequisites
- Basic understanding of programming concepts ๐
- Python installation (3.8+) ๐
- VS Code or preferred IDE ๐ป
What you'll learn
- Understand the actor model fundamentals ๐ฏ
- Apply message passing in real projects ๐๏ธ
- Debug common concurrency issues ๐
- Write clean, scalable concurrent code โจ
๐ฏ Introduction
Welcome to the world of concurrent programming with the Actor Model! ๐ In this advanced tutorial, weโll explore how message passing can revolutionize the way you handle concurrency in Python.
Have you ever struggled with threads stepping on each otherโs toes? ๐ฑ Or dealt with the complexity of shared state and locks? The Actor Model offers an elegant solution where independent โactorsโ communicate through messages, eliminating many traditional concurrency headaches.
By the end of this tutorial, youโll be orchestrating actors like a maestro conducting a symphony! ๐ผ Letโs dive into this powerful paradigm! ๐โโ๏ธ
๐ Understanding the Actor Model
๐ค What is the Actor Model?
The Actor Model is like having a team of specialized workers ๐ทโโ๏ธ๐ทโโ๏ธ in separate offices, communicating only through messages. Think of it as a post office system ๐ฎ where each actor has its own mailbox and processes messages one at a time.
In Python terms, an actor is an independent unit that:
- โจ Has its own state (private data)
- ๐ Processes messages sequentially
- ๐ก๏ธ Communicates only through message passing
- ๐จ Can create new actors
- ๐ฏ Makes local decisions based on messages
๐ก Why Use the Actor Model?
Hereโs why developers love the Actor Model:
- No Shared State ๐: Each actor owns its data exclusively
- Fault Isolation ๐ป: One actorโs failure doesnโt crash others
- Scalability ๐: Easily distribute actors across cores or machines
- Simpler Reasoning ๐ง: Think about one actor at a time
Real-world example: Imagine building a chat server ๐ฌ. With the Actor Model, each user connection can be an actor, handling messages independently without worrying about race conditions!
๐ง Basic Syntax and Usage
๐ Simple Actor Implementation
Letโs start with a friendly example using Pythonโs queue
and threading
:
import queue
import threading
from typing import Any, Callable, Dict
import time
# ๐ Hello, Actor Model!
class Actor:
def __init__(self, name: str):
self.name = name # ๐ค Actor's name
self.mailbox = queue.Queue() # ๐ฎ Message inbox
self.running = True # ๐ฏ Actor state
self.handlers: Dict[str, Callable] = {} # ๐จ Message handlers
# ๐ Start the actor's thread
self.thread = threading.Thread(target=self._run)
self.thread.start()
def _run(self):
"""๐ Main actor loop - process messages"""
while self.running:
try:
# ๐จ Wait for a message
message = self.mailbox.get(timeout=0.1)
self._handle_message(message)
except queue.Empty:
continue # ๐ค No messages, keep waiting
def _handle_message(self, message: Dict[str, Any]):
"""๐ฏ Process incoming message"""
msg_type = message.get('type', 'unknown')
if msg_type in self.handlers:
self.handlers[msg_type](message)
else:
print(f"โ ๏ธ {self.name}: Unknown message type '{msg_type}'")
def send(self, message: Dict[str, Any]):
"""๐ค Send message to this actor"""
self.mailbox.put(message)
def on(self, msg_type: str, handler: Callable):
"""๐จ Register a message handler"""
self.handlers[msg_type] = handler
def stop(self):
"""๐ Stop the actor"""
self.running = False
self.thread.join()
# ๐ฎ Let's create a simple actor!
greeter = Actor("Greeter")
# ๐ Define message handler
def handle_greet(message):
name = message.get('name', 'Friend')
print(f"โจ Greeter says: Hello, {name}! ๐")
# ๐ฏ Register the handler
greeter.on('greet', handle_greet)
# ๐ค Send a message
greeter.send({'type': 'greet', 'name': 'Python Developer'})
time.sleep(0.1) # Give it time to process
greeter.stop()
๐ก Explanation: Notice how the actor processes messages sequentially in its own thread. No locks needed! The mailbox ensures thread-safe communication.
๐ฏ Common Actor Patterns
Here are patterns youโll use daily:
# ๐๏ธ Pattern 1: Stateful Actor
class Counter(Actor):
def __init__(self):
super().__init__("Counter")
self.count = 0 # ๐ฏ Private state
# ๐จ Register handlers
self.on('increment', self._increment)
self.on('get_count', self._get_count)
def _increment(self, message):
self.count += 1
print(f"โจ Count is now: {self.count}")
def _get_count(self, message):
reply_to = message.get('reply_to')
if reply_to:
reply_to.send({'type': 'count_result', 'count': self.count})
# ๐จ Pattern 2: Actor Supervision
class Supervisor(Actor):
def __init__(self):
super().__init__("Supervisor")
self.workers = [] # ๐ฅ Child actors
self.on('create_worker', self._create_worker)
self.on('broadcast', self._broadcast)
def _create_worker(self, message):
worker = Actor(f"Worker-{len(self.workers)}")
self.workers.append(worker)
print(f"๐ Created {worker.name}")
def _broadcast(self, message):
# ๐ข Send to all workers
for worker in self.workers:
worker.send(message.get('payload', {}))
# ๐ Pattern 3: Request-Reply
class Calculator(Actor):
def __init__(self):
super().__init__("Calculator")
self.on('calculate', self._calculate)
def _calculate(self, message):
operation = message.get('operation')
a, b = message.get('a', 0), message.get('b', 0)
result = None
if operation == 'add':
result = a + b
elif operation == 'multiply':
result = a * b
# ๐ค Send reply
reply_to = message.get('reply_to')
if reply_to and result is not None:
reply_to.send({'type': 'result', 'value': result})
๐ก Practical Examples
๐ Example 1: Order Processing System
Letโs build a real-world order processing system:
# ๐๏ธ Order processing with actors
class OrderProcessor(Actor):
def __init__(self):
super().__init__("OrderProcessor")
self.inventory = Actor("Inventory")
self.payment = Actor("PaymentGateway")
self.shipping = Actor("ShippingService")
# ๐จ Set up handlers
self.on('process_order', self._process_order)
self._setup_services()
def _setup_services(self):
# ๐ฆ Inventory service
inventory_stock = {'๐': 10, '๐': 15, '๐ฎ': 20}
def check_stock(msg):
item = msg.get('item')
quantity = msg.get('quantity', 1)
available = inventory_stock.get(item, 0) >= quantity
reply_to = msg.get('reply_to')
if reply_to:
reply_to.send({
'type': 'stock_result',
'available': available,
'item': item
})
if available:
inventory_stock[item] -= quantity
print(f"โ
Reserved {quantity} {item}")
self.inventory.on('check_stock', check_stock)
# ๐ณ Payment service
def process_payment(msg):
amount = msg.get('amount', 0)
print(f"๐ฐ Processing payment of ${amount}")
# Simulate payment processing
time.sleep(0.1)
success = amount < 100 # ๐
Demo logic
reply_to = msg.get('reply_to')
if reply_to:
reply_to.send({
'type': 'payment_result',
'success': success
})
self.payment.on('process_payment', process_payment)
# ๐ Shipping service
def ship_order(msg):
items = msg.get('items', [])
address = msg.get('address', 'Unknown')
print(f"๐ฆ Shipping {items} to {address}")
reply_to = msg.get('reply_to')
if reply_to:
reply_to.send({
'type': 'shipping_result',
'tracking': f"TRACK-{hash(str(items))}"
})
self.shipping.on('ship_order', ship_order)
def _process_order(self, message):
order_id = message.get('order_id')
items = message.get('items', [])
total = message.get('total', 0)
print(f"๐ Processing order {order_id}")
# ๐ Check inventory for all items
for item, quantity in items:
self.inventory.send({
'type': 'check_stock',
'item': item,
'quantity': quantity,
'reply_to': self
})
# ๐ณ Process payment
self.payment.send({
'type': 'process_payment',
'amount': total,
'reply_to': self
})
# Note: In real system, you'd wait for responses
# before proceeding to shipping
# ๐ฎ Let's use it!
order_system = OrderProcessor()
# ๐ค Place an order
order_system.send({
'type': 'process_order',
'order_id': 'ORD-001',
'items': [('๐', 2), ('๐', 1)],
'total': 35.99
})
time.sleep(0.5) # Let it process
๐ฏ Try it yourself: Add a notification actor that sends order updates to customers!
๐ฎ Example 2: Game Server with Actors
Letโs make a multiplayer game server:
# ๐ Game server with actor model
class GameRoom(Actor):
def __init__(self, room_id: str):
super().__init__(f"GameRoom-{room_id}")
self.room_id = room_id
self.players = {} # ๐ฅ Player actors
self.game_state = {
'scores': {},
'round': 1,
'active': False
}
# ๐ฏ Register handlers
self.on('join', self._handle_join)
self.on('leave', self._handle_leave)
self.on('player_action', self._handle_action)
self.on('start_game', self._start_game)
def _handle_join(self, message):
player_id = message.get('player_id')
player_name = message.get('name', f'Player-{player_id}')
# ๐ฎ Create player actor
player = PlayerActor(player_id, player_name, self)
self.players[player_id] = player
self.game_state['scores'][player_id] = 0
print(f"โจ {player_name} joined room {self.room_id}!")
# ๐ข Notify other players
self._broadcast({
'type': 'player_joined',
'player': player_name
}, exclude=player_id)
def _handle_leave(self, message):
player_id = message.get('player_id')
if player_id in self.players:
player = self.players[player_id]
player.stop()
del self.players[player_id]
del self.game_state['scores'][player_id]
print(f"๐ Player {player_id} left the room")
def _handle_action(self, message):
player_id = message.get('player_id')
action = message.get('action')
if action == 'score':
points = message.get('points', 1)
self.game_state['scores'][player_id] += points
# ๐ Check for winner
if self.game_state['scores'][player_id] >= 10:
self._end_game(player_id)
else:
self._broadcast({
'type': 'score_update',
'scores': self.game_state['scores']
})
def _start_game(self, message):
self.game_state['active'] = True
print(f"๐ฎ Game started in room {self.room_id}!")
self._broadcast({
'type': 'game_started',
'round': self.game_state['round']
})
def _end_game(self, winner_id):
self.game_state['active'] = False
winner_name = self.players[winner_id].name
print(f"๐ {winner_name} wins in room {self.room_id}!")
self._broadcast({
'type': 'game_ended',
'winner': winner_name,
'final_scores': self.game_state['scores']
})
def _broadcast(self, message, exclude=None):
"""๐ข Send message to all players"""
for player_id, player in self.players.items():
if player_id != exclude:
player.send(message)
class PlayerActor(Actor):
def __init__(self, player_id: str, name: str, room: GameRoom):
super().__init__(f"Player-{player_id}")
self.player_id = player_id
self.name = name
self.room = room
self.score = 0
# ๐ฏ Register handlers
self.on('game_update', self._handle_update)
self.on('power_up', self._use_power_up)
def _handle_update(self, message):
update_type = message.get('type')
print(f"๐ฑ {self.name} received: {update_type}")
def _use_power_up(self, message):
power = message.get('power', 'boost')
print(f"โก {self.name} used {power}!")
# ๐ค Notify room
self.room.send({
'type': 'player_action',
'player_id': self.player_id,
'action': 'score',
'points': 2 if power == 'double' else 1
})
๐ Advanced Concepts
๐งโโ๏ธ Advanced Topic 1: Actor Hierarchies and Supervision
When youโre ready to level up, implement sophisticated supervision strategies:
# ๐ฏ Advanced supervision with restart policies
class SupervisedActor(Actor):
def __init__(self, name: str, supervisor=None):
super().__init__(name)
self.supervisor = supervisor
self.restart_count = 0
self.max_restarts = 3
def _run(self):
"""๐ Enhanced run with error handling"""
while self.running:
try:
message = self.mailbox.get(timeout=0.1)
self._handle_message(message)
except queue.Empty:
continue
except Exception as e:
print(f"๐ฅ {self.name} crashed: {e}")
self._notify_supervisor('crash', {'error': str(e)})
if self.restart_count < self.max_restarts:
self.restart_count += 1
print(f"๐ Restarting {self.name} (attempt {self.restart_count})")
continue
else:
print(f"โ ๏ธ {self.name} exceeded restart limit")
self.running = False
def _notify_supervisor(self, event_type: str, data: Dict):
"""๐ค Notify supervisor of events"""
if self.supervisor:
self.supervisor.send({
'type': 'supervision_event',
'actor': self.name,
'event': event_type,
'data': data
})
# ๐ช Supervisor with strategies
class SupervisorActor(Actor):
def __init__(self):
super().__init__("Supervisor")
self.children = {}
self.strategies = {
'one_for_one': self._restart_one,
'one_for_all': self._restart_all,
'rest_for_one': self._restart_rest
}
self.strategy = 'one_for_one'
self.on('supervision_event', self._handle_supervision)
self.on('create_child', self._create_child)
def _create_child(self, message):
name = message.get('name')
child = SupervisedActor(name, supervisor=self)
self.children[name] = child
print(f"โจ Supervisor created child: {name}")
def _handle_supervision(self, message):
actor_name = message.get('actor')
event = message.get('event')
if event == 'crash':
print(f"๐จ Supervisor handling crash of {actor_name}")
self.strategies[self.strategy](actor_name)
def _restart_one(self, actor_name):
"""๐ฏ Restart only the failed actor"""
if actor_name in self.children:
old_actor = self.children[actor_name]
old_actor.stop()
new_actor = SupervisedActor(actor_name, supervisor=self)
self.children[actor_name] = new_actor
print(f"โ
Restarted {actor_name}")
def _restart_all(self, actor_name):
"""๐ Restart all children"""
print("๐ซ Restarting all children...")
for name in list(self.children.keys()):
self._restart_one(name)
def _restart_rest(self, actor_name):
"""๐ Restart failed actor and all created after it"""
# Implementation depends on creation order tracking
pass
๐๏ธ Advanced Topic 2: Distributed Actors
For the brave developers, hereโs distributed actor communication:
import json
import socket
from typing import Optional
# ๐ Network-enabled actor
class NetworkActor(Actor):
def __init__(self, name: str, host='localhost', port=None):
super().__init__(name)
self.host = host
self.port = port or self._get_free_port()
self.socket = None
self.remote_actors = {} # ๐ Remote actor addresses
self._start_server()
self.on('remote_message', self._handle_remote)
self.on('register_remote', self._register_remote)
def _get_free_port(self):
"""๐ Find available port"""
with socket.socket() as s:
s.bind(('', 0))
return s.getsockname()[1]
def _start_server(self):
"""๐ฅ๏ธ Start network server"""
def server_thread():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, self.port))
server.listen(5)
server.settimeout(0.1)
print(f"๐ {self.name} listening on {self.host}:{self.port}")
while self.running:
try:
client, addr = server.accept()
data = client.recv(4096)
if data:
message = json.loads(data.decode())
self.send(message)
client.close()
except socket.timeout:
continue
except Exception as e:
print(f"โ ๏ธ Network error: {e}")
server.close()
threading.Thread(target=server_thread, daemon=True).start()
def _register_remote(self, message):
"""๐ Register remote actor address"""
actor_name = message.get('name')
host = message.get('host')
port = message.get('port')
self.remote_actors[actor_name] = (host, port)
print(f"๐ก Registered remote actor: {actor_name} at {host}:{port}")
def send_remote(self, actor_name: str, message: Dict):
"""๐ค Send message to remote actor"""
if actor_name in self.remote_actors:
host, port = self.remote_actors[actor_name]
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.send(json.dumps(message).encode())
sock.close()
print(f"๐จ Sent to {actor_name}: {message['type']}")
except Exception as e:
print(f"โ Failed to send to {actor_name}: {e}")
# ๐ฏ Example: Distributed chat system
class ChatNode(NetworkActor):
def __init__(self, node_id: str):
super().__init__(f"ChatNode-{node_id}")
self.messages = []
self.on('chat_message', self._handle_chat)
self.on('sync_request', self._handle_sync)
def _handle_chat(self, message):
user = message.get('user')
text = message.get('text')
timestamp = message.get('timestamp', time.time())
chat_msg = {
'user': user,
'text': text,
'timestamp': timestamp,
'node': self.name
}
self.messages.append(chat_msg)
print(f"๐ฌ {user}: {text}")
# ๐ข Broadcast to other nodes
for remote_node in self.remote_actors:
self.send_remote(remote_node, {
'type': 'chat_message',
'user': user,
'text': text,
'timestamp': timestamp
})
โ ๏ธ Common Pitfalls and Solutions
๐ฑ Pitfall 1: Deadlocks in Request-Reply
# โ Wrong way - can cause deadlock!
class BadActor(Actor):
def __init__(self):
super().__init__("BadActor")
self.other_actor = None
def process(self, msg):
# ๐ฅ Blocking wait for reply - DON'T DO THIS!
reply_queue = queue.Queue()
self.other_actor.send({
'type': 'request',
'reply_to': reply_queue
})
# This blocks the actor's thread!
result = reply_queue.get() # ๐ฐ Deadlock risk!
# โ
Correct way - use callbacks or futures!
class GoodActor(Actor):
def __init__(self):
super().__init__("GoodActor")
self.pending_requests = {}
def process(self, msg):
request_id = str(uuid.uuid4())
self.pending_requests[request_id] = msg.get('original_sender')
# ๐ค Non-blocking send
self.other_actor.send({
'type': 'request',
'request_id': request_id,
'reply_to': self
})
# โ
Continue processing other messages!
def handle_reply(self, msg):
request_id = msg.get('request_id')
if request_id in self.pending_requests:
# Process the reply asynchronously
original_sender = self.pending_requests.pop(request_id)
# Do something with the result
๐คฏ Pitfall 2: Message Ordering Assumptions
# โ Dangerous - assuming message order!
class OrderDependentActor(Actor):
def process_sequence(self):
self.send({'type': 'step1'})
self.send({'type': 'step2'}) # ๐ฅ May arrive before step1!
self.send({'type': 'step3'})
# โ
Safe - explicit sequencing!
class SequencedActor(Actor):
def __init__(self):
super().__init__("SequencedActor")
self.sequence_number = 0
def send_sequenced(self, message):
# ๐ฏ Add sequence number
message['sequence'] = self.sequence_number
self.sequence_number += 1
self.send(message)
def _handle_message(self, message):
# โ
Process in order
seq = message.get('sequence', -1)
# Buffer out-of-order messages if needed
๐ ๏ธ Best Practices
- ๐ฏ Keep Actors Focused: One responsibility per actor
- ๐ Immutable Messages: Never modify messages after sending
- ๐ก๏ธ Fail Fast: Let actors crash and restart cleanly
- ๐จ Use Supervision: Always have a supervision strategy
- โจ Avoid Blocking: Never block in message handlers
- ๐ Monitor Mailboxes: Watch for message queue buildup
- ๐ Design for Failure: Assume actors can crash anytime
๐งช Hands-On Exercise
๐ฏ Challenge: Build a Distributed Task Queue
Create an actor-based task queue system:
๐ Requirements:
- โ Master actor that distributes tasks
- ๐ท๏ธ Worker actors that process tasks
- ๐ค Result aggregator actor
- ๐ Task priorities and deadlines
- ๐จ Fault tolerance with restart capability
๐ Bonus Points:
- Add work stealing between workers
- Implement backpressure handling
- Create a monitoring dashboard actor
๐ก Solution
๐ Click to see solution
import heapq
from dataclasses import dataclass
from datetime import datetime, timedelta
# ๐ฏ Our distributed task queue system!
@dataclass
class Task:
id: str
priority: int # Lower = higher priority
deadline: datetime
payload: Dict[str, Any]
retries: int = 0
def __lt__(self, other):
return self.priority < other.priority
class TaskMaster(Actor):
def __init__(self):
super().__init__("TaskMaster")
self.task_queue = [] # ๐ Priority queue
self.workers = [] # ๐ฅ Worker actors
self.results = {} # ๐ Task results
self.pending_tasks = {} # ๐ Tasks being processed
# ๐จ Register handlers
self.on('submit_task', self._submit_task)
self.on('worker_ready', self._worker_ready)
self.on('task_complete', self._task_complete)
self.on('task_failed', self._task_failed)
self.on('create_worker', self._create_worker)
def _submit_task(self, message):
task = Task(
id=message.get('id', str(uuid.uuid4())),
priority=message.get('priority', 5),
deadline=datetime.now() + timedelta(seconds=message.get('timeout', 60)),
payload=message.get('payload', {})
)
heapq.heappush(self.task_queue, task)
print(f"โ
Task {task.id} submitted with priority {task.priority}")
# ๐ Try to assign immediately
self._try_assign_tasks()
def _create_worker(self, message):
worker_id = f"Worker-{len(self.workers)}"
worker = TaskWorker(worker_id, self)
self.workers.append(worker)
print(f"๐ Created {worker_id}")
def _worker_ready(self, message):
worker = message.get('worker')
self._assign_task_to_worker(worker)
def _try_assign_tasks(self):
"""๐ฏ Assign tasks to available workers"""
for worker in self.workers:
if not self.task_queue:
break
# Check if worker is available
worker_id = worker.name
if worker_id not in self.pending_tasks:
self._assign_task_to_worker(worker)
def _assign_task_to_worker(self, worker):
if not self.task_queue:
return
task = heapq.heappop(self.task_queue)
# โฐ Check deadline
if datetime.now() > task.deadline:
print(f"โ ๏ธ Task {task.id} expired!")
self.results[task.id] = {'status': 'expired'}
return
self.pending_tasks[worker.name] = task
worker.send({
'type': 'process_task',
'task': task
})
print(f"๐ค Assigned task {task.id} to {worker.name}")
def _task_complete(self, message):
worker_id = message.get('worker_id')
task_id = message.get('task_id')
result = message.get('result')
if worker_id in self.pending_tasks:
del self.pending_tasks[worker_id]
self.results[task_id] = {
'status': 'complete',
'result': result,
'worker': worker_id
}
print(f"โ
Task {task_id} completed by {worker_id}")
# ๐ Worker is ready for next task
self._worker_ready({'worker': next(w for w in self.workers if w.name == worker_id)})
def _task_failed(self, message):
worker_id = message.get('worker_id')
task_id = message.get('task_id')
error = message.get('error')
task = self.pending_tasks.get(worker_id)
if task and task.retries < 3:
# ๐ Retry the task
task.retries += 1
heapq.heappush(self.task_queue, task)
print(f"๐ Retrying task {task_id} (attempt {task.retries + 1})")
else:
# โ Task failed permanently
self.results[task_id] = {
'status': 'failed',
'error': error
}
print(f"โ Task {task_id} failed permanently")
if worker_id in self.pending_tasks:
del self.pending_tasks[worker_id]
class TaskWorker(Actor):
def __init__(self, worker_id: str, master: TaskMaster):
super().__init__(worker_id)
self.master = master
self.current_task = None
self.on('process_task', self._process_task)
# ๐ฏ Tell master we're ready
self.master.send({
'type': 'worker_ready',
'worker': self
})
def _process_task(self, message):
task = message.get('task')
self.current_task = task
try:
# ๐ฎ Simulate task processing
print(f"โ๏ธ {self.name} processing task {task.id}")
# Your actual task processing logic here
result = self._execute_task(task.payload)
# ๐ค Report success
self.master.send({
'type': 'task_complete',
'worker_id': self.name,
'task_id': task.id,
'result': result
})
except Exception as e:
# ๐ฅ Report failure
self.master.send({
'type': 'task_failed',
'worker_id': self.name,
'task_id': task.id,
'error': str(e)
})
finally:
self.current_task = None
def _execute_task(self, payload):
# ๐ฏ Actual task execution
operation = payload.get('operation', 'default')
if operation == 'compute':
# Heavy computation simulation
time.sleep(0.1)
return {'computed': payload.get('value', 0) * 2}
elif operation == 'fetch':
# I/O simulation
time.sleep(0.05)
return {'fetched': 'data'}
else:
return {'processed': True}
# ๐ Result Aggregator
class ResultAggregator(Actor):
def __init__(self, master: TaskMaster):
super().__init__("ResultAggregator")
self.master = master
self.aggregated_results = {}
self.on('aggregate', self._aggregate)
self.on('get_report', self._get_report)
def _aggregate(self, message):
batch_id = message.get('batch_id')
task_ids = message.get('task_ids', [])
results = []
for task_id in task_ids:
if task_id in self.master.results:
results.append(self.master.results[task_id])
self.aggregated_results[batch_id] = {
'total': len(task_ids),
'completed': sum(1 for r in results if r.get('status') == 'complete'),
'failed': sum(1 for r in results if r.get('status') == 'failed'),
'results': results
}
print(f"๐ Aggregated batch {batch_id}: {self.aggregated_results[batch_id]['completed']}/{len(task_ids)} completed")
def _get_report(self, message):
print("๐ Task Queue Report:")
for batch_id, stats in self.aggregated_results.items():
print(f" Batch {batch_id}: {stats['completed']} completed, {stats['failed']} failed")
# ๐ฎ Test it out!
task_queue = TaskMaster()
# Create workers
for i in range(3):
task_queue.send({'type': 'create_worker'})
# Submit tasks
for i in range(10):
task_queue.send({
'type': 'submit_task',
'id': f'task-{i}',
'priority': i % 3, # Different priorities
'payload': {
'operation': ['compute', 'fetch'][i % 2],
'value': i
}
})
# Let it process
time.sleep(1)
# Create aggregator and get report
aggregator = ResultAggregator(task_queue)
aggregator.send({
'type': 'aggregate',
'batch_id': 'batch-1',
'task_ids': [f'task-{i}' for i in range(10)]
})
aggregator.send({'type': 'get_report'})
๐ Key Takeaways
Youโve mastered the Actor Model! Hereโs what you can now do:
- โ Design concurrent systems without shared state complexity ๐ช
- โ Build fault-tolerant applications with supervision ๐ก๏ธ
- โ Scale across cores and machines with distributed actors ๐ฏ
- โ Debug message-passing systems like a pro ๐
- โ Create reactive, event-driven architectures with confidence! ๐
Remember: The Actor Model transforms complex concurrency into simple message passing. Let each actor do one thing well! ๐ค
๐ค Next Steps
Congratulations! ๐ Youโve conquered the Actor Model and message passing!
Hereโs what to do next:
- ๐ป Build a real-time chat system with actors
- ๐๏ธ Explore actor frameworks like Pykka or Ray
- ๐ Move on to our next tutorial: Async/Await Patterns
- ๐ Share your actor-based creations with the community!
Remember: Every distributed system expert started with their first actor. Keep experimenting, keep learning, and most importantly, have fun with concurrent programming! ๐
Happy coding! ๐๐โจ