Part 320 of 365

Shared Memory: multiprocessing.shared_memory

Master multiprocessing.shared_memory in Python with practical examples, best practices, and real-world applications.

Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts
  • Python 3.8 or later (multiprocessing.shared_memory was added in 3.8)
  • NumPy, which the array examples use
  • VS Code or preferred IDE

What you'll learn

  • Understand the fundamentals of shared memory
  • Apply shared memory in real projects
  • Debug common issues such as leaks and race conditions
  • Write clean, Pythonic code

Introduction

Welcome to this tutorial on shared memory in Python! In this guide, we'll explore how multiprocessing.shared_memory lets processes share data without copying it.

Ever wondered how to share large datasets between processes without the overhead of copying? Traditional inter-process communication methods such as pipes and queues serialize (pickle) the data on one side and deserialize it on the other, which can be slow for large data. Shared memory is like a communal whiteboard that multiple processes can read and write directly!

By the end of this tutorial, you'll feel confident using shared memory to speed up your parallel Python applications. Let's dive in!

Understanding Shared Memory

What is Shared Memory?

Shared memory is like a public park bench: multiple people (processes) can sit on it at the same time and share the same space. In Python terms, it's a named memory region that multiple processes can map and access directly, without copying data between them.

Think of it as a shared Google Doc that several people edit at once, rather than emailing copies back and forth!

Why Use Shared Memory?

Here's why developers love shared memory:

  1. Zero-Copy Performance: Access data without serialization
  2. Memory Efficiency: Share large arrays without duplication
  3. Low Latency: Reads and writes are ordinary memory accesses, with no pipe or socket in between
  4. NumPy Integration: A NumPy array can use the block as its buffer, which suits scientific computing

Real-world example: Imagine processing a 4K video. With shared memory, multiple processes can work on different frames simultaneously without copying the entire video for each process!
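
To make the "no copying" point concrete, here is a minimal sketch that uses only the standard library. It opens a second handle to the same block by name and shows that a write through either handle is visible through the other. In a real program the second handle would live in another process; the 16-byte size and the variable names are illustrative assumptions.

```python
from multiprocessing import shared_memory

# Create a small block; the OS may round the size up to a full page.
owner = shared_memory.SharedMemory(create=True, size=16)
try:
    # Attach a second handle by name, as another process would.
    peer = shared_memory.SharedMemory(name=owner.name)
    try:
        owner.buf[:5] = b"hello"             # write through the first handle
        seen_by_peer = bytes(peer.buf[:5])   # read through the second handle

        peer.buf[0] = ord("J")               # write through the second handle
        seen_by_owner = bytes(owner.buf[:5])
    finally:
        peer.close()
finally:
    owner.close()
    owner.unlink()  # the creator removes the block

print(seen_by_peer)   # b'hello'
print(seen_by_owner)  # b'Jello'
```

Both handles map the same bytes, so nothing is pickled or sent anywhere; only the block's name has to be passed between processes.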

Basic Syntax and Usage

Simple Example

Let's start with a friendly example:

from multiprocessing import shared_memory
import numpy as np

# Hello, Shared Memory!
def create_shared_array():
    # Create a shared memory block: 256 int32 values need 256 * 4 = 1024 bytes
    shm = shared_memory.SharedMemory(create=True, size=1024)

    # Create a NumPy array backed by the shared block
    shared_array = np.ndarray((256,), dtype=np.int32, buffer=shm.buf)

    # Initialize with some data
    shared_array[:] = np.arange(256)

    print(f"Created shared memory: {shm.name}")
    return shm, shared_array

# Let's create it!
shm, array = create_shared_array()
print(f"First 5 elements: {array[:5]}")

# Release the block when finished
del array     # drop the NumPy view before closing
shm.close()   # detach this process from the block
shm.unlink()  # remove the block itself

Explanation: We create a shared memory block and wrap it with a NumPy array. Other processes can attach to the block by its name. On POSIX systems the block lives until some process calls unlink(), even if the creating process has exited; on Windows it is released once the last handle to it is closed.

Common Patterns

Here are patterns you'll use daily:

import multiprocessing as mp
from multiprocessing import shared_memory
import numpy as np

# Pattern 1: Producer-Consumer with Shared Memory
def producer(shm_name):
    # Connect to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray((100,), dtype=np.float64, buffer=existing_shm.buf)

    # Produce data
    for i in range(100):
        shared_array[i] = i * 3.14
    print("Producer done!")

    # Close (but don't unlink)
    del shared_array
    existing_shm.close()

def consumer(shm_name):
    # Connect to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray((100,), dtype=np.float64, buffer=existing_shm.buf)

    # Consume data
    total = np.sum(shared_array)
    print(f"Consumer calculated sum: {total:.2f}")

    # Close (but don't unlink)
    del shared_array
    existing_shm.close()

# Pattern 2: Shared State Management
class SharedCounter:
    def __init__(self):
        # Create shared memory for a single 8-byte integer
        self.shm = shared_memory.SharedMemory(create=True, size=8)
        self.counter = np.ndarray((1,), dtype=np.int64, buffer=self.shm.buf)
        self.counter[0] = 0

    def increment(self):
        # Note: this read-modify-write is NOT safe across processes without a lock!
        self.counter[0] += 1

    def get_value(self):
        return int(self.counter[0])

    def cleanup(self):
        self.counter = None  # drop the NumPy view before closing
        self.shm.close()
        self.shm.unlink()  # Remove shared memory

if __name__ == "__main__":
    # Driver for Pattern 1: the parent creates the block and removes it at the end
    shm = shared_memory.SharedMemory(create=True, size=100 * 8)  # 100 float64 values
    try:
        for target in (producer, consumer):
            # Run the producer to completion before starting the consumer
            p = mp.Process(target=target, args=(shm.name,))
            p.start()
            p.join()
    finally:
        shm.close()
        shm.unlink()
Practical Examples

Example 1: Image Processing Pipeline

Let's build something real - a parallel image processor:

from multiprocessing import shared_memory, Process
import numpy as np
import time


def apply_filter(shm_name, shape, process_id, start_row, end_row):
    # Worker: attach to the shared image by name and brighten its own rows
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    image = np.ndarray(shape, dtype=np.uint8, buffer=existing_shm.buf)

    print(f"Process {process_id} working on rows {start_row}-{end_row}")

    # Increase brightness by 20%. Each process owns a disjoint band of rows,
    # so no lock is needed, and one vectorized operation replaces the pixel loop.
    band = image[start_row:end_row]
    band[:] = np.clip(band * 1.2, 0, 255).astype(np.uint8)

    print(f"Process {process_id} finished!")
    del image, band  # drop the NumPy views before closing
    existing_shm.close()


class ImageProcessor:
    def __init__(self, width=1920, height=1080):
        # Create shared memory for an RGB image
        self.width = width
        self.height = height
        self.shape = (height, width, 3)
        self.size = width * height * 3  # one byte per RGB channel

        self.shm = shared_memory.SharedMemory(create=True, size=self.size)
        self.image = np.ndarray(self.shape, dtype=np.uint8, buffer=self.shm.buf)

        print(f"Created shared image buffer: {width}x{height}")

    def load_image(self):
        # Simulate loading an image by drawing a gradient
        print("Loading image...")
        red = np.arange(self.width) * 255 // self.width      # left-to-right ramp
        green = np.arange(self.height) * 255 // self.height  # top-to-bottom ramp
        self.image[:, :, 0] = red[np.newaxis, :]
        self.image[:, :, 1] = green[:, np.newaxis]
        self.image[:, :, 2] = 128                            # constant blue
        print("Image loaded!")

    def parallel_process(self, num_processes=4):
        # Process the image in parallel, one band of rows per process
        processes = []
        rows_per_process = self.height // num_processes

        start_time = time.perf_counter()

        for i in range(num_processes):
            start_row = i * rows_per_process
            end_row = (i + 1) * rows_per_process if i < num_processes - 1 else self.height

            # Pass only the block name and shape; the worker attaches by name,
            # which works with both the fork and spawn start methods
            p = Process(
                target=apply_filter,
                args=(self.shm.name, self.shape, i, start_row, end_row),
            )
            p.start()
            processes.append(p)

        # Wait for all processes
        for p in processes:
            p.join()

        elapsed = time.perf_counter() - start_time
        print(f"Parallel processing completed in {elapsed:.2f} seconds!")

    def cleanup(self):
        # Clean up shared memory
        self.image = None  # drop the NumPy view before closing
        self.shm.close()
        self.shm.unlink()

# Let's use it!
if __name__ == "__main__":
    processor = ImageProcessor(1920, 1080)
    try:
        processor.load_image()
        processor.parallel_process(4)
    finally:
        processor.cleanup()

Try it yourself: Add different filters like blur, edge detection, or color adjustments!
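
The row-splitting arithmetic in parallel_process is easy to get wrong at the boundaries, so here it is in isolation. This is a small sketch; the helper name split_rows is an assumption for illustration and is not part of the example above. It returns half-open (start, end) ranges and gives any leftover rows to the last worker, as the example does.

```python
def split_rows(total_rows, num_workers):
    """Return half-open (start, end) row ranges, one per worker."""
    rows_per_worker = total_rows // num_workers
    ranges = []
    for i in range(num_workers):
        start = i * rows_per_worker
        # The last worker also takes the rows left over by integer division.
        end = (i + 1) * rows_per_worker if i < num_workers - 1 else total_rows
        ranges.append((start, end))
    return ranges


print(split_rows(1080, 4))  # [(0, 270), (270, 540), (540, 810), (810, 1080)]
print(split_rows(10, 3))    # [(0, 3), (3, 6), (6, 10)]
```

Because the ranges never overlap, each worker writes to its own rows and no lock is required for this kind of filter.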

Example 2: Real-time Analytics Dashboard

Let's make a real-time data analytics system. Each collector process owns one row of a shared circular buffer plus its own write counter, and an analyzer process reads them:

from multiprocessing import shared_memory, Process, Event
import numpy as np
import time
import random

class RealtimeAnalytics:
    def __init__(self, num_metrics=10, buffer_size=1000):
        # Create shared memory for metrics
        self.num_metrics = num_metrics
        self.buffer_size = buffer_size

        # Shared memory for the circular buffers (one row per metric)
        self.data_shm = shared_memory.SharedMemory(
            create=True,
            size=num_metrics * buffer_size * 8  # 8 bytes per float64
        )
        self.data = np.ndarray(
            (num_metrics, buffer_size),
            dtype=np.float64,
            buffer=self.data_shm.buf
        )

        # Shared memory for the write counters (one int64 per metric)
        self.pos_shm = shared_memory.SharedMemory(create=True, size=num_metrics * 8)
        self.position = np.ndarray((num_metrics,), dtype=np.int64, buffer=self.pos_shm.buf)
        self.position[:] = 0

        # Event for coordination
        self.stop_event = Event()

        print(f"Analytics system initialized with {num_metrics} metrics!")

    def _attach(self):
        # Attach to both blocks by name and wrap them in NumPy arrays
        data_shm = shared_memory.SharedMemory(name=self.data_shm.name)
        pos_shm = shared_memory.SharedMemory(name=self.pos_shm.name)
        data = np.ndarray(
            (self.num_metrics, self.buffer_size),
            dtype=np.float64,
            buffer=data_shm.buf
        )
        position = np.ndarray((self.num_metrics,), dtype=np.int64, buffer=pos_shm.buf)
        return data_shm, pos_shm, data, position

    def data_collector(self, metric_id):
        # Collect data for a specific metric
        existing_data_shm, existing_pos_shm, data, position = self._attach()

        print(f"Collector {metric_id} started!")

        while not self.stop_event.is_set():
            # Generate random data (simulate sensor)
            value = 50 + random.gauss(0, 10) + metric_id * 5

            # Store in this metric's circular buffer, then publish the new count.
            # Each collector is the only writer of its own row and counter.
            count = int(position[metric_id])
            data[metric_id, count % self.buffer_size] = value
            position[metric_id] = count + 1

            time.sleep(0.1)  # Simulate 10Hz sensor

        print(f"Collector {metric_id} stopped!")
        del data, position
        existing_data_shm.close()
        existing_pos_shm.close()

    def analyzer(self):
        # Analyze data in real-time
        existing_data_shm, existing_pos_shm, data, position = self._attach()

        print("Analyzer started!")

        while not self.stop_event.is_set():
            # Calculate statistics over the last 100 samples of each metric
            stats = []
            for metric in range(self.num_metrics):
                count = int(position[metric])
                samples = min(100, count, self.buffer_size)
                if samples == 0:
                    continue

                # Slots of the newest samples, newest first
                idx = (count - 1 - np.arange(samples)) % self.buffer_size
                recent_data = data[metric, idx]

                mean = np.mean(recent_data)
                std = np.std(recent_data)
                stats.append(f"Metric {metric}: μ={mean:.2f}, σ={std:.2f}")

            if stats:
                print(f"\nReal-time Stats (samples so far: {int(position.sum())}):")
                for stat in stats:
                    print(f"   {stat}")

            time.sleep(1)  # Analyze every second

        print("Analyzer stopped!")
        del data, position
        existing_data_shm.close()
        existing_pos_shm.close()

    def start(self):
        # Start all processes
        processes = []

        # Start data collectors
        for i in range(self.num_metrics):
            p = Process(target=self.data_collector, args=(i,))
            p.start()
            processes.append(p)

        # Start analyzer
        analyzer_p = Process(target=self.analyzer)
        analyzer_p.start()
        processes.append(analyzer_p)

        return processes

    def stop(self, processes):
        # Stop all processes
        print("\nStopping analytics system...")
        self.stop_event.set()

        for p in processes:
            p.join()

        print("All processes stopped!")

    def cleanup(self):
        # Clean up shared memory
        self.data = None      # drop the NumPy views before closing
        self.position = None
        self.data_shm.close()
        self.data_shm.unlink()
        self.pos_shm.close()
        self.pos_shm.unlink()

# Demo the system!
if __name__ == "__main__":
    analytics = RealtimeAnalytics(num_metrics=5, buffer_size=1000)

    processes = analytics.start()

    try:
        # Let it run for 10 seconds
        time.sleep(10)
    except KeyboardInterrupt:
        print("\nInterrupted by user!")
    finally:
        analytics.stop(processes)
        analytics.cleanup()
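
The circular-buffer indexing is the part of this example most worth checking by hand. The sketch below isolates it in plain Python; the function name recent_indices and its parameters are assumptions for illustration. It assumes sample number k is stored in slot k % buffer_size, and it returns the slots of the newest samples, newest first.

```python
def recent_indices(write_count, buffer_size, max_samples=100):
    """Slots holding the newest samples of a circular buffer, newest first.

    write_count is the total number of samples written so far; sample k
    is stored in slot k % buffer_size.
    """
    samples = min(max_samples, write_count, buffer_size)
    return [(write_count - 1 - i) % buffer_size for i in range(samples)]


print(recent_indices(3, 8))      # [2, 1, 0]
print(recent_indices(10, 8, 4))  # [1, 0, 7, 6]
print(recent_indices(0, 8))      # []
```

In the second call the buffer has wrapped around: samples 9 and 8 sit in slots 1 and 0, and the two before them are still at the end of the buffer in slots 7 and 6.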

Advanced Concepts

ShareableList: High-Level Shared Container

When you're ready to level up, try ShareableList for sharing a fixed-length list of basic Python values (int, float, bool, str, bytes, and None):

from multiprocessing import shared_memory
import multiprocessing as mp

def modifier(list_name):
    # Connect to existing ShareableList.
    # Defined at module level so it can be used as a Process target
    # with the spawn start method too.
    existing_sl = shared_memory.ShareableList(name=list_name)

    # Modify elements (a new str or bytes value must fit in the
    # storage reserved for the original item)
    existing_sl[0] = 100
    existing_sl[2] = "Modified!"
    existing_sl[5] = False

    print(f"Modified list: {list(existing_sl)}")
    existing_sl.shm.close()

# Advanced: ShareableList for mixed data types
def advanced_shareable_list():
    # Create a ShareableList with different types
    original_list = [42, 3.14, "Hello, shared memory!", b"bytes", None, True]
    sl = shared_memory.ShareableList(original_list)

    print(f"Created ShareableList: {sl.shm.name}")

    try:
        # Run modifier in separate process
        p = mp.Process(target=modifier, args=(sl.shm.name,))
        p.start()
        p.join()

        # Check results
        print(f"Final list: {list(sl)}")
    finally:
        # Cleanup
        sl.shm.close()
        sl.shm.unlink()

# Run the example
if __name__ == "__main__":
    advanced_shareable_list()
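
One ShareableList constraint is worth seeing in isolation: the list cannot grow, and a str or bytes item can only be replaced by a value that fits in the storage reserved when the list was created. The following single-process sketch illustrates this; the sample values are arbitrary assumptions.

```python
from multiprocessing import shared_memory

sl = shared_memory.ShareableList(["short", 1])
try:
    sl[0] = "tiny"        # fits in the storage reserved for "short"
    fits = sl[0]

    try:
        sl[0] = "x" * 64  # far larger than the reserved storage
        grew = True
    except ValueError:
        grew = False      # an item cannot grow in place
finally:
    sl.shm.close()
    sl.shm.unlink()

print(fits, grew)  # tiny False
```

If items may need more room later, one option is to create the list with placeholder strings that are as long as the largest value you expect.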

Memory-Mapped Files: Persistent Shared Memory

When the data must outlive the processes that use it, a memory-mapped file gives you shared memory backed by a file on disk:

import mmap
import os
import struct

class PersistentSharedData:
    def __init__(self, filename="shared_data.bin", size=1024*1024):
        self.filename = filename
        self.size = size

        # Create the file, or extend it, so it holds at least `size` bytes
        if not os.path.exists(filename) or os.path.getsize(filename) < size:
            with open(filename, "ab") as f:
                f.truncate(size)

        # Create memory-mapped file
        self.file = open(filename, "r+b")
        self.mmap = mmap.mmap(self.file.fileno(), size)

        print(f"Created persistent shared memory: {filename}")

    def write_record(self, offset, record_id, data):
        # Write structured data
        # Format: 4 bytes ID, 4 bytes length, variable data
        data_bytes = data.encode('utf-8')
        record = struct.pack("II", record_id, len(data_bytes)) + data_bytes

        # Refuse to write past the end of the mapping
        if offset + len(record) > self.size:
            raise ValueError("record does not fit in the mapped region")

        self.mmap[offset:offset+len(record)] = record
        print(f"Wrote record {record_id} at offset {offset}")

    def read_record(self, offset):
        # Read structured data: the 8-byte header first, then the payload
        header = self.mmap[offset:offset+8]
        record_id, length = struct.unpack("II", header)

        data_bytes = self.mmap[offset+8:offset+8+length]
        data = data_bytes.decode('utf-8')

        return record_id, data

    def parallel_writer(self, process_id, num_records=10):
        # Each process writes to different offsets
        offset = process_id * 1000  # Each process gets 1000 bytes

        for i in range(num_records):
            record_id = process_id * 100 + i
            data = f"Process {process_id} - Record {i}"
            # Each record gets a 100-byte slot inside the process's region
            self.write_record(offset + i * 100, record_id, data)

    def cleanup(self):
        # Clean up resources (the file itself stays on disk)
        self.mmap.close()
        self.file.close()

# Demo persistent shared memory
def demo_persistent_memory():
    psd = PersistentSharedData()
    try:
        # Write some data
        psd.write_record(0, 1, "Hello from main process!")

        # Read it back
        record_id, data = psd.read_record(0)
        print(f"Read: ID={record_id}, Data='{data}'")
    finally:
        psd.cleanup()

if __name__ == "__main__":
    demo_persistent_memory()
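
The record layout above (ID, length, then the payload) can be exercised without touching the file system. The sketch below uses a bytearray in place of the mmap object, which supports the same slicing and buffer operations. The module-level function names and the explicit little-endian format "<II" are assumptions made here so that the layout does not depend on the platform; the class above uses the native "II" format.

```python
import struct

# Record ID and payload length: two unsigned 32-bit integers, 8 bytes, no padding.
HEADER = struct.Struct("<II")


def write_record(buf, offset, record_id, text):
    """Write one record at offset and return the offset just past it."""
    payload = text.encode("utf-8")
    end = offset + HEADER.size + len(payload)
    if end > len(buf):
        raise ValueError("record does not fit in the buffer")
    HEADER.pack_into(buf, offset, record_id, len(payload))
    buf[offset + HEADER.size:end] = payload
    return end


def read_record(buf, offset):
    """Read the record that starts at offset."""
    record_id, length = HEADER.unpack_from(buf, offset)
    start = offset + HEADER.size
    return record_id, bytes(buf[start:start + length]).decode("utf-8")


buf = bytearray(64)  # stands in for the mmap object
next_offset = write_record(buf, 0, 7, "hello")
print(read_record(buf, 0), next_offset)  # (7, 'hello') 13
```

Returning the end offset makes it straightforward to append records back to back instead of reserving a fixed-size slot for each one.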

Common Pitfalls and Solutions

Pitfall 1: Memory Leaks

from multiprocessing import shared_memory

# Wrong way - forgetting to unlink!
def leaky_function():
    shm = shared_memory.SharedMemory(create=True, size=1024)
    # Do something...
    shm.close()  # This only detaches this process; it doesn't free the memory!
    # Memory leak - the shared memory block persists!

# Correct way - always unlink when done!
def clean_function():
    shm = shared_memory.SharedMemory(create=True, size=1024)
    try:
        # Do something...
        pass
    finally:
        shm.close()
        shm.unlink()  # This frees the memory!
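
The try/finally pattern can be wrapped once and reused. Below is a minimal sketch of such a wrapper built with contextlib.contextmanager; the helper name shared_block is an assumption for illustration and is not part of the standard library.

```python
from contextlib import contextmanager
from multiprocessing import shared_memory


@contextmanager
def shared_block(size):
    """Create a shared memory block, yield it, and always close and unlink it."""
    shm = shared_memory.SharedMemory(create=True, size=size)
    try:
        yield shm
    finally:
        shm.close()
        shm.unlink()


with shared_block(64) as shm:
    name = shm.name
    shm.buf[:3] = b"abc"
    data = bytes(shm.buf[:3])

# After the with block, the name no longer refers to a block.
try:
    shared_memory.SharedMemory(name=name)
    still_exists = True
except FileNotFoundError:
    still_exists = False

print(data, still_exists)  # b'abc' False
```

Only the process that created the block should use a wrapper like this; processes that merely attach should call close() and leave unlink() to the creator.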

Pitfall 2: Race Conditions

import multiprocessing as mp
from multiprocessing import shared_memory
import numpy as np

# Dangerous - no synchronization!
def unsafe_increment(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)

    for _ in range(1000):
        counter[0] += 1  # Race condition: read, add, and write are separate steps!

    del counter
    shm.close()

# Safe - use locks!
def safe_increment(shm_name, lock):
    shm = shared_memory.SharedMemory(name=shm_name)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)

    for _ in range(1000):
        with lock:  # Only one process at a time may update the counter
            counter[0] += 1

    del counter
    shm.close()

def run_workers(target, args, num_processes=4):
    # Start the workers and wait for all of them to finish
    processes = [mp.Process(target=target, args=args) for _ in range(num_processes)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

# Demo the difference
def demo_race_condition():
    # Create shared counter
    shm = shared_memory.SharedMemory(create=True, size=8)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)

    try:
        # Run without lock: updates can be lost
        counter[0] = 0
        run_workers(unsafe_increment, (shm.name,))
        print(f"Without lock: {counter[0]} (expected: 4000, may be less)")

        # Run with lock
        counter[0] = 0
        lock = mp.Lock()
        run_workers(safe_increment, (shm.name, lock))
        print(f"With lock: {counter[0]} (expected: 4000)")
    finally:
        # Cleanup
        del counter
        shm.close()
        shm.unlink()

if __name__ == "__main__":
    demo_race_condition()

Best Practices

  1. Always Clean Up: Use try/finally or a context manager so that close() and unlink() always run
  2. Synchronize Access: Use locks, semaphores, or other synchronization primitives whenever more than one process writes the same data
  3. Size Appropriately: A block cannot be resized after creation, so allocate the right amount of memory upfront
  4. Name Wisely: Block names are system-wide, so let Python generate a unique name or choose a meaningful one that cannot collide
  5. Test Thoroughly: Parallel code is tricky - test edge cases!
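
For the sizing advice, the number of bytes to request is the number of elements times the size of one element. The sketch below computes it with the standard library only; the helper name required_bytes is an assumption for illustration, and the struct format characters "B" and "d" stand in for NumPy's uint8 and float64.

```python
import struct
from math import prod
from multiprocessing import shared_memory


def required_bytes(shape, struct_format):
    """Bytes needed for an array of `shape` whose items use `struct_format`."""
    return prod(shape) * struct.calcsize(struct_format)


image_bytes = required_bytes((1080, 1920, 3), "B")  # uint8 RGB frame
matrix_bytes = required_bytes((1000, 1000), "d")     # float64 matrix
print(image_bytes, matrix_bytes)  # 6220800 8000000

shm = shared_memory.SharedMemory(create=True, size=matrix_bytes)
try:
    # The OS may round the block up to a page boundary,
    # so compare with >= rather than ==.
    big_enough = shm.size >= matrix_bytes
finally:
    shm.close()
    shm.unlink()

print(big_enough)  # True
```

Because the actual size can exceed the requested size, always pass an explicit shape when wrapping the buffer in an array instead of deriving the shape from shm.size.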

Hands-On Exercise

Challenge: Build a Parallel Matrix Multiplier

Create a high-performance matrix multiplication system using shared memory:

Requirements:

  • Split matrix multiplication across multiple processes
  • Support large matrices (1000x1000 or larger)
  • Compare performance with serial multiplication
  • Show speedup metrics
  • Visualize the computation distribution

Bonus Points:

  • Implement different partitioning strategies
  • Add progress tracking
  • Handle non-square matrices
  • Support sparse matrices

Solution

from multiprocessing import shared_memory, Process, Queue
import queue
import numpy as np
import time

class ParallelMatrixMultiplier:
    def __init__(self, size=1000):
        self.size = size

        # Create shared memory for matrices
        self.matrix_size = size * size * 8  # 8 bytes per float64

        # Matrix A
        self.shm_a = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_a = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_a.buf)

        # Matrix B
        self.shm_b = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_b = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_b.buf)

        # Result Matrix C
        self.shm_c = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_c = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_c.buf)

        print(f"Created {size}x{size} matrix multiplier!")

    def initialize_matrices(self):
        # Initialize with random values
        self.matrix_a[:] = np.random.rand(self.size, self.size)
        self.matrix_b[:] = np.random.rand(self.size, self.size)
        self.matrix_c[:] = 0
        print("Matrices initialized!")

    def multiply_block(self, process_id, start_row, end_row, progress_queue):
        # Connect to shared memory
        shm_a = shared_memory.SharedMemory(name=self.shm_a.name)
        shm_b = shared_memory.SharedMemory(name=self.shm_b.name)
        shm_c = shared_memory.SharedMemory(name=self.shm_c.name)

        # Create numpy arrays
        a = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_a.buf)
        b = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_b.buf)
        c = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_c.buf)

        print(f"Process {process_id} computing rows {start_row}-{end_row}")

        # Compute matrix multiplication for assigned rows
        total_rows = end_row - start_row
        for i, row in enumerate(range(start_row, end_row)):
            # One vector-matrix product fills the whole output row
            c[row, :] = a[row, :] @ b

            # Report progress
            if i % 10 == 0:
                progress = (i + 1) / total_rows * 100
                progress_queue.put((process_id, progress))

        progress_queue.put((process_id, 100))
        print(f"Process {process_id} completed!")

        # Cleanup
        del a, b, c
        shm_a.close()
        shm_b.close()
        shm_c.close()

    def parallel_multiply(self, num_processes=4):
        # Perform parallel multiplication
        processes = []
        progress_queue = Queue()
        rows_per_process = self.size // num_processes

        start_time = time.perf_counter()

        # Launch processes
        for i in range(num_processes):
            start_row = i * rows_per_process
            end_row = (i + 1) * rows_per_process if i < num_processes - 1 else self.size

            p = Process(
                target=self.multiply_block,
                args=(i, start_row, end_row, progress_queue)
            )
            p.start()
            processes.append(p)

        # Monitor progress
        progress = {i: 0 for i in range(num_processes)}
        while any(value < 100 for value in progress.values()):
            try:
                process_id, prog = progress_queue.get(timeout=0.1)
            except queue.Empty:
                # Stop waiting if every worker has exited (for example, after a crash)
                if not any(p.is_alive() for p in processes):
                    break
                continue
            progress[process_id] = prog

            # Display progress bar
            total_progress = sum(progress.values()) / num_processes
            bar_length = 40
            filled = int(bar_length * total_progress / 100)
            bar = "█" * filled + "░" * (bar_length - filled)
            print(f"\rProgress: [{bar}] {total_progress:.1f}%", end="")

        print()  # New line after progress bar

        # Wait for completion
        for p in processes:
            p.join()

        elapsed = time.perf_counter() - start_time
        return elapsed

    def serial_multiply(self):
        # Single-process multiplication for comparison. np.dot runs optimized
        # native code, so do not be surprised if it beats the parallel version.
        start_time = time.perf_counter()
        np.dot(self.matrix_a, self.matrix_b)
        elapsed = time.perf_counter() - start_time
        return elapsed

    def verify_result(self):
        # Verify parallel result matches serial
        expected = np.dot(self.matrix_a, self.matrix_b)
        difference = np.max(np.abs(self.matrix_c - expected))
        print(f"Maximum difference: {difference:.2e}")
        return difference < 1e-10

    def benchmark(self, num_processes=4):
        # Run benchmark
        print(f"\nRunning benchmark with {self.size}x{self.size} matrices...\n")

        # Parallel multiplication
        parallel_time = self.parallel_multiply(num_processes)
        print(f"Parallel time ({num_processes} processes): {parallel_time:.2f}s")

        # Serial multiplication
        print("\nRunning serial multiplication...")
        serial_time = self.serial_multiply()
        print(f"Serial time: {serial_time:.2f}s")

        # Calculate speedup
        speedup = serial_time / parallel_time
        efficiency = speedup / num_processes * 100

        print("\nPerformance Metrics:")
        print(f"   Speedup: {speedup:.2f}x")
        print(f"   Efficiency: {efficiency:.1f}%")

        # Verify correctness
        if self.verify_result():
            print("   Result verified correct!")
        else:
            print("   Result verification failed!")

    def cleanup(self):
        # Clean up shared memory
        self.matrix_a = self.matrix_b = self.matrix_c = None  # drop the NumPy views
        self.shm_a.close()
        self.shm_a.unlink()
        self.shm_b.close()
        self.shm_b.unlink()
        self.shm_c.close()
        self.shm_c.unlink()

# Test the system!
if __name__ == "__main__":
    # Try different sizes and process counts
    for size in [500, 1000]:
        print(f"\n{'='*60}")
        multiplier = ParallelMatrixMultiplier(size=size)
        try:
            multiplier.initialize_matrices()
            multiplier.benchmark(num_processes=4)
        finally:
            multiplier.cleanup()

Key Takeaways

You've learned so much! Here's what you can now do:

  • Create shared memory blocks for inter-process communication
  • Share NumPy arrays efficiently between processes
  • Avoid common pitfalls like memory leaks and race conditions
  • Build high-performance parallel applications
  • Use advanced features like ShareableList and memory-mapped files!

Remember: Shared memory is powerful but requires careful handling. Always synchronize access and clean up resources!

Next Steps

Congratulations! You've worked through shared memory in Python!

Here's what to do next:

  1. Practice with the matrix multiplication exercise
  2. Build a real-time data processing pipeline using shared memory
  3. Move on to our next tutorial: Async/Await: Coroutines Basics
  4. Experiment with combining shared memory with other IPC methods!

Remember: With great power comes great responsibility. Use shared memory wisely, and your parallel programs will fly!


Happy coding!