+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 334 of 365

๐Ÿ“˜ Deadlocks: Detection and Prevention

Master deadlocks: detection and prevention in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand the concept fundamentals ๐ŸŽฏ
  • Apply the concept in real projects ๐Ÿ—๏ธ
  • Debug common issues ๐Ÿ›
  • Write clean, Pythonic code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on deadlocks in Python! ๐ŸŽ‰ In this guide, weโ€™ll explore how to detect and prevent one of the most challenging problems in concurrent programming.

Youโ€™ll discover how understanding deadlocks can transform your ability to write robust concurrent applications. Whether youโ€™re building web services ๐ŸŒ, data pipelines ๐Ÿ–ฅ๏ธ, or distributed systems ๐Ÿ“š, understanding deadlocks is essential for writing reliable, scalable code.

By the end of this tutorial, youโ€™ll feel confident in identifying, preventing, and resolving deadlocks in your own projects! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding Deadlocks

๐Ÿค” What is a Deadlock?

A deadlock is like a traffic jam at a four-way intersection where everyone is waiting for everyone else to move first ๐Ÿš—. Think of it as a circular dependency where threads are stuck forever, each waiting for resources held by others.

In Python terms, a deadlock occurs when two or more threads are blocked forever, each waiting for the other to release a resource. This means your program:

  • โœจ Freezes without crashing
  • ๐Ÿš€ Stops making progress
  • ๐Ÿ›ก๏ธ Becomes unresponsive

๐Ÿ’ก Why Learn About Deadlocks?

Hereโ€™s why developers need to understand deadlocks:

  1. Prevention is Key ๐Ÿ”’: Avoid production disasters
  2. Better System Design ๐Ÿ’ป: Build robust concurrent systems
  3. Debugging Skills ๐Ÿ“–: Quickly identify and fix issues
  4. Performance ๐Ÿ”ง: Avoid system hangs and timeouts

Real-world example: Imagine a banking system ๐Ÿฆ where two customers try to transfer money to each other simultaneously. Without proper deadlock prevention, both transactions could freeze forever!

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Classic Deadlock Example

Letโ€™s start with a classic example that creates a deadlock:

import threading
import time

# ๐Ÿ‘‹ Hello, Deadlock!
lock1 = threading.Lock()
lock2 = threading.Lock()

def worker1():
    # ๐ŸŽจ Thread 1 acquires lock1 first
    with lock1:
        print("Thread 1: Got lock1! ๐Ÿ”’")
        time.sleep(0.1)  # ๐Ÿ˜ด Simulate work
        
        print("Thread 1: Waiting for lock2... โณ")
        with lock2:  # ๐Ÿ’ฅ DEADLOCK: Thread 2 has lock2!
            print("Thread 1: Got both locks! โœจ")

def worker2():
    # ๐ŸŽฏ Thread 2 acquires lock2 first
    with lock2:
        print("Thread 2: Got lock2! ๐Ÿ”")
        time.sleep(0.1)  # ๐Ÿ˜ด Simulate work
        
        print("Thread 2: Waiting for lock1... โณ")
        with lock1:  # ๐Ÿ’ฅ DEADLOCK: Thread 1 has lock1!
            print("Thread 2: Got both locks! โœจ")

# ๐Ÿš€ Create and start threads
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)

t1.start()
t2.start()

# โฐ Wait with timeout to detect deadlock
t1.join(timeout=2)
t2.join(timeout=2)

if t1.is_alive() or t2.is_alive():
    print("๐Ÿ˜ฑ DEADLOCK DETECTED! Threads are stuck!")

๐Ÿ’ก Explanation: Each thread acquires one lock and waits for the other, creating a circular wait condition - the classic deadlock scenario!

๐ŸŽฏ Deadlock Conditions (Coffman Conditions)

A deadlock occurs when ALL four conditions are met:

# ๐Ÿ—๏ธ The Four Horsemen of Deadlock

# 1๏ธโƒฃ Mutual Exclusion
# Resources can only be used by one thread at a time
lock = threading.Lock()  # Only one thread can hold this

# 2๏ธโƒฃ Hold and Wait
# Thread holds one resource while waiting for another
with lock1:
    # Holding lock1, waiting for lock2
    with lock2:
        pass

# 3๏ธโƒฃ No Preemption
# Resources can't be forcibly taken away
# Python locks can't be stolen from threads

# 4๏ธโƒฃ Circular Wait
# Circular chain of threads waiting for resources
# Thread A waits for B, B waits for C, C waits for A

