Part 337 of 365

🚀 Actor Model: Message Passing

Master the actor model and message passing in Python with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand the actor model fundamentals 🎯
  • Apply message passing in real projects 🏗️
  • Debug common concurrency issues 🐛
  • Write clean, scalable concurrent code ✨

🎯 Introduction

Welcome to the world of concurrent programming with the Actor Model! 🎉 In this advanced tutorial, we'll explore how message passing can simplify the way you handle concurrency in Python.

Have you ever struggled with threads stepping on each other's toes? 😱 Or dealt with the complexity of shared state and locks? The Actor Model offers an elegant alternative: independent "actors" communicate only through messages, which removes many traditional concurrency headaches.

By the end of this tutorial, you'll be orchestrating actors like a maestro conducting a symphony! 🎼 Let's dive into this powerful paradigm! 🏊‍♂️

📚 Understanding the Actor Model

🤔 What is the Actor Model?

The Actor Model is like having a team of specialized workers 👷‍♀️👷‍♂️ in separate offices, communicating only through messages. Think of it as a post office system 📮 where each actor has its own mailbox and processes messages one at a time.

In Python terms, an actor is an independent unit that:

  • ✨ Has its own state (private data)
  • 🚀 Processes messages sequentially
  • 🛡️ Communicates only through message passing
  • 📨 Can create new actors
  • 🎯 Makes local decisions based on messages

💡 Why Use the Actor Model?

Here's why developers love the Actor Model:

  1. No Shared State 🔒: Each actor owns its data exclusively
  2. Fault Isolation 💻: One actor's failure doesn't crash others
  3. Scalability 📖: Actors can be distributed across processes or machines (thread-based actors in CPython still share the GIL, so CPU-bound work needs separate processes to use multiple cores)
  4. Simpler Reasoning 🔧: Think about one actor at a time

Real-world example: Imagine building a chat server 💬. With the Actor Model, each user connection can be an actor, handling messages independently without worrying about race conditions on shared data!

🔧 Basic Syntax and Usage

📝 Simple Actor Implementation

Let's start with a friendly example using Python's queue and threading modules:

import queue
import threading
from typing import Any, Callable, Dict
import time

# 👋 Hello, Actor Model!
class Actor:
    def __init__(self, name: str):
        self.name = name  # 👤 Actor's name
        self.mailbox = queue.Queue()  # 📮 Message inbox
        self.running = True  # 🎯 Actor state
        self.handlers: Dict[str, Callable] = {}  # 📨 Message handlers

        # 🚀 Start the actor's thread
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def _run(self):
        """🔄 Main actor loop - process messages"""
        while self.running:
            try:
                # 📨 Wait for a message
                message = self.mailbox.get(timeout=0.1)
                self._handle_message(message)
            except queue.Empty:
                continue  # 💤 No messages, keep waiting

    def _handle_message(self, message: Dict[str, Any]):
        """🎯 Process incoming message"""
        msg_type = message.get('type', 'unknown')
        if msg_type in self.handlers:
            self.handlers[msg_type](message)
        else:
            print(f"⚠️ {self.name}: Unknown message type '{msg_type}'")

    def send(self, message: Dict[str, Any]):
        """📤 Send message to this actor"""
        self.mailbox.put(message)

    def on(self, msg_type: str, handler: Callable):
        """🎨 Register a message handler"""
        self.handlers[msg_type] = handler

    def stop(self):
        """🛑 Stop the actor"""
        self.running = False
        self.thread.join()

# 🎮 Let's create a simple actor!
greeter = Actor("Greeter")

# 📝 Define message handler
def handle_greet(message):
    name = message.get('name', 'Friend')
    print(f"✨ Greeter says: Hello, {name}! 👋")

# 🎯 Register the handler
greeter.on('greet', handle_greet)

# 📤 Send a message
greeter.send({'type': 'greet', 'name': 'Python Developer'})
time.sleep(0.1)  # Give it time to process
greeter.stop()

💡 Explanation: Notice how the actor processes messages sequentially in its own thread, so its handlers need no locks of their own. The mailbox (a queue.Queue, which handles its own locking internally) is the only place where threads meet.
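
One way to check this claim is to let several threads send to a single actor and confirm that no update is lost. The sketch below is self-contained; MiniCounter and hammer are hypothetical names used only for this illustration, and the sentinel-based shutdown is an assumption that differs from the running flag used above.

```python
import queue
import threading

class MiniCounter:
    """Hypothetical minimal actor: one mailbox, one thread, private count."""
    _STOP = object()  # sentinel that tells the loop to exit

    def __init__(self):
        self.mailbox = queue.Queue()
        self.count = 0  # touched only by the actor's own thread
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def _run(self):
        while True:
            message = self.mailbox.get()
            if message is self._STOP:
                break
            self.count += message  # sequential processing, no lock needed

    def send(self, amount):
        self.mailbox.put(amount)

    def stop(self):
        self.mailbox.put(self._STOP)  # queued behind all earlier messages
        self.thread.join()

def hammer(actor, n):
    """Send n increment messages from the calling thread."""
    for _ in range(n):
        actor.send(1)

counter = MiniCounter()
senders = [threading.Thread(target=hammer, args=(counter, 1000)) for _ in range(8)]
for t in senders:
    t.start()
for t in senders:
    t.join()
counter.stop()  # returns only after every queued message was processed
total = counter.count
print(total)
```

Because the stop sentinel is enqueued after all senders have finished, every increment is handled before the actor exits, and the count can be read safely once stop() returns.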

🎯 Common Actor Patterns

Here are patterns you'll use daily:

# 🏗️ Pattern 1: Stateful Actor
class Counter(Actor):
    def __init__(self):
        super().__init__("Counter")
        self.count = 0  # 🎯 Private state

        # 📨 Register handlers
        self.on('increment', self._increment)
        self.on('get_count', self._get_count)

    def _increment(self, message):
        self.count += 1
        print(f"✨ Count is now: {self.count}")

    def _get_count(self, message):
        reply_to = message.get('reply_to')
        if reply_to:
            reply_to.send({'type': 'count_result', 'count': self.count})

# 🎨 Pattern 2: Actor Supervision
class Supervisor(Actor):
    def __init__(self):
        super().__init__("Supervisor")
        self.workers = []  # 👥 Child actors
        self.on('create_worker', self._create_worker)
        self.on('broadcast', self._broadcast)

    def _create_worker(self, message):
        worker = Actor(f"Worker-{len(self.workers)}")
        self.workers.append(worker)
        print(f"🚀 Created {worker.name}")

    def _broadcast(self, message):
        # 📢 Send to all workers
        for worker in self.workers:
            worker.send(message.get('payload', {}))

# 🔄 Pattern 3: Request-Reply
class Calculator(Actor):
    def __init__(self):
        super().__init__("Calculator")
        self.on('calculate', self._calculate)

    def _calculate(self, message):
        operation = message.get('operation')
        a, b = message.get('a', 0), message.get('b', 0)

        result = None
        if operation == 'add':
            result = a + b
        elif operation == 'multiply':
            result = a * b

        # 📤 Send reply (always answer, so the requester is never left waiting)
        reply_to = message.get('reply_to')
        if reply_to:
            if result is not None:
                reply_to.send({'type': 'result', 'value': result})
            else:
                reply_to.send({'type': 'error',
                               'reason': f"unknown operation {operation!r}"})
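
When the requester is ordinary (non-actor) code, such as a main thread or a test, a request-reply round trip can be sketched with a concurrent.futures.Future as the reply channel. This is a minimal sketch under stated assumptions: MiniCalculator and its ask method are hypothetical stand-ins, not part of the classes above, and None is used as the stop signal.

```python
import queue
import threading
from concurrent.futures import Future

class MiniCalculator:
    """Hypothetical stand-in for a calculator actor that replies via Futures."""

    def __init__(self):
        self.mailbox = queue.Queue()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while True:
            message = self.mailbox.get()
            if message is None:  # None is the stop signal in this sketch
                break
            future = message['reply_to']
            try:
                if message['operation'] == 'add':
                    future.set_result(message['a'] + message['b'])
                elif message['operation'] == 'multiply':
                    future.set_result(message['a'] * message['b'])
                else:
                    raise ValueError(f"unknown operation {message['operation']!r}")
            except Exception as exc:
                future.set_exception(exc)  # the caller sees the failure

    def ask(self, operation, a, b):
        """Enqueue a request and return a Future for the reply."""
        future = Future()
        self.mailbox.put({'operation': operation, 'a': a, 'b': b,
                          'reply_to': future})
        return future

    def stop(self):
        self.mailbox.put(None)
        self.thread.join()

calc = MiniCalculator()
sum_result = calc.ask('add', 2, 3).result(timeout=1)
product_result = calc.ask('multiply', 4, 5).result(timeout=1)
error = calc.ask('divide', 1, 2).exception(timeout=1)
calc.stop()
print(sum_result, product_result, type(error).__name__)
```

Calling result() blocks the caller, which is fine outside an actor; inside a message handler it would stall that actor's mailbox, so actors should keep replying through messages instead.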

💡 Practical Examples

🛒 Example 1: Order Processing System

Let's build a real-world order processing system:

# 🛍️ Order processing with actors
class OrderProcessor(Actor):
    def __init__(self):
        super().__init__("OrderProcessor")
        self.inventory = Actor("Inventory")
        self.payment = Actor("PaymentGateway")
        self.shipping = Actor("ShippingService")

        # 📨 Set up handlers
        self.on('process_order', self._process_order)
        # Replies from the services arrive in this actor's own mailbox
        self.on('stock_result', self._log_reply)
        self.on('payment_result', self._log_reply)
        self.on('shipping_result', self._log_reply)
        self._setup_services()

    def _log_reply(self, message):
        print(f"📨 {self.name} got reply: {message}")

    def _setup_services(self):
        # 📦 Inventory service
        inventory_stock = {'🍕': 10, '🍔': 15, '🌮': 20}

        def check_stock(msg):
            item = msg.get('item')
            quantity = msg.get('quantity', 1)
            available = inventory_stock.get(item, 0) >= quantity

            # Reserve first, so the reply reflects the updated stock
            if available:
                inventory_stock[item] -= quantity
                print(f"✅ Reserved {quantity} {item}")

            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'stock_result',
                    'available': available,
                    'item': item
                })

        self.inventory.on('check_stock', check_stock)

        # 💳 Payment service
        def process_payment(msg):
            amount = msg.get('amount', 0)
            print(f"💰 Processing payment of ${amount}")

            # Simulate payment processing
            time.sleep(0.1)
            success = amount < 100  # 😅 Demo logic

            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'payment_result',
                    'success': success
                })

        self.payment.on('process_payment', process_payment)

        # 🚚 Shipping service
        def ship_order(msg):
            items = msg.get('items', [])
            address = msg.get('address', 'Unknown')
            print(f"📦 Shipping {items} to {address}")

            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'shipping_result',
                    'tracking': f"TRACK-{hash(str(items))}"
                })

        self.shipping.on('ship_order', ship_order)

    def _process_order(self, message):
        order_id = message.get('order_id')
        items = message.get('items', [])
        total = message.get('total', 0)

        print(f"🛒 Processing order {order_id}")

        # 📋 Check inventory for all items
        for item, quantity in items:
            self.inventory.send({
                'type': 'check_stock',
                'item': item,
                'quantity': quantity,
                'reply_to': self
            })

        # 💳 Process payment
        self.payment.send({
            'type': 'process_payment',
            'amount': total,
            'reply_to': self
        })

        # Note: In a real system, you'd wait for the stock and payment
        # replies (here they are only logged) before proceeding to shipping

# 🎮 Let's use it!
order_system = OrderProcessor()

# 📤 Place an order
order_system.send({
    'type': 'process_order',
    'order_id': 'ORD-001',
    'items': [('🍕', 2), ('🍔', 1)],
    'total': 35.99
})

time.sleep(0.5)  # Let it process

# Stop every actor so their (non-daemon) threads let the program exit
for actor in (order_system.inventory, order_system.payment,
              order_system.shipping, order_system):
    actor.stop()

🎯 Try it yourself: Add a notification actor that sends order updates to customers!

🎮 Example 2: Game Server with Actors

Let's make a multiplayer game server:

# 🏆 Game server with actor model
class GameRoom(Actor):
    def __init__(self, room_id: str):
        super().__init__(f"GameRoom-{room_id}")
        self.room_id = room_id
        self.players = {}  # 👥 Player actors
        self.game_state = {
            'scores': {},
            'round': 1,
            'active': False
        }

        # 🎯 Register handlers
        self.on('join', self._handle_join)
        self.on('leave', self._handle_leave)
        self.on('player_action', self._handle_action)
        self.on('start_game', self._start_game)

    def _handle_join(self, message):
        player_id = message.get('player_id')
        player_name = message.get('name', f'Player-{player_id}')

        # 🎮 Create player actor
        player = PlayerActor(player_id, player_name, self)
        self.players[player_id] = player
        self.game_state['scores'][player_id] = 0

        print(f"✨ {player_name} joined room {self.room_id}!")

        # 📢 Notify other players
        self._broadcast({
            'type': 'player_joined',
            'player': player_name
        }, exclude=player_id)

    def _handle_leave(self, message):
        player_id = message.get('player_id')
        if player_id in self.players:
            player = self.players[player_id]
            player.stop()
            del self.players[player_id]
            del self.game_state['scores'][player_id]

            print(f"👋 Player {player_id} left the room")

    def _handle_action(self, message):
        player_id = message.get('player_id')
        action = message.get('action')

        # Ignore actions from players who are not (or no longer) in the room
        if player_id not in self.players:
            return

        if action == 'score':
            points = message.get('points', 1)
            self.game_state['scores'][player_id] += points

            # 🎊 Check for winner
            if self.game_state['scores'][player_id] >= 10:
                self._end_game(player_id)
            else:
                self._broadcast({
                    'type': 'score_update',
                    # Send a copy so players never share the room's dict
                    'scores': dict(self.game_state['scores'])
                })

    def _start_game(self, message):
        self.game_state['active'] = True
        print(f"🎮 Game started in room {self.room_id}!")

        self._broadcast({
            'type': 'game_started',
            'round': self.game_state['round']
        })

    def _end_game(self, winner_id):
        self.game_state['active'] = False
        winner_name = self.players[winner_id].name

        print(f"🏆 {winner_name} wins in room {self.room_id}!")

        self._broadcast({
            'type': 'game_ended',
            'winner': winner_name,
            'final_scores': dict(self.game_state['scores'])
        })

    def _broadcast(self, message, exclude=None):
        """📢 Send message to all players"""
        for player_id, player in self.players.items():
            if player_id != exclude:
                player.send(message)

class PlayerActor(Actor):
    def __init__(self, player_id: str, name: str, room: GameRoom):
        super().__init__(f"Player-{player_id}")
        self.player_id = player_id
        self.name = name
        self.room = room
        self.score = 0

        # 🎯 Register handlers for every message type the room broadcasts
        for msg_type in ('player_joined', 'score_update',
                         'game_started', 'game_ended'):
            self.on(msg_type, self._handle_update)
        self.on('power_up', self._use_power_up)

    def _handle_update(self, message):
        update_type = message.get('type')
        print(f"📱 {self.name} received: {update_type}")

    def _use_power_up(self, message):
        power = message.get('power', 'boost')
        print(f"⚡ {self.name} used {power}!")

        # 📤 Notify room
        self.room.send({
            'type': 'player_action',
            'player_id': self.player_id,
            'action': 'score',
            'points': 2 if power == 'double' else 1
        })

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Actor Hierarchies and Supervision

When you're ready to level up, implement sophisticated supervision strategies:

# 🎯 Advanced supervision with restart policies
class SupervisedActor(Actor):
    def __init__(self, name: str, supervisor=None):
        super().__init__(name)
        self.supervisor = supervisor
        self.restart_count = 0
        self.max_restarts = 3

    def _run(self):
        """🔄 Enhanced run with error handling"""
        while self.running:
            try:
                message = self.mailbox.get(timeout=0.1)
                self._handle_message(message)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"💥 {self.name} crashed: {e}")
                self._notify_supervisor('crash', {'error': str(e)})

                if self.restart_count < self.max_restarts:
                    self.restart_count += 1
                    print(f"🔄 Restarting {self.name} (attempt {self.restart_count})")
                    continue
                else:
                    print(f"☠️ {self.name} exceeded restart limit")
                    self.running = False

    def _notify_supervisor(self, event_type: str, data: Dict):
        """📤 Notify supervisor of events"""
        if self.supervisor:
            self.supervisor.send({
                'type': 'supervision_event',
                'actor': self.name,
                'event': event_type,
                'data': data
            })

# 🪄 Supervisor with strategies
class SupervisorActor(Actor):
    def __init__(self):
        super().__init__("Supervisor")
        self.children = {}
        self.strategies = {
            'one_for_one': self._restart_one,
            'one_for_all': self._restart_all,
            'rest_for_one': self._restart_rest
        }
        self.strategy = 'one_for_one'

        self.on('supervision_event', self._handle_supervision)
        self.on('create_child', self._create_child)

    def _create_child(self, message):
        name = message.get('name')
        child = SupervisedActor(name, supervisor=self)
        self.children[name] = child
        print(f"✨ Supervisor created child: {name}")

    def _handle_supervision(self, message):
        actor_name = message.get('actor')
        event = message.get('event')

        if event == 'crash':
            print(f"🚨 Supervisor handling crash of {actor_name}")
            self.strategies[self.strategy](actor_name)

    def _restart_one(self, actor_name):
        """🎯 Restart only the failed actor"""
        if actor_name in self.children:
            old_actor = self.children[actor_name]
            old_actor.stop()

            new_actor = SupervisedActor(actor_name, supervisor=self)
            self.children[actor_name] = new_actor
            print(f"✅ Restarted {actor_name}")

    def _restart_all(self, actor_name):
        """🌟 Restart all children"""
        print("💫 Restarting all children...")
        for name in list(self.children.keys()):
            self._restart_one(name)

    def _restart_rest(self, actor_name):
        """🔄 Restart failed actor and all created after it"""
        # Implementation depends on creation order tracking
        pass
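
The rest_for_one strategy is left as a stub above because it needs creation order. One possible way to get that order, sketched here as an assumption rather than as the tutorial's implementation, is to rely on the insertion order that Python dicts have preserved since 3.7. The helper name rest_for_one_targets and the child names are hypothetical.

```python
from typing import Dict, List

def rest_for_one_targets(creation_order: List[str], failed: str) -> List[str]:
    """Return the failed child plus every child created after it."""
    if failed not in creation_order:
        return []
    return creation_order[creation_order.index(failed):]

# Stand-in for a supervisor's children dict; values would be actor objects
children: Dict[str, object] = {}
for name in ['db', 'cache', 'api', 'metrics']:
    children[name] = object()

# Replacing a value for an existing key keeps its original position,
# so restarting a child does not change the recorded creation order
children['cache'] = object()

order = list(children)
targets = rest_for_one_targets(order, 'cache')
print(targets)  # ['cache', 'api', 'metrics']
```

A supervisor could then restart each name in targets in sequence, in the same way the one_for_all strategy loops over every child.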

🏗️ Advanced Topic 2: Distributed Actors

For the brave developers, here's distributed actor communication:

import json
import socket

# 🚀 Network-enabled actor
class NetworkActor(Actor):
    def __init__(self, name: str, host='localhost', port=None):
        super().__init__(name)
        self.host = host
        self.port = port or self._get_free_port()
        self.socket = None
        self.remote_actors = {}  # 🌐 Remote actor addresses

        self._start_server()
        self.on('remote_message', self._handle_remote)
        self.on('register_remote', self._register_remote)

    def _handle_remote(self, message):
        """📨 Handle a generic 'remote_message' (minimal placeholder)"""
        print(f"📨 {self.name} received remote message: {message}")

    def _get_free_port(self):
        """🔍 Find available port"""
        # Note: another process could claim the port before we bind it again
        with socket.socket() as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _start_server(self):
        """🖥️ Start network server"""
        def server_thread():
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.host, self.port))
            server.listen(5)
            server.settimeout(0.1)

            print(f"🌐 {self.name} listening on {self.host}:{self.port}")

            while self.running:
                try:
                    client, addr = server.accept()
                    with client:
                        # A single recv() may return only part of a message,
                        # so read until the sender closes the connection
                        chunks = []
                        while True:
                            chunk = client.recv(4096)
                            if not chunk:
                                break
                            chunks.append(chunk)
                    data = b''.join(chunks)
                    if data:
                        message = json.loads(data.decode())
                        self.send(message)
                except socket.timeout:
                    continue
                except Exception as e:
                    print(f"⚠️ Network error: {e}")

            server.close()

        threading.Thread(target=server_thread, daemon=True).start()

    def _register_remote(self, message):
        """📝 Register remote actor address"""
        actor_name = message.get('name')
        host = message.get('host')
        port = message.get('port')

        self.remote_actors[actor_name] = (host, port)
        print(f"📡 Registered remote actor: {actor_name} at {host}:{port}")

    def send_remote(self, actor_name: str, message: Dict):
        """📤 Send message to remote actor"""
        if actor_name in self.remote_actors:
            host, port = self.remote_actors[actor_name]

            try:
                with socket.create_connection((host, port), timeout=2) as sock:
                    # sendall() keeps writing until the whole payload is sent
                    sock.sendall(json.dumps(message).encode())
                print(f"📨 Sent to {actor_name}: {message['type']}")
            except Exception as e:
                print(f"❌ Failed to send to {actor_name}: {e}")

# 🎯 Example: Distributed chat system
class ChatNode(NetworkActor):
    def __init__(self, node_id: str):
        super().__init__(f"ChatNode-{node_id}")
        self.messages = []

        self.on('chat_message', self._handle_chat)
        self.on('sync_request', self._handle_sync)

    def _handle_chat(self, message):
        user = message.get('user')
        text = message.get('text')
        timestamp = message.get('timestamp', time.time())

        chat_msg = {
            'user': user,
            'text': text,
            'timestamp': timestamp,
            'node': self.name
        }

        self.messages.append(chat_msg)
        print(f"💬 {user}: {text}")

        # 📢 Broadcast to other nodes, but only messages that originated here;
        # re-forwarding a forwarded message would bounce between nodes forever
        if message.get('forwarded'):
            return
        for remote_node in self.remote_actors:
            self.send_remote(remote_node, {
                'type': 'chat_message',
                'user': user,
                'text': text,
                'timestamp': timestamp,
                'forwarded': True
            })

    def _handle_sync(self, message):
        """🔄 Handle a 'sync_request' (not implemented in this example)"""
        pass
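
Opening one connection per message works for a demo, but a long-lived connection needs a way to tell where one JSON message ends and the next begins. A common approach, sketched here as an assumption and not as part of the classes above, is a length prefix. The helper names send_framed and recv_framed are hypothetical, and socket.socketpair() stands in for a real network link.

```python
import json
import socket
import struct

HEADER = struct.Struct('!I')  # 4-byte big-endian payload length

def send_framed(sock, message):
    """Send one JSON message preceded by its byte length."""
    payload = json.dumps(message).encode('utf-8')
    sock.sendall(HEADER.pack(len(payload)) + payload)

def _recv_exactly(sock, n):
    """Keep calling recv() until exactly n bytes have arrived."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed mid-message")
        buf.extend(chunk)
    return bytes(buf)

def recv_framed(sock):
    """Read one length-prefixed JSON message."""
    (length,) = HEADER.unpack(_recv_exactly(sock, HEADER.size))
    return json.loads(_recv_exactly(sock, length).decode('utf-8'))

# Two connected sockets in one process, just to exercise the helpers
left, right = socket.socketpair()
with left, right:
    send_framed(left, {'type': 'chat_message', 'user': 'ada', 'text': 'hi'})
    send_framed(left, {'type': 'chat_message', 'user': 'bob', 'text': 'x' * 3000})
    first = recv_framed(right)
    second = recv_framed(right)

print(first['user'], len(second['text']))
```

Both messages travel over the same connection and are recovered intact, regardless of how the bytes were split across recv() calls.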

⚠️ Common Pitfalls and Solutions

😱 Pitfall 1: Deadlocks in Request-Reply

# โŒ Wrong way - can cause deadlock!
class BadActor(Actor):
    def __init__(self):
        super().__init__("BadActor")
        self.other_actor = None
        
    def process(self, msg):
        # ๐Ÿ’ฅ Blocking wait for reply - DON'T DO THIS!
        reply_queue = queue.Queue()
        self.other_actor.send({
            'type': 'request',
            'reply_to': reply_queue  
        })
        # This blocks the actor's thread!
        result = reply_queue.get()  # ๐Ÿ˜ฐ Deadlock risk!

# โœ… Correct way - use callbacks or futures!
class GoodActor(Actor):
    def __init__(self):
        super().__init__("GoodActor")
        self.pending_requests = {}
        
    def process(self, msg):
        request_id = str(uuid.uuid4())
        self.pending_requests[request_id] = msg.get('original_sender')
        
        # ๐Ÿ“ค Non-blocking send
        self.other_actor.send({
            'type': 'request',
            'request_id': request_id,
            'reply_to': self
        })
        # โœ… Continue processing other messages!
    
    def handle_reply(self, msg):
        request_id = msg.get('request_id')
        if request_id in self.pending_requests:
            # Process the reply asynchronously
            original_sender = self.pending_requests.pop(request_id)
            # Do something with the result

🤯 Pitfall 2: Message Ordering Assumptions

# โŒ Dangerous - assuming message order!
class OrderDependentActor(Actor):
    def process_sequence(self):
        self.send({'type': 'step1'})
        self.send({'type': 'step2'})  # ๐Ÿ’ฅ May arrive before step1!
        self.send({'type': 'step3'})

# โœ… Safe - explicit sequencing!
class SequencedActor(Actor):
    def __init__(self):
        super().__init__("SequencedActor")
        self.sequence_number = 0
        
    def send_sequenced(self, message):
        # ๐ŸŽฏ Add sequence number
        message['sequence'] = self.sequence_number
        self.sequence_number += 1
        self.send(message)
    
    def _handle_message(self, message):
        # โœ… Process in order
        seq = message.get('sequence', -1)
        # Buffer out-of-order messages if needed

🛠️ Best Practices

  1. 🎯 Keep Actors Focused: One responsibility per actor
  2. 📝 Immutable Messages: Never modify messages after sending
  3. 🛡️ Fail Fast: Let actors crash and restart cleanly
  4. 🎨 Use Supervision: Always have a supervision strategy
  5. ✨ Avoid Blocking: Never block in message handlers
  6. 📊 Monitor Mailboxes: Watch for message queue buildup
  7. 🔄 Design for Failure: Assume actors can crash anytime
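
The "Immutable Messages" practice can be enforced rather than just promised. The sketch below shows one possible approach using frozen dataclasses and a read-only mapping view; the ScoreUpdate message type is a hypothetical example, not a class used elsewhere in this tutorial.

```python
from dataclasses import dataclass, FrozenInstanceError
from types import MappingProxyType

@dataclass(frozen=True)
class ScoreUpdate:
    """Hypothetical message type; frozen=True blocks attribute assignment."""
    player_id: str
    points: int

msg = ScoreUpdate(player_id='p1', points=2)
try:
    msg.points = 99  # a receiver trying to alter the message
    mutated = True
except FrozenInstanceError:
    mutated = False

# For dict payloads: send a read-only view of a private copy, so later
# changes by the sender are invisible and the receiver cannot write to it
scores = {'p1': 2}
snapshot = MappingProxyType(dict(scores))
scores['p1'] = 10  # the sender keeps updating its own state

try:
    snapshot['p1'] = 0
    snapshot_writable = True
except TypeError:
    snapshot_writable = False

print(mutated, snapshot['p1'], snapshot_writable)
```

Note that frozen dataclasses are only shallowly immutable: a list or dict stored in a field can still be changed, so nested payloads need copies or immutable containers such as tuples.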

🧪 Hands-On Exercise

🎯 Challenge: Build a Distributed Task Queue

Create an actor-based task queue system:

📋 Requirements:

  • ✅ Master actor that distributes tasks
  • 🏷️ Worker actors that process tasks
  • 👤 Result aggregator actor
  • 📅 Task priorities and deadlines
  • 🎨 Fault tolerance with restart capability

🚀 Bonus Points:

  • Add work stealing between workers
  • Implement backpressure handling
  • Create a monitoring dashboard actor
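
As a starting point for the backpressure bonus, here is a minimal sketch of a bounded mailbox that refuses new messages when full instead of growing without limit. BoundedMailbox and its method names are hypothetical; how a sender reacts to a rejection (retry, drop, slow down) is a design decision left open.

```python
import queue

class BoundedMailbox:
    """Hypothetical mailbox with a fixed capacity."""

    def __init__(self, capacity):
        self._queue = queue.Queue(maxsize=capacity)

    def offer(self, message, timeout=0.0):
        """Try to enqueue; return False if the mailbox stays full."""
        try:
            if timeout > 0:
                self._queue.put(message, timeout=timeout)
            else:
                self._queue.put_nowait(message)
            return True
        except queue.Full:
            return False

    def take(self):
        """Remove and return the oldest message (blocks while empty)."""
        return self._queue.get()

    def depth(self):
        """Approximate backlog size, useful for monitoring."""
        return self._queue.qsize()

mailbox = BoundedMailbox(capacity=2)
accepted = [mailbox.offer({'task': i}) for i in range(3)]
depth_when_full = mailbox.depth()

oldest = mailbox.take()  # the consumer catches up by one message
accepted_after_drain = mailbox.offer({'task': 3})
print(accepted, depth_when_full, oldest, accepted_after_drain)
```

The third offer is rejected because the mailbox already holds two messages; once the consumer takes one, sending succeeds again.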

💡 Solution

import heapq
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta

# 🎯 Our distributed task queue system!
@dataclass
class Task:
    id: str
    priority: int  # Lower = higher priority
    deadline: datetime
    payload: Dict[str, Any]
    retries: int = 0

    def __lt__(self, other):
        return self.priority < other.priority

class TaskMaster(Actor):
    def __init__(self):
        super().__init__("TaskMaster")
        self.task_queue = []  # 📋 Priority queue
        self.workers = []  # 👥 Worker actors
        self.results = {}  # 📊 Task results
        self.pending_tasks = {}  # 🔄 Tasks being processed, keyed by worker name

        # 📨 Register handlers
        self.on('submit_task', self._submit_task)
        self.on('worker_ready', self._worker_ready)
        self.on('task_complete', self._task_complete)
        self.on('task_failed', self._task_failed)
        self.on('create_worker', self._create_worker)

    def _submit_task(self, message):
        task = Task(
            id=message.get('id', str(uuid.uuid4())),
            priority=message.get('priority', 5),
            deadline=datetime.now() + timedelta(seconds=message.get('timeout', 60)),
            payload=message.get('payload', {})
        )

        heapq.heappush(self.task_queue, task)
        print(f"✅ Task {task.id} submitted with priority {task.priority}")

        # 🚀 Try to assign immediately
        self._try_assign_tasks()

    def _create_worker(self, message):
        worker_id = f"Worker-{len(self.workers)}"
        worker = TaskWorker(worker_id, self)
        self.workers.append(worker)
        print(f"🚀 Created {worker_id}")

    def _worker_ready(self, message):
        worker = message.get('worker')
        # Only hand out work if this worker is not already busy; otherwise
        # its pending_tasks entry would be overwritten
        if worker.name not in self.pending_tasks:
            self._assign_task_to_worker(worker)

    def _try_assign_tasks(self):
        """🎯 Assign tasks to available workers"""
        for worker in self.workers:
            if not self.task_queue:
                break

            # Check if worker is available
            worker_id = worker.name
            if worker_id not in self.pending_tasks:
                self._assign_task_to_worker(worker)

    def _assign_task_to_worker(self, worker):
        while self.task_queue:
            task = heapq.heappop(self.task_queue)

            # ⏰ Check deadline; skip expired tasks and try the next one
            if datetime.now() > task.deadline:
                print(f"⚠️ Task {task.id} expired!")
                self.results[task.id] = {'status': 'expired'}
                continue

            self.pending_tasks[worker.name] = task
            worker.send({
                'type': 'process_task',
                'task': task
            })
            print(f"📤 Assigned task {task.id} to {worker.name}")
            return

    def _task_complete(self, message):
        worker_id = message.get('worker_id')
        task_id = message.get('task_id')
        result = message.get('result')

        if worker_id in self.pending_tasks:
            del self.pending_tasks[worker_id]

        self.results[task_id] = {
            'status': 'complete',
            'result': result,
            'worker': worker_id
        }

        print(f"✅ Task {task_id} completed by {worker_id}")

        # 🔄 Worker is ready for next task
        self._worker_ready({'worker': next(w for w in self.workers if w.name == worker_id)})

    def _task_failed(self, message):
        worker_id = message.get('worker_id')
        task_id = message.get('task_id')
        error = message.get('error')

        # The worker is no longer busy with this task
        task = self.pending_tasks.pop(worker_id, None)
        if task and task.retries < 3:
            # 🔄 Retry the task
            task.retries += 1
            heapq.heappush(self.task_queue, task)
            print(f"🔄 Retrying task {task_id} (attempt {task.retries + 1})")
        else:
            # ❌ Task failed permanently
            self.results[task_id] = {
                'status': 'failed',
                'error': error
            }
            print(f"❌ Task {task_id} failed permanently")

        # Hand queued work (including any retry) to idle workers
        self._try_assign_tasks()

class TaskWorker(Actor):
    def __init__(self, worker_id: str, master: TaskMaster):
        super().__init__(worker_id)
        self.master = master
        self.current_task = None

        self.on('process_task', self._process_task)

        # 🎯 Tell master we're ready
        self.master.send({
            'type': 'worker_ready',
            'worker': self
        })

    def _process_task(self, message):
        task = message.get('task')
        self.current_task = task

        try:
            # 🎮 Simulate task processing
            print(f"⚙️ {self.name} processing task {task.id}")

            # Your actual task processing logic here
            result = self._execute_task(task.payload)

            # 📤 Report success
            self.master.send({
                'type': 'task_complete',
                'worker_id': self.name,
                'task_id': task.id,
                'result': result
            })

        except Exception as e:
            # 💥 Report failure
            self.master.send({
                'type': 'task_failed',
                'worker_id': self.name,
                'task_id': task.id,
                'error': str(e)
            })

        finally:
            self.current_task = None

    def _execute_task(self, payload):
        # 🎯 Actual task execution
        operation = payload.get('operation', 'default')

        if operation == 'compute':
            # Heavy computation simulation
            time.sleep(0.1)
            return {'computed': payload.get('value', 0) * 2}
        elif operation == 'fetch':
            # I/O simulation
            time.sleep(0.05)
            return {'fetched': 'data'}
        else:
            return {'processed': True}

# 📊 Result Aggregator
class ResultAggregator(Actor):
    def __init__(self, master: TaskMaster):
        super().__init__("ResultAggregator")
        self.master = master
        self.aggregated_results = {}

        self.on('aggregate', self._aggregate)
        self.on('get_report', self._get_report)

    def _aggregate(self, message):
        batch_id = message.get('batch_id')
        task_ids = message.get('task_ids', [])

        # For brevity this reads the master's results dict directly; a
        # stricter actor design would request the results via a message
        results = []
        for task_id in task_ids:
            if task_id in self.master.results:
                results.append(self.master.results[task_id])

        self.aggregated_results[batch_id] = {
            'total': len(task_ids),
            'completed': sum(1 for r in results if r.get('status') == 'complete'),
            'failed': sum(1 for r in results if r.get('status') == 'failed'),
            'results': results
        }

        print(f"📊 Aggregated batch {batch_id}: {self.aggregated_results[batch_id]['completed']}/{len(task_ids)} completed")

    def _get_report(self, message):
        print("📋 Task Queue Report:")
        for batch_id, stats in self.aggregated_results.items():
            print(f"  Batch {batch_id}: {stats['completed']} completed, {stats['failed']} failed")

# 🎮 Test it out!
task_queue = TaskMaster()

# Create workers
for i in range(3):
    task_queue.send({'type': 'create_worker'})

# Submit tasks
for i in range(10):
    task_queue.send({
        'type': 'submit_task',
        'id': f'task-{i}',
        'priority': i % 3,  # Different priorities
        'payload': {
            'operation': ['compute', 'fetch'][i % 2],
            'value': i
        }
    })

# Let it process
time.sleep(1)

# Create aggregator and get report
aggregator = ResultAggregator(task_queue)
aggregator.send({
    'type': 'aggregate',
    'batch_id': 'batch-1',
    'task_ids': [f'task-{i}' for i in range(10)]
})
aggregator.send({'type': 'get_report'})

# Give the report a moment, then stop every actor so the program can exit
time.sleep(0.2)
for actor in [aggregator, *task_queue.workers, task_queue]:
    actor.stop()

🎓 Key Takeaways

You've mastered the Actor Model! Here's what you can now do:

  • ✅ Design concurrent systems without shared state complexity 💪
  • ✅ Build fault-tolerant applications with supervision 🛡️
  • ✅ Scale out across processes and machines with distributed actors 🎯
  • ✅ Debug message-passing systems like a pro 🐛
  • ✅ Create reactive, event-driven architectures with confidence! 🚀

Remember: The Actor Model turns complex concurrency into simple message passing. Let each actor do one thing well! 🤝

🤝 Next Steps

Congratulations! 🎉 You've conquered the Actor Model and message passing!

Here's what to do next:

  1. 💻 Build a real-time chat system with actors
  2. 🏗️ Explore actor frameworks like Pykka or Ray
  3. 📚 Move on to our next tutorial: Async/Await Patterns
  4. 🌟 Share your actor-based creations with the community!

Remember: Every distributed system expert started with their first actor. Keep experimenting, keep learning, and most importantly, have fun with concurrent programming! 🚀


Happy coding! 🎉🚀✨