Part 336 of 365

📘 Barriers and Events: Synchronization

Master barriers and events: synchronization in Python with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand how barriers and events work 🎯
  • Apply them in real projects 🏗️
  • Debug common synchronization issues 🐛
  • Write clean, Pythonic concurrent code ✨

🎯 Introduction

Welcome to this exciting tutorial on Barriers and Events in Python! 🎉 In this guide, we'll explore how to synchronize multiple threads or processes using these powerful coordination primitives.

Have you ever organized a group activity where everyone needs to wait for each other? That's exactly what barriers and events help us do in concurrent programming! Whether you're building a multiplayer game 🎮, coordinating data processing pipelines 📊, or managing distributed systems 🌐, understanding barriers and events is essential for writing robust concurrent code.

By the end of this tutorial, you'll feel confident using barriers and events to orchestrate complex concurrent operations! Let's dive in! 🏊‍♂️

📚 Understanding Barriers and Events

🤔 What are Barriers and Events?

A Barrier is like a meeting point at a theme park 🎢. Imagine your group agrees to meet at the entrance before going on the next ride - everyone waits until the whole group arrives, then you all proceed together!

An Event is like a starting gun at a race 🏃‍♂️. All runners wait for the signal, and when it fires, everyone starts running simultaneously!

In Python terms:

  • ✨ Barrier: Synchronizes a fixed number of threads at a common point
  • 🚀 Event: Allows threads to wait for a specific condition to occur
  • 🛡️ Thread-safe: Both are designed for safe concurrent use
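
A minimal sketch of the state these two objects expose, using only the standard `threading` module (the variable names are illustrative). No worker threads are needed here because the snippet only inspects the objects:

```python
import threading

# A barrier for three parties; nobody is waiting yet
barrier = threading.Barrier(3)
print(barrier.parties)    # number of threads required to pass
print(barrier.n_waiting)  # threads currently blocked in wait()
print(barrier.broken)     # True only after abort() or a timed-out wait()

# An event starts cleared; set() flips it on, clear() flips it off again
ready = threading.Event()
before = ready.is_set()
ready.set()
after = ready.is_set()
ready.clear()
print(before, after, ready.is_set())
```

The barrier is all about counting arrivals, while the event is a single on/off flag that any number of threads can wait on.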

💡 Why Use Barriers and Events?

Here's why developers love these synchronization tools:

  1. Coordination 🔄: Synchronize multiple workers at checkpoints
  2. Signal Broadcasting 📡: Notify multiple threads of state changes
  3. Phase Synchronization 🎯: Ensure all threads complete one phase before starting the next
  4. Resource Readiness 🛡️: Hold threads back until a shared resource is ready (mutual exclusion itself is a job for locks and semaphores)

Real-world example: Imagine building a data processing pipeline 🏭. With barriers, you can ensure all workers finish preprocessing before anyone starts the analysis phase!

🔧 Basic Syntax and Usage

📝 Working with Barriers

Let's start with barriers:

import threading
import time
import random

# 🎯 Create a barrier for 3 threads
barrier = threading.Barrier(3)

def worker(worker_id):
    # 👷 Simulate some work
    work_time = random.uniform(1, 3)
    print(f"Worker {worker_id} starting work... 🔨")
    time.sleep(work_time)

    print(f"Worker {worker_id} finished! Waiting at barrier... ⏳")

    # 🚧 Wait at the barrier
    try:
        barrier.wait()
        print(f"Worker {worker_id} passed the barrier! 🎉")
    except threading.BrokenBarrierError:
        print(f"Worker {worker_id}: Barrier is broken! 💥")

# 🚀 Start the workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# ⏳ Wait for all threads
for t in threads:
    t.join()

💡 Explanation: The barrier ensures all three workers wait for each other before proceeding. No matter who finishes first, everyone waits at the barrier!
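
One detail the example above does not use: `wait()` also returns a value. A small sketch, assuming three worker threads (the `indices` list and `lock` are illustrative helpers), shows that each thread receives a different integer from `range(parties)`. This is handy for electing exactly one thread to do housekeeping after the barrier:

```python
import threading

parties = 3
barrier = threading.Barrier(parties)
indices = []
lock = threading.Lock()

def worker():
    # wait() hands a different integer in range(parties) to each thread
    index = barrier.wait(timeout=5)
    with lock:
        indices.append(index)

threads = [threading.Thread(target=worker) for _ in range(parties)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(indices))
```

Which thread gets which number is best treated as arbitrary. The useful guarantee is that the values are unique, so a check like `if index == 0:` selects exactly one thread.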

🎯 Working with Events

Now let's explore events:

import threading
import time
import random  # Needed for the random race durations below

# 🚦 Create the events
start_event = threading.Event()
finish_event = threading.Event()

def racer(name):
    print(f"{name} is ready at the starting line! 🏃‍♂️")

    # ⏳ Wait for the starting signal
    start_event.wait()

    print(f"{name} is running! 💨")
    time.sleep(random.uniform(2, 4))

    print(f"{name} finished the race! 🏁")

    # 🎯 Signal that a racer has finished (set() is idempotent, so repeat calls are harmless)
    finish_event.set()

# 🏁 Create racers
racers = ['Alice 🟥', 'Bob 🟦', 'Charlie 🟩']
threads = []

for racer_name in racers:
    t = threading.Thread(target=racer, args=(racer_name,))
    threads.append(t)
    t.start()

# 🎬 Race coordinator
print("Get ready... 📢")
time.sleep(2)
print("Get set... 🔔")
time.sleep(1)
print("GO! 🚀")

# 🚦 Fire the starting signal!
start_event.set()

# 🏆 Block until the first racer crosses the line
finish_event.wait()
print("We have a winner! 🏆")

# ⏳ Wait for all racers
for t in threads:
    t.join()
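
The race above always waits without a limit. As a hedged sketch of the timed variant (the `signal` name and the 0.1 second delays are arbitrary choices for illustration), `Event.wait(timeout=...)` returns `False` when it gives up and `True` once the event is set:

```python
import threading

signal = threading.Event()

# Nobody has called set(), so a timed wait gives up and returns False
timed_out_result = signal.wait(timeout=0.1)

# A timer thread sets the event shortly afterwards; wait() then returns True
timer = threading.Timer(0.1, signal.set)
timer.start()
signalled_result = signal.wait(timeout=5)
timer.join()

print(timed_out_result, signalled_result)
```

Checking the return value lets a thread tell "the signal arrived" apart from "I stopped waiting".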

💡 Practical Examples

🛒 Example 1: Restaurant Order System

Let's build a restaurant coordination system:

import threading
import time
import random
from queue import Queue

class Restaurant:
    def __init__(self, num_chefs=3):
        # 🍳 Kitchen coordination
        self.order_ready = threading.Event()
        self.kitchen_barrier = threading.Barrier(num_chefs)
        self.orders = Queue()
        self.orders_lock = threading.Lock()  # Keeps put+set and empty-check+clear atomic
        self.prepared_orders = []

    def chef(self, chef_id, dish_part):
        """Each chef prepares one part of the meal"""
        while True:
            # 📋 Wait for an order (or for the end-of-service wake-up)
            self.order_ready.wait()

            if self.orders.empty():
                break

            # Peek at the order without removing it (reads Queue's underlying deque)
            order = self.orders.queue[0]

            # 👨‍🍳 Prepare the dish part
            print(f"Chef {chef_id} preparing {dish_part} for order {order}... 🍳")
            time.sleep(random.uniform(1, 2))

            try:
                # 🚧 Wait for all chefs to finish their parts
                index = self.kitchen_barrier.wait()

                # 🎯 Exactly one chef (whoever gets index 0) combines the dish
                if index == 0:
                    self.orders.get()  # Remove order
                    print(f"✨ Order {order} is ready! All parts combined! 🍽️")
                    self.prepared_orders.append(order)

                    # Reset the event if nothing is left to cook
                    with self.orders_lock:
                        if self.orders.empty():
                            self.order_ready.clear()
                    self.orders.task_done()

                # 🚧 Second rendezvous: nobody peeks again until the order is removed
                self.kitchen_barrier.wait()

            except threading.BrokenBarrierError:
                print(f"Chef {chef_id}: Kitchen coordination failed! 💥")
                break  # Do not keep looping on a broken barrier

    def waiter(self, order_id):
        """Waiter places an order"""
        print(f"📝 Waiter placing order {order_id}...")
        with self.orders_lock:
            self.orders.put(order_id)
            self.order_ready.set()

    def run_service(self):
        # 👨‍🍳 Start the kitchen staff
        chefs = [
            threading.Thread(target=self.chef, args=(1, "appetizer 🥗")),
            threading.Thread(target=self.chef, args=(2, "main course 🍖")),
            threading.Thread(target=self.chef, args=(3, "dessert 🍰"))
        ]

        for chef in chefs:
            chef.start()

        # 📝 Place some orders
        for i in range(3):
            time.sleep(0.5)
            self.waiter(f"Table-{i+1}")

        # 🛑 Wait until every order is done, then wake the chefs so they can exit
        self.orders.join()
        self.order_ready.set()

        for chef in chefs:
            chef.join()

        print(f"\n🎉 Service complete! Prepared {len(self.prepared_orders)} orders!")

# 🎮 Run the restaurant
restaurant = Restaurant()
restaurant.run_service()

🎯 Try it yourself: Add a drinks station that operates independently but signals when drinks are ready!

🎮 Example 2: Multiplayer Game Synchronization

Let's create a turn-based game system:

import threading
import time
import random

class MultiplayerGame:
    def __init__(self, num_players=4):
        self.num_players = num_players
        # 🎮 Game synchronization
        self.round_barrier = threading.Barrier(num_players)
        self.game_start = threading.Event()
        self.game_over = threading.Event()
        self.current_round = 0
        self.max_rounds = 5
        self.player_scores = {i: 0 for i in range(num_players)}
        self.lock = threading.Lock()

    def player(self, player_id):
        """Each player's game logic"""
        player_name = f"Player-{player_id} {'🔴🔵🟢🟡'[player_id]}"

        print(f"{player_name} joined the game! 🎮")

        # ⏳ Wait for game to start
        self.game_start.wait()
        print(f"{player_name} ready to play! 🎯")

        while not self.game_over.is_set():
            # 🎲 Make a move
            move_score = random.randint(1, 6)
            print(f"{player_name} rolled {move_score} 🎲")

            # 📊 Update score
            with self.lock:
                self.player_scores[player_id] += move_score

            # 🚧 Wait for all players to finish their turn
            try:
                index = self.round_barrier.wait()

                # 🎯 Exactly one player (whoever gets index 0) handles round management
                if index == 0:
                    with self.lock:
                        self.current_round += 1
                        print(f"\n--- Round {self.current_round} Complete ---")
                        self._display_scores()

                        if self.current_round >= self.max_rounds:
                            self.game_over.set()
                            self._announce_winner()

                # 🚧 Second rendezvous so every player sees the same game_over state
                self.round_barrier.wait()

            except threading.BrokenBarrierError:
                print(f"{player_name}: Game sync failed! 💥")
                break

            # ⏸️ Small pause between rounds
            time.sleep(0.5)

    def _display_scores(self):
        """Display current scores (caller holds self.lock)"""
        print("📊 Current Scores:")
        for player_id, score in self.player_scores.items():
            emoji = '🔴🔵🟢🟡'[player_id]
            print(f"  Player-{player_id} {emoji}: {score} points")

    def _announce_winner(self):
        """Announce the game winner (caller holds self.lock)"""
        winner_id = max(self.player_scores, key=self.player_scores.get)
        winner_score = self.player_scores[winner_id]
        emoji = '🔴🔵🟢🟡'[winner_id]
        print(f"\n🏆 Player-{winner_id} {emoji} wins with {winner_score} points! 🎉")

    def start_game(self):
        """Start the multiplayer game"""
        # 🎮 Create player threads
        players = []
        for i in range(self.num_players):
            p = threading.Thread(target=self.player, args=(i,))
            players.append(p)
            p.start()
            time.sleep(0.1)  # Stagger joins

        # 🎬 Countdown
        print("\n🎮 GAME STARTING IN...")
        for i in range(3, 0, -1):
            print(f"   {i}...")
            time.sleep(1)
        print("   GO! 🚀\n")

        # 🚦 Start the game!
        self.game_start.set()

        # ⏳ Wait for game to finish
        for p in players:
            p.join()

# 🎮 Play the game!
game = MultiplayerGame(num_players=4)
game.start_game()
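
An alternative way to run per-round housekeeping is the barrier's `action` callback, which the standard library calls in one of the waiting threads each time the barrier releases. The sketch below is a stripped-down illustration, not a rewrite of the game: `NUM_PLAYERS`, `NUM_ROUNDS`, and `end_of_round` are hypothetical names chosen for the example.

```python
import threading

NUM_PLAYERS = 3   # hypothetical values for this sketch
NUM_ROUNDS = 4
rounds_completed = []

def end_of_round():
    # Called once per release, by one of the waiting threads,
    # before any of them is allowed to continue
    rounds_completed.append(len(rounds_completed) + 1)

round_barrier = threading.Barrier(NUM_PLAYERS, action=end_of_round)

def player():
    for _ in range(NUM_ROUNDS):
        # ... take a turn here ...
        round_barrier.wait(timeout=5)

threads = [threading.Thread(target=player) for _ in range(NUM_PLAYERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(rounds_completed)
```

Because the callback finishes before anyone is released, every thread sees the updated round state as soon as it leaves `wait()`.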

🚀 Advanced Concepts

🧙‍♂️ Advanced Barrier Patterns

When you're ready to level up, try these advanced barrier techniques:

import threading
import time
import random  # Needed for the simulated work durations below

class PhaseBarrier:
    """Multi-phase synchronization with barriers"""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.phase_barriers = {}
        self.results = {}
        self.lock = threading.Lock()

    def get_barrier(self, phase_name):
        """Get or create a barrier for a phase"""
        with self.lock:
            if phase_name not in self.phase_barriers:
                self.phase_barriers[phase_name] = threading.Barrier(
                    self.num_workers,
                    action=lambda: print(f"✨ Phase '{phase_name}' complete!")
                )
            return self.phase_barriers[phase_name]

    def worker(self, worker_id):
        """Worker with multiple synchronization phases"""

        # 🔄 Phase 1: Data Loading
        print(f"Worker-{worker_id}: Loading data... 📂")
        time.sleep(random.uniform(0.5, 1.5))
        self.get_barrier("data_loading").wait()

        # 🔄 Phase 2: Processing
        print(f"Worker-{worker_id}: Processing... ⚙️")
        time.sleep(random.uniform(1, 2))
        result = worker_id * 100  # Simulate computation

        with self.lock:
            self.results[worker_id] = result

        self.get_barrier("processing").wait()

        # 🔄 Phase 3: Aggregation (only one worker does this)
        barrier = self.get_barrier("aggregation")
        index = barrier.wait()

        if index == 0:  # Exactly one worker receives index 0
            total = sum(self.results.values())
            print(f"📊 Aggregated result: {total}")

        # 🔄 Phase 4: Cleanup
        print(f"Worker-{worker_id}: Cleaning up... 🧹")
        time.sleep(0.5)
        self.get_barrier("cleanup").wait()

        print(f"Worker-{worker_id}: All done! 🎉")

# 🚀 Run the phased computation
system = PhaseBarrier(num_workers=3)
workers = []

for i in range(3):
    w = threading.Thread(target=system.worker, args=(i,))
    workers.append(w)
    w.start()

for w in workers:
    w.join()
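
A separate barrier per phase is handy when each phase needs its own action, but it is not required: a `threading.Barrier` resets itself after every release and can be waited on again. A minimal sketch, assuming three workers and three illustrative phase names, records what each worker does and shows that no phase starts before the previous one is finished everywhere:

```python
import threading

NUM_WORKERS = 3
phase_barrier = threading.Barrier(NUM_WORKERS)  # one barrier, reused for every phase
log = []
log_lock = threading.Lock()

def worker():
    for phase in ("load", "process", "cleanup"):
        with log_lock:
            log.append(phase)
        # The barrier resets after each release, so it can be waited on again
        phase_barrier.wait(timeout=5)

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(log)
```

A worker can only log "process" after all three workers have logged "load" and reached the barrier, so the log is always grouped by phase.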

๐Ÿ—๏ธ Advanced Event Patterns

For the brave developers, hereโ€™s an advanced event coordination system:

import threading
import time
import random  # Needed for the simulated work durations below
from enum import Enum

class SystemState(Enum):
    INITIALIZING = "🔄 Initializing"
    READY = "✅ Ready"
    PROCESSING = "⚙️ Processing"
    ERROR = "❌ Error"
    SHUTDOWN = "🛑 Shutdown"

class EventOrchestrator:
    """Complex event-driven system orchestration"""

    def __init__(self):
        # 🎯 State events
        self.state_events = {
            state: threading.Event() for state in SystemState
        }
        self.current_state = SystemState.INITIALIZING
        self.state_lock = threading.Lock()

        # 📊 Component readiness
        self.component_ready = {}
        self.all_ready = threading.Event()

    def set_state(self, new_state):
        """Transition to a new state"""
        with self.state_lock:
            old_state = self.current_state
            self.current_state = new_state

            # Clear old state event
            self.state_events[old_state].clear()

            # Set new state event
            self.state_events[new_state].set()

            print(f"🔄 State transition: {old_state.value} → {new_state.value}")

    def wait_for_state(self, state, timeout=None):
        """Wait for a specific state; returns False if the timeout expires first"""
        return self.state_events[state].wait(timeout)

    def component(self, name, init_time):
        """Component lifecycle"""
        print(f"🔧 {name} starting initialization...")

        # 🔄 Initialize
        time.sleep(init_time)

        # ✅ Signal ready
        with self.state_lock:
            self.component_ready[name] = True
            print(f"✅ {name} is ready!")

            # Check if all components are ready
            if len(self.component_ready) == 3:  # Expecting 3 components
                self.all_ready.set()

        # ⏳ Wait for system ready state
        self.wait_for_state(SystemState.READY)
        print(f"🚀 {name} starting work...")

        # 💼 Do work
        if self.wait_for_state(SystemState.PROCESSING, timeout=5):
            time.sleep(random.uniform(1, 3))
            print(f"✨ {name} completed work!")

        # 🛑 Wait for shutdown
        self.wait_for_state(SystemState.SHUTDOWN)
        print(f"👋 {name} shutting down...")

    def orchestrate(self):
        """Main orchestration logic"""
        # 🚀 Start components
        components = [
            threading.Thread(target=self.component, args=("Database 🗄️", 1)),
            threading.Thread(target=self.component, args=("API Server 🌐", 1.5)),
            threading.Thread(target=self.component, args=("Cache 💾", 0.5))
        ]

        for comp in components:
            comp.start()

        # ⏳ Wait for all components
        print("⏳ Waiting for all components to be ready...")
        self.all_ready.wait()

        # 🎯 System is ready!
        self.set_state(SystemState.READY)
        time.sleep(1)

        # ⚙️ Start processing
        print("\n🎬 Starting main processing...")
        self.set_state(SystemState.PROCESSING)
        time.sleep(4)

        # 🛑 Shutdown
        print("\n🔴 Initiating shutdown...")
        self.set_state(SystemState.SHUTDOWN)

        # Wait for all components
        for comp in components:
            comp.join()

        print("\n✅ System shutdown complete!")

# 🎮 Run the orchestration
orchestrator = EventOrchestrator()
orchestrator.orchestrate()

⚠️ Common Pitfalls and Solutions

😱 Pitfall 1: Barrier Deadlock

# โŒ Wrong way - mismatched thread count!
barrier = threading.Barrier(3)

def worker():
    print("Working... ๐Ÿ’ผ")
    barrier.wait()  # ๐Ÿ’ฅ Deadlock if only 2 threads!

# Only starting 2 threads but barrier expects 3
for i in range(2):  # Should be 3!
    threading.Thread(target=worker).start()

# โœ… Correct way - match thread count!
num_workers = 3
barrier = threading.Barrier(num_workers)

def safe_worker(worker_id):
    print(f"Worker {worker_id} working... ๐Ÿ’ผ")
    try:
        barrier.wait(timeout=5)  # โœ… Always use timeout!
        print(f"Worker {worker_id} passed barrier! โœ…")
    except threading.BrokenBarrierError:
        print(f"Worker {worker_id}: Barrier broken! ๐Ÿšจ")
    except threading.BarrierTimeoutError:
        print(f"Worker {worker_id}: Timeout! โฐ")

# Start exactly the right number of threads
for i in range(num_workers):
    threading.Thread(target=safe_worker, args=(i,)).start()
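
To see what a barrier timeout looks like in practice, here is a small single-threaded sketch (the two-party barrier and the 0.1 second timeout are arbitrary choices). In the standard library a timed-out `wait()` puts the barrier into the broken state and raises `BrokenBarrierError`, and `reset()` makes the barrier usable again:

```python
import threading

barrier = threading.Barrier(2)  # expects two parties, but only one will show up

try:
    barrier.wait(timeout=0.1)
    outcome = "passed"
except threading.BrokenBarrierError:
    # A timed-out wait() breaks the barrier and raises BrokenBarrierError
    outcome = "broken"

broken_after_timeout = barrier.broken
barrier.reset()  # return the barrier to its initial, usable state
broken_after_reset = barrier.broken

print(outcome, broken_after_timeout, broken_after_reset)
```

Once a barrier is broken, every later `wait()` fails immediately until someone calls `reset()`, so a recovery path should decide explicitly who is responsible for that call.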

🤯 Pitfall 2: Event Race Conditions

import threading
import time

# ❌ Dangerous - pulsing the event (set, then clear right away)!
event = threading.Event()

def racer():
    time.sleep(1)  # Simulate setup work
    event.wait()  # 💥 Blocks forever: the event was cleared before we got here
    print("Started!")

# daemon=True so the stuck thread does not keep the interpreter alive
threading.Thread(target=racer, daemon=True).start()
event.set()
event.clear()  # The signal is gone before the racer ever waits

# ✅ Safe - leave the event set; a late wait() still sees it
event = threading.Event()

def safe_racer():
    print("Racer preparing... 🏃‍♂️")
    time.sleep(1)  # Simulate setup work

    # ✅ No is_set() check needed: wait() returns at once if the event is already set
    print("Waiting for signal... ⏳")
    if event.wait(timeout=5):
        print("Racing! 💨")
    else:
        print("No starting signal! ⏰")

# Start thread first
t = threading.Thread(target=safe_racer)
t.start()

# Then signal
time.sleep(0.5)
print("Firing starting gun! 🔫")
event.set()  # Stays set until someone calls clear()
t.join()

🛠️ Best Practices

  1. 🎯 Always Use Timeouts: Prevent indefinite blocking with timeout parameters
  2. 📏 Match Counts: Ensure barrier parties match actual thread count
  3. 🛡️ Handle Exceptions: Always catch BrokenBarrierError, which a timed-out Barrier.wait() also raises; Event.wait() reports a timeout by returning False
  4. 🎨 Clear Event Intent: Use descriptive names for events (e.g., data_ready, shutdown_signal)
  5. ✨ Reset Carefully: A barrier resets itself after every successful release; a broken barrier stays broken until you call reset()
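
Practices 1 and 4 combine naturally in a background loop. The sketch below is one possible shape, with a hypothetical `heartbeat` worker and arbitrary timings: a descriptively named event is polled with a timeout, so the loop does periodic work and still stops promptly when asked.

```python
import threading
import time

shutdown_signal = threading.Event()  # the name states the intent
ticks = []

def heartbeat():
    # wait() doubles as an interruptible sleep: it returns True as soon as
    # shutdown_signal is set, and False after each 0.05 s timeout
    while not shutdown_signal.wait(timeout=0.05):
        ticks.append(time.monotonic())

t = threading.Thread(target=heartbeat)
t.start()
time.sleep(0.3)
shutdown_signal.set()
t.join(timeout=5)

print(f"heartbeat ran {len(ticks)} times, still alive: {t.is_alive()}")
```

Compared with `time.sleep()` inside the loop, this version reacts to the shutdown request immediately instead of finishing the current sleep first.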

🧪 Hands-On Exercise

🎯 Challenge: Build a Pipeline Processing System

Create a multi-stage data processing pipeline:

📋 Requirements:

  • ✅ Three processing stages: Load, Transform, Save
  • 🏷️ Multiple workers per stage
  • 👤 Barriers between stages to ensure order
  • 📅 Events for error handling and shutdown
  • 🎨 Progress tracking with visual indicators

🚀 Bonus Points:

  • Add dynamic worker scaling
  • Implement graceful shutdown
  • Create performance metrics
  • Handle worker failures gracefully

💡 Solution

import threading
import time
import random
from queue import Queue, Empty
from dataclasses import dataclass
from typing import Dict

@dataclass
class DataItem:
    id: int
    data: str
    processed: bool = False

class DataPipeline:
    def __init__(self, workers_per_stage: Dict[str, int]):
        # 🎯 Pipeline configuration
        self.stages = ["Load 📂", "Transform 🔄", "Save 💾"]
        self.workers_per_stage = workers_per_stage

        # 🚧 One barrier per stage; its action announces the stage and sets a
        # "stage done" event that the next stage waits on (no guessing with sleeps)
        self.stage_done = {stage: threading.Event() for stage in self.stages}
        self.stage_barriers = {}
        for stage in self.stages:
            self.stage_barriers[stage] = threading.Barrier(
                workers_per_stage[stage],
                action=lambda s=stage: self._stage_complete(s)
            )

        # 📊 Data management
        self.data_queue = Queue()
        self.processed_data = []
        self.lock = threading.Lock()

        # 🚦 Control events
        self.shutdown = threading.Event()
        self.error_occurred = threading.Event()
        self.all_done = threading.Event()

        # 📈 Metrics
        self.metrics = {stage: 0 for stage in self.stages}

    def _stage_complete(self, stage: str):
        """Barrier action: runs once when every worker of a stage has arrived"""
        print(f"\n✨ Stage '{stage}' complete!\n")
        self.stage_done[stage].set()

    def _abort(self):
        """Wake every waiting worker so the pipeline can shut down"""
        self.shutdown.set()
        for barrier in self.stage_barriers.values():
            barrier.abort()
        for done in self.stage_done.values():
            done.set()

    def worker(self, worker_id: int, stage: str):
        """Pipeline worker for a specific stage (worker_id is local to the stage)"""
        print(f"👷 Worker-{worker_id} ready for stage '{stage}'")

        try:
            if stage == "Load 📂":
                self._load_stage(worker_id)
            elif stage == "Transform 🔄":
                self._transform_stage(worker_id)
            elif stage == "Save 💾":
                self._save_stage(worker_id)

        except threading.BrokenBarrierError:
            print(f"💥 Worker-{worker_id}: stage '{stage}' was aborted")
        except Exception as e:
            print(f"❌ Worker-{worker_id} error in {stage}: {e}")
            self.error_occurred.set()
            self._abort()  # Do not leave the other workers stuck at a barrier

    def _load_stage(self, worker_id: int):
        """Load data stage"""
        # 📂 Simulate loading data
        for i in range(worker_id * 3, (worker_id + 1) * 3):
            if self.shutdown.is_set():
                break

            item = DataItem(id=i, data=f"Data-{i}")
            self.data_queue.put(item)

            with self.lock:
                self.metrics["Load 📂"] += 1

            print(f"📥 Worker-{worker_id}: Loaded {item.data}")
            time.sleep(0.2)

        # 🚧 Wait for all loaders
        self.stage_barriers["Load 📂"].wait()

    def _transform_stage(self, worker_id: int):
        """Transform data stage"""
        # ⏳ Wait until the load stage has finished
        self.stage_done["Load 📂"].wait()

        processed_items = []

        # 🔄 Process available data
        while not self.shutdown.is_set():
            try:
                item = self.data_queue.get_nowait()
            except Empty:
                break  # Queue drained: nothing left to transform

            # 🎨 Transform the data
            print(f"⚙️ Worker-{worker_id}: Transforming {item.data}...")
            time.sleep(random.uniform(0.3, 0.6))

            item.data = f"{item.data}-TRANSFORMED-✨"
            item.processed = True
            processed_items.append(item)

            with self.lock:
                self.metrics["Transform 🔄"] += 1

        # 📤 Publish results before the barrier so the save stage sees all of them
        with self.lock:
            self.processed_data.extend(processed_items)

        # 🚧 Wait for all transformers
        self.stage_barriers["Transform 🔄"].wait()

    def _save_stage(self, worker_id: int):
        """Save data stage"""
        # ⏳ Wait until the transform stage has finished
        self.stage_done["Transform 🔄"].wait()

        # 💾 Each saver takes the items whose id maps to its stage-local index
        with self.lock:
            items_to_save = [
                item for item in self.processed_data
                if item.id % self.workers_per_stage["Save 💾"] == worker_id
            ]

        for item in items_to_save:
            if self.shutdown.is_set():
                break

            print(f"💾 Worker-{worker_id}: Saving {item.data}")
            time.sleep(0.2)

            with self.lock:
                self.metrics["Save 💾"] += 1

        # 🚧 Final synchronization
        self.stage_barriers["Save 💾"].wait()

    def run_pipeline(self):
        """Execute the data pipeline"""
        print("🚀 Starting data pipeline...\n")

        all_workers = []

        # 🎯 Start workers for each stage (ids restart at 0 within every stage)
        for stage in self.stages:
            for worker_id in range(self.workers_per_stage[stage]):
                worker = threading.Thread(
                    target=self.worker,
                    args=(worker_id, stage)
                )
                worker.start()
                all_workers.append(worker)

        # ⏳ Monitor pipeline
        monitor = threading.Thread(target=self._monitor_pipeline)
        monitor.start()

        # 🔄 Wait for completion
        for worker in all_workers:
            worker.join()

        self.all_done.set()
        monitor.join()

        # 📊 Final report
        self._print_summary()

    def _monitor_pipeline(self):
        """Monitor pipeline progress"""
        # all_done.wait() doubles as an interruptible 2-second sleep
        while not self.all_done.wait(timeout=2):
            with self.lock:
                print("\n📊 Pipeline Status:")
                for stage, count in self.metrics.items():
                    print(f"  {stage}: {count} items processed")
                print()

            if self.error_occurred.is_set():
                print("⚠️ Error detected in pipeline!")
                self.shutdown.set()
                break

    def _print_summary(self):
        """Print pipeline summary"""
        print("\n" + "="*50)
        print("📈 PIPELINE SUMMARY")
        print("="*50)

        with self.lock:
            for stage, count in self.metrics.items():
                print(f"{stage}: {count} items processed ✅")

            print(f"\n🎉 Total items processed: {len(self.processed_data)}")

            if self.error_occurred.is_set():
                print("⚠️ Pipeline completed with errors!")
            else:
                print("✨ Pipeline completed successfully!")

# 🎮 Run the pipeline!
pipeline = DataPipeline(
    workers_per_stage={
        "Load 📂": 2,
        "Transform 🔄": 3,
        "Save 💾": 2
    }
)

pipeline.run_pipeline()

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Create barriers for multi-thread synchronization 💪
  • ✅ Use events for signaling between threads 🛡️
  • ✅ Coordinate complex workflows with multiple synchronization points 🎯
  • ✅ Handle synchronization errors gracefully 🐛
  • ✅ Build concurrent systems with proper coordination! 🚀

Remember: Barriers and events are your friends for building reliable concurrent systems! They make sure threads proceed in the right order; shared data still needs a lock to stay safe. 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered barriers and events in Python!

Here's what to do next:

  1. 💻 Practice with the pipeline exercise above
  2. 🏗️ Build a multiplayer game server using barriers
  3. 📚 Explore semaphores and conditions for more synchronization options
  4. 🌟 Share your concurrent creations with the community!

Remember: Every concurrent programming expert started by understanding these fundamental synchronization primitives. Keep practicing, keep learning, and most importantly, have fun building amazing concurrent systems! 🚀


Happy concurrent coding! 🎉🚀✨