๐Ÿ’ก Practical Examples

๐Ÿฆ Example 1: Banking System with Deadlock Prevention

Letโ€™s build a thread-safe banking system:

import threading
import time
from typing import Dict
import random

# ๐Ÿฆ Bank account management system
class BankAccount:
    def __init__(self, account_id: str, balance: float):
        self.id = account_id
        self.balance = balance
        self.lock = threading.Lock()
        self.emoji = "๐Ÿ’ฐ"
    
    def __lt__(self, other):
        # ๐ŸŽฏ For lock ordering
        return self.id < other.id

class Bank:
    def __init__(self):
        self.accounts: Dict[str, BankAccount] = {}
        self.transfer_count = 0
        self.lock = threading.Lock()
    
    # โž• Create account
    def create_account(self, account_id: str, initial_balance: float):
        account = BankAccount(account_id, initial_balance)
        self.accounts[account_id] = account
        print(f"๐Ÿ’ณ Created account {account_id} with ${initial_balance}")
    
    # ๐Ÿ’ธ Transfer with deadlock prevention
    def transfer(self, from_id: str, to_id: str, amount: float):
        from_account = self.accounts.get(from_id)
        to_account = self.accounts.get(to_id)
        
        if not from_account or not to_account:
            print(f"โŒ Invalid account!")
            return False
        
        # ๐Ÿ›ก๏ธ DEADLOCK PREVENTION: Always lock in same order!
        first, second = sorted([from_account, to_account])
        
        with first.lock:
            with second.lock:
                if from_account.balance >= amount:
                    from_account.balance -= amount
                    to_account.balance += amount
                    self.transfer_count += 1
                    print(f"โœ… Transfer #{self.transfer_count}: "
                          f"${amount} from {from_id} to {to_id}")
                    return True
                else:
                    print(f"โŒ Insufficient funds in {from_id}")
                    return False
    
    # ๐Ÿ“Š Get balances safely
    def get_balances(self):
        balances = {}
        # ๐Ÿ”’ Lock accounts in sorted order
        sorted_accounts = sorted(self.accounts.values())
        
        for account in sorted_accounts:
            with account.lock:
                balances[account.id] = account.balance
        
        return balances

# ๐ŸŽฎ Simulate concurrent transfers
def simulate_transfers(bank: Bank, thread_id: int):
    accounts = ["Alice", "Bob", "Charlie", "David"]
    
    for _ in range(5):
        from_acc = random.choice(accounts)
        to_acc = random.choice([a for a in accounts if a != from_acc])
        amount = random.randint(10, 100)
        
        print(f"๐Ÿ”„ Thread {thread_id}: Transferring ${amount} "
              f"from {from_acc} to {to_acc}")
        bank.transfer(from_acc, to_acc, amount)
        time.sleep(0.1)

# ๐Ÿš€ Test the bank system
bank = Bank()
bank.create_account("Alice", 1000)
bank.create_account("Bob", 1000)
bank.create_account("Charlie", 1000)
bank.create_account("David", 1000)

# ๐Ÿ‘ฅ Create multiple threads
threads = []
for i in range(4):
    t = threading.Thread(target=simulate_transfers, args=(bank, i))
    threads.append(t)
    t.start()

# โณ Wait for all transfers
for t in threads:
    t.join()

# ๐Ÿ“Š Final balances
print("\n๐Ÿ’ฐ Final Balances:")
for acc_id, balance in bank.get_balances().items():
    print(f"  {acc_id}: ${balance}")

๐ŸŽฏ Try it yourself: Add a deadlock detection mechanism that monitors lock acquisition times!

๐Ÿด Example 2: Dining Philosophers Problem

The classic deadlock example with a solution:

import threading
import time
import random

# ๐Ÿด The Dining Philosophers Problem
class DiningPhilosophers:
    def __init__(self, num_philosophers: int = 5):
        self.num = num_philosophers
        self.forks = [threading.Lock() for _ in range(num_philosophers)]
        self.eating_count = [0] * num_philosophers
        self.coordinator = threading.Semaphore(num_philosophers - 1)
        
    # ๐Ÿค” Philosopher tries to eat
    def eat(self, philosopher_id: int):
        left_fork = philosopher_id
        right_fork = (philosopher_id + 1) % self.num
        
        # ๐Ÿ›ก๏ธ DEADLOCK PREVENTION: Limit concurrent diners
        with self.coordinator:
            # ๐Ÿฅข Pick up forks in order
            first_fork = min(left_fork, right_fork)
            second_fork = max(left_fork, right_fork)
            
            with self.forks[first_fork]:
                print(f"๐Ÿคš Philosopher {philosopher_id} picked up fork {first_fork}")
                
                with self.forks[second_fork]:
                    print(f"โœ‹ Philosopher {philosopher_id} picked up fork {second_fork}")
                    
                    # ๐Ÿ Eating
                    print(f"๐Ÿ˜‹ Philosopher {philosopher_id} is eating!")
                    self.eating_count[philosopher_id] += 1
                    time.sleep(random.uniform(0.1, 0.3))
                    
                    print(f"โœ… Philosopher {philosopher_id} finished eating "
                          f"(count: {self.eating_count[philosopher_id]})")
    
    # ๐Ÿ’ญ Philosopher thinks
    def think(self, philosopher_id: int):
        print(f"๐Ÿค” Philosopher {philosopher_id} is thinking...")
        time.sleep(random.uniform(0.1, 0.2))
    
    # ๐ŸŽญ Philosopher's life cycle
    def philosopher_lifecycle(self, philosopher_id: int):
        for _ in range(3):  # Each philosopher eats 3 times
            self.think(philosopher_id)
            self.eat(philosopher_id)

# ๐ŸŽฎ Run the simulation
def dining_simulation():
    table = DiningPhilosophers(5)
    philosophers = []
    
    # ๐Ÿš€ Start all philosophers
    for i in range(5):
        t = threading.Thread(
            target=table.philosopher_lifecycle,
            args=(i,)
        )
        philosophers.append(t)
        t.start()
    
    # โณ Wait for dinner to finish
    for t in philosophers:
        t.join()
    
    # ๐Ÿ“Š Statistics
    print("\n๐Ÿฝ๏ธ Dinner Statistics:")
    for i, count in enumerate(table.eating_count):
        print(f"  Philosopher {i}: ate {count} times")

# ๐ŸŽฏ Run the simulation
dining_simulation()

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Deadlock Detection with Threading

Advanced pattern for detecting deadlocks:

import threading
import time
from collections import defaultdict
from typing import Dict, Set, Optional

# ๐ŸŽฏ Advanced deadlock detector
class DeadlockDetector:
    def __init__(self):
        self.lock_graph: Dict[int, Set[int]] = defaultdict(set)
        self.thread_locks: Dict[int, Set[threading.Lock]] = defaultdict(set)
        self.lock = threading.Lock()
        self.monitoring = True
        
    # ๐Ÿ” Detect cycles in wait graph
    def has_cycle(self, thread_id: int, visited: Set[int], rec_stack: Set[int]) -> bool:
        visited.add(thread_id)
        rec_stack.add(thread_id)
        
        for neighbor in self.lock_graph.get(thread_id, []):
            if neighbor not in visited:
                if self.has_cycle(neighbor, visited, rec_stack):
                    return True
            elif neighbor in rec_stack:
                return True
        
        rec_stack.remove(thread_id)
        return False
    
    # ๐Ÿšจ Check for deadlocks
    def check_deadlock(self) -> Optional[bool]:
        with self.lock:
            threads = list(self.lock_graph.keys())
            visited = set()
            rec_stack = set()
            
            for thread in threads:
                if thread not in visited:
                    if self.has_cycle(thread, visited, rec_stack):
                        return True
            return False
    
    # ๐Ÿ“Š Monitor thread activity
    def monitor_threads(self):
        while self.monitoring:
            if self.check_deadlock():
                print("๐Ÿšจ DEADLOCK DETECTED! ๐Ÿ’ฅ")
                self.print_deadlock_info()
            time.sleep(1)
    
    # ๐Ÿ“‹ Print deadlock information
    def print_deadlock_info(self):
        print("๐Ÿ” Deadlock Analysis:")
        for thread_id, waiting_for in self.lock_graph.items():
            if waiting_for:
                print(f"  Thread {thread_id} waiting for threads: {waiting_for}")

# ๐Ÿ”ง Example with monitoring
detector = DeadlockDetector()
monitor_thread = threading.Thread(target=detector.monitor_threads)
monitor_thread.daemon = True
monitor_thread.start()

๐Ÿ—๏ธ Timeout-based Deadlock Prevention

Using timeouts to prevent deadlocks:

import threading
import time
from contextlib import contextmanager

# ๐Ÿš€ Smart lock with timeout
class SmartLock:
    def __init__(self, name: str, timeout: float = 1.0):
        self.name = name
        self.lock = threading.Lock()
        self.timeout = timeout
        self.acquisition_count = 0
        self.timeout_count = 0
    
    @contextmanager
    def acquire_with_timeout(self):
        # โฐ Try to acquire with timeout
        acquired = self.lock.acquire(timeout=self.timeout)
        
        if acquired:
            self.acquisition_count += 1
            try:
                yield True
            finally:
                self.lock.release()
        else:
            self.timeout_count += 1
            print(f"โฑ๏ธ Timeout acquiring {self.name}! "
                  f"(timeouts: {self.timeout_count})")
            yield False
    
    # ๐Ÿ“Š Get statistics
    def get_stats(self):
        return {
            "name": self.name,
            "acquisitions": self.acquisition_count,
            "timeouts": self.timeout_count,
            "success_rate": (self.acquisition_count / 
                           (self.acquisition_count + self.timeout_count) * 100
                           if self.acquisition_count + self.timeout_count > 0 
                           else 0)
        }

# ๐ŸŽฎ Test with potential deadlock scenario
def worker_with_timeout(lock1: SmartLock, lock2: SmartLock, worker_id: int):
    for attempt in range(3):
        with lock1.acquire_with_timeout() as got_lock1:
            if got_lock1:
                print(f"Worker {worker_id}: Got {lock1.name} โœ…")
                time.sleep(0.1)
                
                with lock2.acquire_with_timeout() as got_lock2:
                    if got_lock2:
                        print(f"Worker {worker_id}: Got both locks! ๐ŸŽ‰")
                        time.sleep(0.1)
                    else:
                        print(f"Worker {worker_id}: Failed to get {lock2.name}, "
                              f"retrying... ๐Ÿ”„")
            else:
                print(f"Worker {worker_id}: Failed to get {lock1.name}, "
                      f"retrying... ๐Ÿ”„")
        
        time.sleep(0.1)  # Back off before retry

# ๐Ÿš€ Run the test
lock_a = SmartLock("Lock-A")
lock_b = SmartLock("Lock-B")

threads = []
for i in range(2):
    # Alternate lock order to create potential deadlock
    if i % 2 == 0:
        t = threading.Thread(target=worker_with_timeout, 
                            args=(lock_a, lock_b, i))
    else:
        t = threading.Thread(target=worker_with_timeout, 
                            args=(lock_b, lock_a, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# ๐Ÿ“Š Print statistics
print("\n๐Ÿ“Š Lock Statistics:")
for lock in [lock_a, lock_b]:
    stats = lock.get_stats()
    print(f"  {stats['name']}: {stats['acquisitions']} acquisitions, "
          f"{stats['timeouts']} timeouts "
          f"({stats['success_rate']:.1f}% success rate)")

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Lock Ordering Mistakes

# โŒ Wrong way - inconsistent lock ordering!
def transfer_bad(account1, account2, amount):
    with account1.lock:  # Different threads might lock in different order
        with account2.lock:
            account1.balance -= amount
            account2.balance += amount

# โœ… Correct way - consistent lock ordering!
def transfer_good(account1, account2, amount):
    # Always lock accounts in same order (by ID)
    first, second = sorted([account1, account2], key=lambda a: a.id)
    with first.lock:
        with second.lock:
            if account1.balance >= amount:
                account1.balance -= amount
                account2.balance += amount
                return True
            return False

๐Ÿคฏ Pitfall 2: Nested Locks Without Timeout

# โŒ Dangerous - can deadlock forever!
def risky_operation():
    lock1.acquire()
    try:
        lock2.acquire()  # ๐Ÿ’ฅ Might wait forever!
        try:
            # Do work
            pass
        finally:
            lock2.release()
    finally:
        lock1.release()

# โœ… Safe - use timeouts!
def safe_operation():
    if lock1.acquire(timeout=1.0):
        try:
            if lock2.acquire(timeout=1.0):
                try:
                    # Do work
                    return True
                finally:
                    lock2.release()
            else:
                print("โš ๏ธ Couldn't acquire lock2, backing out")
                return False
        finally:
            lock1.release()
    else:
        print("โš ๏ธ Couldn't acquire lock1")
        return False

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Lock Ordering: Always acquire locks in the same order
  2. โฐ Use Timeouts: Donโ€™t wait forever for locks
  3. ๐Ÿ›ก๏ธ Minimize Lock Scope: Hold locks for minimal time
  4. ๐ŸŽจ Avoid Nested Locks: Use higher-level constructs when possible
  5. โœจ Monitor and Log: Track lock acquisitions and releases
  6. ๐Ÿ” Test Thoroughly: Use stress tests to find deadlocks
  7. ๐Ÿ“Š Use Lock-Free Structures: Consider queues and atomic operations

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Deadlock-Free Resource Manager

Create a resource management system that prevents deadlocks:

๐Ÿ“‹ Requirements:

  • โœ… Multiple resources that can be acquired by threads
  • ๐Ÿท๏ธ Priority-based resource allocation
  • ๐Ÿ‘ค Deadlock detection and recovery
  • ๐Ÿ“… Resource usage statistics
  • ๐ŸŽจ Each resource needs a status emoji!

๐Ÿš€ Bonus Points:

  • Add resource reservation system
  • Implement deadlock prediction
  • Create visualization of resource allocation

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
import threading
import time
from typing import Dict, List, Optional, Set
from collections import defaultdict
import heapq

# ๐ŸŽฏ Deadlock-free resource manager!
class Resource:
    def __init__(self, resource_id: str, priority: int = 0):
        self.id = resource_id
        self.priority = priority
        self.lock = threading.RLock()
        self.owner: Optional[int] = None
        self.wait_queue: List[int] = []
        self.usage_count = 0
        self.emoji = "๐Ÿ“ฆ"
    
    def __lt__(self, other):
        # For priority queue
        return self.priority > other.priority

class ResourceManager:
    def __init__(self):
        self.resources: Dict[str, Resource] = {}
        self.thread_resources: Dict[int, Set[str]] = defaultdict(set)
        self.wait_graph: Dict[int, Set[int]] = defaultdict(set)
        self.stats_lock = threading.Lock()
        self.deadlock_count = 0
        
    # โž• Add a resource
    def add_resource(self, resource_id: str, priority: int = 0):
        resource = Resource(resource_id, priority)
        self.resources[resource_id] = resource
        print(f"๐Ÿ“ฆ Added resource {resource_id} with priority {priority}")
    
    # ๐ŸŽฏ Acquire multiple resources (deadlock-free)
    def acquire_resources(self, resource_ids: List[str], 
                         thread_id: Optional[int] = None) -> bool:
        if thread_id is None:
            thread_id = threading.get_ident()
        
        # ๐Ÿ›ก๏ธ Sort resources by ID to prevent deadlock
        sorted_resources = sorted(resource_ids)
        acquired = []
        
        try:
            for res_id in sorted_resources:
                if res_id in self.resources:
                    resource = self.resources[res_id]
                    
                    # โฐ Try to acquire with timeout
                    if resource.lock.acquire(timeout=2.0):
                        acquired.append(resource)
                        resource.owner = thread_id
                        resource.usage_count += 1
                        self.thread_resources[thread_id].add(res_id)
                        print(f"โœ… Thread {thread_id} acquired {res_id}")
                    else:
                        # ๐Ÿšจ Timeout - potential deadlock
                        print(f"โฑ๏ธ Thread {thread_id} timeout on {res_id}")
                        raise TimeoutError(f"Could not acquire {res_id}")
                else:
                    raise ValueError(f"Resource {res_id} not found")
            
            return True
            
        except Exception as e:
            # ๐Ÿ”„ Rollback on failure
            print(f"โŒ Acquisition failed: {e}")
            for resource in acquired:
                resource.lock.release()
                resource.owner = None
                self.thread_resources[thread_id].discard(resource.id)
            return False
    
    # ๐Ÿ”“ Release resources
    def release_resources(self, resource_ids: List[str], 
                         thread_id: Optional[int] = None):
        if thread_id is None:
            thread_id = threading.get_ident()
        
        for res_id in resource_ids:
            if res_id in self.resources:
                resource = self.resources[res_id]
                if resource.owner == thread_id:
                    resource.owner = None
                    resource.lock.release()
                    self.thread_resources[thread_id].discard(res_id)
                    print(f"๐Ÿ”“ Thread {thread_id} released {res_id}")
    
    # ๐Ÿ” Detect potential deadlocks
    def detect_deadlock(self) -> bool:
        with self.stats_lock:
            # Build wait-for graph
            for thread_id, resources in self.thread_resources.items():
                for res_id in resources:
                    resource = self.resources[res_id]
                    if resource.owner and resource.owner != thread_id:
                        self.wait_graph[thread_id].add(resource.owner)
            
            # Check for cycles
            visited = set()
            rec_stack = set()
            
            def has_cycle(node):
                visited.add(node)
                rec_stack.add(node)
                
                for neighbor in self.wait_graph.get(node, []):
                    if neighbor not in visited:
                        if has_cycle(neighbor):
                            return True
                    elif neighbor in rec_stack:
                        return True
                
                rec_stack.remove(node)
                return False
            
            for thread in self.wait_graph:
                if thread not in visited:
                    if has_cycle(thread):
                        self.deadlock_count += 1
                        return True
            
            return False
    
    # ๐Ÿ“Š Get usage statistics
    def get_stats(self):
        stats = {
            "resources": {},
            "deadlock_count": self.deadlock_count,
            "active_threads": len(self.thread_resources)
        }
        
        for res_id, resource in self.resources.items():
            stats["resources"][res_id] = {
                "usage_count": resource.usage_count,
                "currently_held": resource.owner is not None,
                "owner": resource.owner
            }
        
        return stats

# ๐ŸŽฎ Test the resource manager
def worker_task(manager: ResourceManager, worker_id: int, resources: List[str]):
    thread_id = threading.get_ident()
    
    for i in range(3):
        print(f"๐Ÿƒ Worker {worker_id} attempting to acquire {resources}")
        
        if manager.acquire_resources(resources, thread_id):
            print(f"๐Ÿ’ผ Worker {worker_id} working with resources...")
            time.sleep(0.2)  # Simulate work
            
            manager.release_resources(resources, thread_id)
            print(f"โœ… Worker {worker_id} completed task {i+1}")
        else:
            print(f"๐Ÿ˜” Worker {worker_id} failed to acquire resources")
        
        time.sleep(0.1)

# ๐Ÿš€ Run the test
manager = ResourceManager()
manager.add_resource("CPU", priority=10)
manager.add_resource("Memory", priority=8)
manager.add_resource("Disk", priority=5)
manager.add_resource("Network", priority=3)

# Create workers with different resource needs
workers = [
    (0, ["CPU", "Memory"]),
    (1, ["Memory", "Disk"]),
    (2, ["Disk", "Network"]),
    (3, ["Network", "CPU"]),
]

threads = []
for worker_id, resources in workers:
    t = threading.Thread(target=worker_task, 
                        args=(manager, worker_id, resources))
    threads.append(t)
    t.start()

# ๐Ÿ” Monitor for deadlocks
monitor_active = True
def monitor_deadlocks():
    while monitor_active:
        if manager.detect_deadlock():
            print("๐Ÿšจ POTENTIAL DEADLOCK DETECTED!")
        time.sleep(0.5)

monitor = threading.Thread(target=monitor_deadlocks)
monitor.daemon = True
monitor.start()

# Wait for workers
for t in threads:
    t.join()

monitor_active = False

# ๐Ÿ“Š Print final statistics
print("\n๐Ÿ“Š Resource Manager Statistics:")
stats = manager.get_stats()
print(f"  Total deadlocks detected: {stats['deadlock_count']}")
print(f"  Active threads: {stats['active_threads']}")
print("\n  Resource Usage:")
for res_id, res_stats in stats['resources'].items():
    print(f"    {res_id}: {res_stats['usage_count']} uses")

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Understand deadlocks and their four conditions ๐Ÿ’ช
  • โœ… Detect deadlocks using various techniques ๐Ÿ›ก๏ธ
  • โœ… Prevent deadlocks with proper lock ordering ๐ŸŽฏ
  • โœ… Implement timeouts to avoid infinite waits ๐Ÿ›
  • โœ… Build robust concurrent systems in Python! ๐Ÿš€

Remember: Deadlock prevention is better than deadlock recovery! Always design your concurrent systems with deadlock prevention in mind. ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered deadlock detection and prevention!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the exercises above
  2. ๐Ÿ—๏ธ Build a concurrent application with proper deadlock prevention
  3. ๐Ÿ“š Move on to our next tutorial: Semaphores and Conditions
  4. ๐ŸŒŸ Share your deadlock-free code with others!

Remember: Every concurrent programming expert was once confused by deadlocks. Keep practicing, keep learning, and most importantly, keep your threads deadlock-free! ๐Ÿš€


Happy concurrent coding! ๐ŸŽ‰๐Ÿš€โœจ