Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or your preferred IDE
What you'll learn
- Understand shared memory fundamentals
- Apply shared memory in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this tutorial on shared memory in Python! In this guide, we'll explore how multiprocessing.shared_memory enables fast, zero-copy data sharing between processes.
Ever wondered how to share large datasets between processes without the overhead of copying? Traditional inter-process communication methods like pipes and queues involve serialization and deserialization, which can be slow for large data. Shared memory is like a communal whiteboard where multiple processes can read and write directly!
By the end of this tutorial, you'll feel confident using shared memory to speed up your parallel Python applications. Let's dive in!
Understanding Shared Memory
What is Shared Memory?
Shared memory is like a public park bench - multiple people (processes) can sit on it simultaneously and share the same space. In Python terms, it's a memory region that multiple processes can access directly without copying data.
Think of it as a shared Google Doc where multiple people can edit simultaneously, rather than emailing copies back and forth!
Why Use Shared Memory?
Here's why developers love shared memory:
- Zero-Copy Performance: Access data without serialization
- Memory Efficiency: Share large arrays without duplication
- Low Latency: Direct memory access is very fast
- NumPy Integration: Well suited to scientific computing
Real-world example: Imagine processing a 4K video. With shared memory, multiple processes can work on different frames simultaneously without copying the entire video for each process!
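To make the zero-copy claim concrete, here is a minimal, self-contained sketch (the roughly 100 MB array size and the worker names are my own choices, not from any benchmark) comparing a Queue transfer, which pickles and copies the data, with attaching to a named shared memory block:

import time
import numpy as np
from multiprocessing import Process, Queue, shared_memory

def consume_from_queue(q):
    arr = q.get()  # the array is unpickled here - a full copy
    print(f"Queue consumer sum: {arr.sum():.2f}")

def consume_from_shm(name, shape, dtype):
    shm = shared_memory.SharedMemory(name=name)  # attach by name - no copy
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    print(f"Shared memory consumer sum: {arr.sum():.2f}")
    shm.close()

if __name__ == "__main__":
    data = np.random.rand(12_500_000)  # roughly 100 MB of float64

    # Copying path: the whole array is serialized through the queue
    q = Queue()
    start = time.time()
    q.put(data)
    p = Process(target=consume_from_queue, args=(q,))
    p.start()
    p.join()
    print(f"Queue round trip: {time.time() - start:.2f}s")

    # Zero-copy path: only the block's name travels to the worker
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)[:] = data
    start = time.time()
    p = Process(target=consume_from_shm, args=(shm.name, data.shape, data.dtype))
    p.start()
    p.join()
    print(f"Shared memory round trip: {time.time() - start:.2f}s")
    shm.close()
    shm.unlink()

Exact numbers depend on your platform and start method, but the queue path has to pickle and copy the whole array while the shared memory path only passes a name.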
Basic Syntax and Usage
Simple Example
Let's start with a friendly example:
from multiprocessing import shared_memory
import numpy as np

def create_shared_array():
    # Create a shared memory block of 1024 bytes
    shm = shared_memory.SharedMemory(create=True, size=1024)
    # Create a NumPy array backed by the shared memory buffer
    shared_array = np.ndarray((256,), dtype=np.int32, buffer=shm.buf)
    # Initialize with some data
    shared_array[:] = np.arange(256)
    print(f"Created shared memory: {shm.name}")
    return shm, shared_array

# Let's create it!
shm, array = create_shared_array()
print(f"First 5 elements: {array[:5]}")

Explanation: We create a shared memory block and wrap it with a NumPy array. The block persists until a process calls unlink() on it, even after the creating process exits (though on POSIX, Python's resource tracker may clean up and warn about blocks that were never unlinked).
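Since the block outlives the function that created it, another process can attach to it purely by name. Here is a small self-contained sketch of that lifecycle (the reader function and the 256-element layout are mine, chosen to mirror the example above):

from multiprocessing import Process, shared_memory
import numpy as np

def reader(name):
    existing = shared_memory.SharedMemory(name=name)  # attach by name - no copy
    view = np.ndarray((256,), dtype=np.int32, buffer=existing.buf)
    print(f"Reader sees sum = {view.sum()}")
    existing.close()  # detach this process; the block stays alive

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=1024)
    np.ndarray((256,), dtype=np.int32, buffer=shm.buf)[:] = np.arange(256)
    p = Process(target=reader, args=(shm.name,))
    p.start()
    p.join()
    shm.close()   # the creator detaches...
    shm.unlink()  # ...and destroys the block for good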
Common Patterns
Here are patterns you'll use daily:
from multiprocessing import shared_memory
import numpy as np

# Pattern 1: Producer-Consumer with shared memory
def producer(shm_name):
    # Attach to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray((100,), dtype=np.float64, buffer=existing_shm.buf)
    # Produce data
    for i in range(100):
        shared_array[i] = i * 3.14
    print("Producer done!")
    # Close (but don't unlink)
    existing_shm.close()

def consumer(shm_name):
    # Attach to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray((100,), dtype=np.float64, buffer=existing_shm.buf)
    # Consume data
    total = np.sum(shared_array)
    print(f"Consumer calculated sum: {total:.2f}")
    # Close (but don't unlink)
    existing_shm.close()

# Pattern 2: Shared state management
class SharedCounter:
    def __init__(self):
        # Create shared memory for a single 64-bit integer
        self.shm = shared_memory.SharedMemory(create=True, size=8)
        self.counter = np.ndarray((1,), dtype=np.int64, buffer=self.shm.buf)
        self.counter[0] = 0

    def increment(self):
        # Warning: this is NOT safe under concurrent access without a lock!
        self.counter[0] += 1

    def get_value(self):
        return self.counter[0]

    def cleanup(self):
        self.shm.close()
        self.shm.unlink()  # Remove the shared memory block
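The tutorial doesn't show a driver for Pattern 1, so here is one way it could be wired together (a sketch, assuming producer and consumer live in an importable module as the spawn start method requires):

from multiprocessing import Process

if __name__ == "__main__":
    # Create the block that both the producer and the consumer attach to
    shm = shared_memory.SharedMemory(create=True, size=100 * 8)  # 100 float64 values
    try:
        prod = Process(target=producer, args=(shm.name,))
        prod.start()
        prod.join()  # make sure the data is fully written first
        cons = Process(target=consumer, args=(shm.name,))
        cons.start()
        cons.join()
    finally:
        shm.close()
        shm.unlink()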
Practical Examples
Example 1: Image Processing Pipeline
Let's build something real - a parallel image processor:
from multiprocessing import shared_memory, Process, Lock
import numpy as np
import time

class ImageProcessor:
    def __init__(self, width=1920, height=1080):
        # Create shared memory for an RGB image
        self.width = width
        self.height = height
        self.size = width * height * 3  # 3 bytes per pixel (RGB)
        self.shm = shared_memory.SharedMemory(create=True, size=self.size)
        self.image = np.ndarray((height, width, 3), dtype=np.uint8, buffer=self.shm.buf)
        # Create a lock for synchronization
        self.lock = Lock()
        print(f"Created shared image buffer: {width}x{height}")

    def load_image(self):
        # Simulate loading an image by filling in a gradient
        print("Loading image...")
        with self.lock:
            for y in range(self.height):
                for x in range(self.width):
                    self.image[y, x] = [
                        int(x * 255 / self.width),   # Red gradient
                        int(y * 255 / self.height),  # Green gradient
                        128                          # Blue constant
                    ]
        print("Image loaded!")

    def apply_filter(self, process_id, start_row, end_row):
        # Apply a filter to a portion of the image.
        # Note: passing a bound method to Process relies on the child inheriting
        # self (fork start method); on spawn platforms, pass shm.name and the
        # lock as explicit arguments instead.
        existing_shm = shared_memory.SharedMemory(name=self.shm.name)
        image = np.ndarray((self.height, self.width, 3), dtype=np.uint8, buffer=existing_shm.buf)
        print(f"Process {process_id} working on rows {start_row}-{end_row}")
        # Increase brightness by 20%. Workers own disjoint row ranges, so the
        # lock is held per row only to keep readers from seeing half-written rows.
        for y in range(start_row, end_row):
            with self.lock:
                image[y] = np.clip(image[y].astype(np.float32) * 1.2, 0, 255).astype(np.uint8)
        print(f"Process {process_id} finished!")
        existing_shm.close()

    def parallel_process(self, num_processes=4):
        # Process the image in parallel, one row block per process
        processes = []
        rows_per_process = self.height // num_processes
        start_time = time.time()
        for i in range(num_processes):
            start_row = i * rows_per_process
            end_row = (i + 1) * rows_per_process if i < num_processes - 1 else self.height
            p = Process(target=self.apply_filter, args=(i, start_row, end_row))
            p.start()
            processes.append(p)
        # Wait for all processes
        for p in processes:
            p.join()
        elapsed = time.time() - start_time
        print(f"Parallel processing completed in {elapsed:.2f} seconds!")

    def cleanup(self):
        # Clean up shared memory
        self.shm.close()
        self.shm.unlink()

# Let's use it!
if __name__ == "__main__":
    processor = ImageProcessor(1920, 1080)
    processor.load_image()
    processor.parallel_process(4)
    # processor.cleanup()  # Uncomment when done
Try it yourself: Add different filters like blur, edge detection, or color adjustments! One possible starting point is sketched below.
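As a starting point for that exercise, here is one extra filter you could add to ImageProcessor (a sketch of my own, not part of the pipeline above): a vectorized grayscale conversion that drops the per-pixel Python loop entirely:

    def apply_grayscale(self, process_id, start_row, end_row):
        # Same attach-by-name pattern as apply_filter
        existing_shm = shared_memory.SharedMemory(name=self.shm.name)
        image = np.ndarray((self.height, self.width, 3), dtype=np.uint8,
                           buffer=existing_shm.buf)
        with self.lock:
            block = image[start_row:end_row].astype(np.float32)
            # Standard luminance weights for an RGB -> gray conversion
            gray = block[..., 0] * 0.299 + block[..., 1] * 0.587 + block[..., 2] * 0.114
            image[start_row:end_row] = gray[..., np.newaxis].astype(np.uint8)
        existing_shm.close()
        print(f"Process {process_id} converted rows {start_row}-{end_row} to grayscale")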
Example 2: Real-time Analytics Dashboard
Let's make a real-time data analytics system:
from multiprocessing import shared_memory, Process, Event
import numpy as np
import time
import random

class RealtimeAnalytics:
    def __init__(self, num_metrics=10, buffer_size=1000):
        # Create shared memory for the metrics
        self.num_metrics = num_metrics
        self.buffer_size = buffer_size
        # Shared memory for a circular buffer of samples
        self.data_shm = shared_memory.SharedMemory(
            create=True,
            size=num_metrics * buffer_size * 8  # 8 bytes per float64
        )
        self.data = np.ndarray(
            (num_metrics, buffer_size),
            dtype=np.float64,
            buffer=self.data_shm.buf
        )
        # Shared memory for the current write position
        self.pos_shm = shared_memory.SharedMemory(create=True, size=8)
        self.position = np.ndarray((1,), dtype=np.int64, buffer=self.pos_shm.buf)
        self.position[0] = 0
        # Event used to signal shutdown to all workers.
        # Note: like the previous example, this class passes bound methods to
        # Process and therefore relies on the fork start method; on spawn
        # platforms, pass the shared memory names and the event as arguments.
        self.stop_event = Event()
        print(f"Analytics system initialized with {num_metrics} metrics!")

    def data_collector(self, metric_id):
        # Collect data for a specific metric
        existing_data_shm = shared_memory.SharedMemory(name=self.data_shm.name)
        existing_pos_shm = shared_memory.SharedMemory(name=self.pos_shm.name)
        data = np.ndarray(
            (self.num_metrics, self.buffer_size),
            dtype=np.float64,
            buffer=existing_data_shm.buf
        )
        position = np.ndarray((1,), dtype=np.int64, buffer=existing_pos_shm.buf)
        print(f"Collector {metric_id} started!")
        while not self.stop_event.is_set():
            # Generate random data (simulate a sensor)
            value = 50 + random.gauss(0, 10) + metric_id * 5
            # Store it in the current slot of the circular buffer; the analyzer
            # advances the position once per second, so each slot ends up holding
            # the most recent reading for that second.
            pos = position[0] % self.buffer_size
            data[metric_id, pos] = value
            time.sleep(0.1)  # Simulate a 10 Hz sensor
        print(f"Collector {metric_id} stopped!")
        existing_data_shm.close()
        existing_pos_shm.close()

    def analyzer(self):
        # Analyze data in real time
        existing_data_shm = shared_memory.SharedMemory(name=self.data_shm.name)
        existing_pos_shm = shared_memory.SharedMemory(name=self.pos_shm.name)
        data = np.ndarray(
            (self.num_metrics, self.buffer_size),
            dtype=np.float64,
            buffer=existing_data_shm.buf
        )
        position = np.ndarray((1,), dtype=np.int64, buffer=existing_pos_shm.buf)
        print("Analyzer started!")
        while not self.stop_event.is_set():
            # Calculate statistics over the most recent samples
            current_pos = position[0]
            if current_pos > 0:
                # Use up to the last 100 filled slots
                samples = min(100, current_pos)
                stats = []
                for metric in range(self.num_metrics):
                    recent_data = []
                    for i in range(samples):
                        idx = (current_pos - i - 1) % self.buffer_size
                        recent_data.append(data[metric, idx])
                    if recent_data:
                        mean = np.mean(recent_data)
                        std = np.std(recent_data)
                        stats.append(f"Metric {metric}: μ={mean:.2f}, σ={std:.2f}")
                print(f"\nReal-time stats (position: {current_pos}):")
                for stat in stats:
                    print(f"  {stat}")
            # Advance the shared write position
            position[0] = current_pos + 1
            time.sleep(1)  # Analyze every second
        print("Analyzer stopped!")
        existing_data_shm.close()
        existing_pos_shm.close()

    def start(self):
        # Start all processes
        processes = []
        # Start data collectors
        for i in range(self.num_metrics):
            p = Process(target=self.data_collector, args=(i,))
            p.start()
            processes.append(p)
        # Start the analyzer
        analyzer_p = Process(target=self.analyzer)
        analyzer_p.start()
        processes.append(analyzer_p)
        return processes

    def stop(self, processes):
        # Stop all processes
        print("\nStopping analytics system...")
        self.stop_event.set()
        for p in processes:
            p.join()
        print("All processes stopped!")

    def cleanup(self):
        # Clean up shared memory
        self.data_shm.close()
        self.data_shm.unlink()
        self.pos_shm.close()
        self.pos_shm.unlink()

# Demo the system!
if __name__ == "__main__":
    analytics = RealtimeAnalytics(num_metrics=5, buffer_size=1000)
    processes = analytics.start()
    try:
        # Let it run for 10 seconds
        time.sleep(10)
    except KeyboardInterrupt:
        print("\nInterrupted by user!")
    analytics.stop(processes)
    # analytics.cleanup()  # Uncomment when done
Advanced Concepts
ShareableList: A High-Level Shared Container
When you're ready to level up, try ShareableList for sharing small sequences of mixed Python values:
from multiprocessing import shared_memory
import multiprocessing as mp

def modifier(list_name):
    # Attach to the existing ShareableList by name
    existing_sl = shared_memory.ShareableList(name=list_name)
    # Modify elements in place. Note: each slot has a fixed size, so a
    # replacement str/bytes value must not be longer than the original.
    existing_sl[0] = 100
    existing_sl[2] = "Modified!"
    existing_sl[5] = False
    print(f"Modified list: {list(existing_sl)}")
    existing_sl.shm.close()

def advanced_shareable_list():
    # Create a ShareableList holding different types
    original_list = [42, 3.14, "Hello, world!", b"bytes", None, True]
    sl = shared_memory.ShareableList(original_list)
    print(f"Created ShareableList: {sl.shm.name}")
    # Run the modifier in a separate process (a module-level function,
    # so it also works with the spawn start method)
    p = mp.Process(target=modifier, args=(sl.shm.name,))
    p.start()
    p.join()
    # Check the results
    print(f"Final list: {list(sl)}")
    # Cleanup
    sl.shm.close()
    sl.shm.unlink()

# Run the example
if __name__ == "__main__":
    advanced_shareable_list()
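One quirk worth calling out (and the reason the modifier above writes a string no longer than the original): ShareableList reserves a fixed, padded slot for each item when it is created, so a replacement str or bytes value cannot exceed the storage reserved for the original. A tiny sketch of what the failure looks like:

from multiprocessing import shared_memory

sl = shared_memory.ShareableList(["hi", 0])
try:
    sl[0] = "a much longer string"  # exceeds the slot reserved for "hi"
except ValueError as exc:
    print(f"Rejected as expected: {exc}")
sl[0] = "ok"  # fits in the original slot - fine
sl.shm.close()
sl.shm.unlink()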
Memory-Mapped Files: Persistent Shared Memory
For the brave - sharing data through a memory-mapped file, which also survives process restarts:
import mmap
import os
from multiprocessing import Process
import struct

class PersistentSharedData:
    def __init__(self, filename="shared_data.bin", size=1024*1024):
        self.filename = filename
        self.size = size
        # Create the backing file if it doesn't exist
        if not os.path.exists(filename):
            with open(filename, "wb") as f:
                f.write(b"\x00" * size)
        # Memory-map the file
        self.file = open(filename, "r+b")
        self.mmap = mmap.mmap(self.file.fileno(), size)
        print(f"Created persistent shared memory: {filename}")

    def write_record(self, offset, record_id, data):
        # Write structured data
        # Format: 4 bytes ID, 4 bytes length, then the variable-length payload
        data_bytes = data.encode('utf-8')
        record = struct.pack("II", record_id, len(data_bytes)) + data_bytes
        self.mmap[offset:offset + len(record)] = record
        print(f"Wrote record {record_id} at offset {offset}")

    def read_record(self, offset):
        # Read structured data back
        header = self.mmap[offset:offset + 8]
        record_id, length = struct.unpack("II", header)
        data_bytes = self.mmap[offset + 8:offset + 8 + length]
        return record_id, data_bytes.decode('utf-8')

    def parallel_writer(self, process_id, num_records=10):
        # Each process writes to its own region of the file
        offset = process_id * 1000  # Each process gets 1000 bytes
        for i in range(num_records):
            record_id = process_id * 100 + i
            data = f"Process {process_id} - Record {i}"
            self.write_record(offset + i * 100, record_id, data)

    def cleanup(self):
        # Flush changes to disk and release resources
        self.mmap.flush()
        self.mmap.close()
        self.file.close()

# Demo persistent shared memory
def demo_persistent_memory():
    psd = PersistentSharedData()
    # Write some data
    psd.write_record(0, 1, "Hello from the main process!")
    # Read it back
    record_id, data = psd.read_record(0)
    print(f"Read: ID={record_id}, Data='{data}'")
    psd.cleanup()

if __name__ == "__main__":
    demo_persistent_memory()
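Because the data lives in an ordinary file, it survives the Python process entirely. A quick sketch (reusing the default shared_data.bin file name from above) that re-opens the file in a later run and reads the record back:

import mmap
import struct

# Re-open the file written by demo_persistent_memory() - even from a separate run
with open("shared_data.bin", "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as mm:
        record_id, length = struct.unpack("II", mm[0:8])
        print(f"Recovered record {record_id}: {mm[8:8 + length].decode('utf-8')}")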
Common Pitfalls and Solutions
Pitfall 1: Memory Leaks
# Wrong way - forgetting to unlink!
def leaky_function():
    shm = shared_memory.SharedMemory(create=True, size=1024)
    # Do something...
    shm.close()  # This only detaches this process - it does not destroy the block!
    # Memory leak: the shared memory block lives on in the OS

# Correct way - always unlink when done!
def clean_function():
    shm = shared_memory.SharedMemory(create=True, size=1024)
    try:
        # Do something...
        pass
    finally:
        shm.close()
        shm.unlink()  # This actually frees the block
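A related detail that often surprises people: on POSIX systems, unlink() only removes the name, so processes that are already attached keep a valid view until they call close(), while new attach attempts fail. (On Windows, unlink() is effectively a no-op and the block disappears once the last handle is closed.) A single-process sketch of the POSIX behavior:

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"hello"

shm.unlink()               # no new process can attach by this name anymore
print(bytes(shm.buf[:5]))  # b'hello' - our existing view is still valid

try:
    shared_memory.SharedMemory(name=shm.name)  # attaching again now fails (POSIX)
except FileNotFoundError:
    print("Block is gone for newcomers")

shm.close()                # finally release our own view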
Pitfall 2: Race Conditions
import multiprocessing as mp
from multiprocessing import shared_memory
import numpy as np

# Dangerous - no synchronization!
def unsafe_increment(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)
    for _ in range(1000):
        counter[0] += 1  # Race condition: read-modify-write is not atomic!
    shm.close()

# Safe - use a lock!
def safe_increment(shm_name, lock):
    shm = shared_memory.SharedMemory(name=shm_name)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)
    for _ in range(1000):
        with lock:  # Only one process increments at a time
            counter[0] += 1
    shm.close()

# Demo the safe version
def demo_race_condition():
    # Create a shared counter
    shm = shared_memory.SharedMemory(create=True, size=8)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)
    counter[0] = 0
    lock = mp.Lock()
    processes = []
    # Run the lock-protected version in four processes
    for _ in range(4):
        p = mp.Process(target=safe_increment, args=(shm.name, lock))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(f"With lock: {counter[0]} (expected: 4000)")
    # Cleanup
    shm.close()
    shm.unlink()
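To actually watch the race happen, you can run the unsafe version next to the safe one. The driver below is my own sketch; note that with only 1000 increments per process the race may not show up on every run, so raise the loop count if you always see 4000:

def run_counter_demo(worker, *extra_args):
    shm = shared_memory.SharedMemory(create=True, size=8)
    counter = np.ndarray((1,), dtype=np.int64, buffer=shm.buf)
    counter[0] = 0
    procs = [mp.Process(target=worker, args=(shm.name, *extra_args)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    result = int(counter[0])
    shm.close()
    shm.unlink()
    return result

if __name__ == "__main__":
    print(f"Without lock: {run_counter_demo(unsafe_increment)} (expected 4000, often less)")
    print(f"With lock:    {run_counter_demo(safe_increment, mp.Lock())} (always 4000)")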
Best Practices
- Always Clean Up: Use try/finally or a context manager to guarantee close() and unlink() (see the sketch after this list)
- Synchronize Access: Use locks, semaphores, or other synchronization primitives for shared writable state
- Size Appropriately: Allocate the right amount of memory upfront - blocks cannot be resized after creation
- Name Wisely: Use meaningful names for shared memory segments
- Test Thoroughly: Parallel code is tricky - test the edge cases!
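The first item deserves a concrete shape. One handy pattern is a small contextlib wrapper; this is a sketch, and managed_shm is my own name for it:

from contextlib import contextmanager
from multiprocessing import shared_memory

@contextmanager
def managed_shm(size=0, name=None, create=True):
    # Create (or attach to) a block and guarantee cleanup on exit
    shm = shared_memory.SharedMemory(name=name, create=create, size=size)
    try:
        yield shm
    finally:
        shm.close()
        if create:
            shm.unlink()  # only the creator destroys the block

# Usage: the block is released even if the body raises
if __name__ == "__main__":
    with managed_shm(size=1024) as shm:
        shm.buf[:5] = b"hello"
        print(f"Working with block {shm.name}")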
Hands-On Exercise
Challenge: Build a Parallel Matrix Multiplier
Create a high-performance matrix multiplication system using shared memory:
Requirements:
- Split the matrix multiplication across multiple processes
- Support large matrices (1000x1000 or larger)
- Compare performance with serial multiplication
- Show speedup metrics
- Visualize the computation distribution
Bonus Points:
- Implement different partitioning strategies
- Add progress tracking
- Handle non-square matrices
- Support sparse matrices
Solution
from multiprocessing import shared_memory, Process, Queue
import numpy as np
import queue
import time

class ParallelMatrixMultiplier:
    def __init__(self, size=1000):
        self.size = size
        # Create shared memory for the matrices
        self.matrix_size = size * size * 8  # 8 bytes per float64
        # Matrix A
        self.shm_a = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_a = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_a.buf)
        # Matrix B
        self.shm_b = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_b = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_b.buf)
        # Result matrix C
        self.shm_c = shared_memory.SharedMemory(create=True, size=self.matrix_size)
        self.matrix_c = np.ndarray((size, size), dtype=np.float64, buffer=self.shm_c.buf)
        print(f"Created {size}x{size} matrix multiplier!")

    def initialize_matrices(self):
        # Initialize with random values
        self.matrix_a[:] = np.random.rand(self.size, self.size)
        self.matrix_b[:] = np.random.rand(self.size, self.size)
        self.matrix_c[:] = 0
        print("Matrices initialized!")

    def multiply_block(self, process_id, start_row, end_row, progress_queue):
        # Attach to the shared matrices (bound-method target: relies on the
        # fork start method, as in the earlier examples)
        shm_a = shared_memory.SharedMemory(name=self.shm_a.name)
        shm_b = shared_memory.SharedMemory(name=self.shm_b.name)
        shm_c = shared_memory.SharedMemory(name=self.shm_c.name)
        # Wrap the buffers in NumPy arrays
        a = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_a.buf)
        b = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_b.buf)
        c = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_c.buf)
        print(f"Process {process_id} computing rows {start_row}-{end_row}")
        # Compute the assigned rows of the product
        total_rows = end_row - start_row
        for i, row in enumerate(range(start_row, end_row)):
            for col in range(self.size):
                c[row, col] = np.dot(a[row, :], b[:, col])
            # Report progress every 10 rows
            if i % 10 == 0:
                progress = (i + 1) / total_rows * 100
                progress_queue.put((process_id, progress))
        progress_queue.put((process_id, 100))
        print(f"Process {process_id} completed!")
        # Cleanup
        shm_a.close()
        shm_b.close()
        shm_c.close()

    def parallel_multiply(self, num_processes=4):
        # Perform the multiplication in parallel
        processes = []
        progress_queue = Queue()
        rows_per_process = self.size // num_processes
        start_time = time.time()
        # Launch the workers
        for i in range(num_processes):
            start_row = i * rows_per_process
            end_row = (i + 1) * rows_per_process if i < num_processes - 1 else self.size
            p = Process(
                target=self.multiply_block,
                args=(i, start_row, end_row, progress_queue)
            )
            p.start()
            processes.append(p)
        # Monitor progress
        progress = {i: 0 for i in range(num_processes)}
        while any(p < 100 for p in progress.values()):
            try:
                process_id, prog = progress_queue.get(timeout=0.1)
                progress[process_id] = prog
                # Display a progress bar
                total_progress = sum(progress.values()) / num_processes
                bar_length = 40
                filled = int(bar_length * total_progress / 100)
                bar = "#" * filled + "-" * (bar_length - filled)
                print(f"\rProgress: [{bar}] {total_progress:.1f}%", end="")
            except queue.Empty:
                pass
        print()  # New line after the progress bar
        # Wait for completion
        for p in processes:
            p.join()
        elapsed = time.time() - start_time
        return elapsed

    def serial_multiply(self):
        # Serial multiplication for comparison
        start_time = time.time()
        np.dot(self.matrix_a, self.matrix_b)
        elapsed = time.time() - start_time
        return elapsed

    def verify_result(self):
        # Verify that the parallel result matches the serial one
        expected = np.dot(self.matrix_a, self.matrix_b)
        difference = np.max(np.abs(self.matrix_c - expected))
        print(f"Maximum difference: {difference:.2e}")
        # Allow a small tolerance for BLAS summation-order differences
        return difference < 1e-8

    def benchmark(self, num_processes=4):
        # Run the benchmark
        print(f"\nRunning benchmark with {self.size}x{self.size} matrices...\n")
        # Parallel multiplication
        parallel_time = self.parallel_multiply(num_processes)
        print(f"Parallel time ({num_processes} processes): {parallel_time:.2f}s")
        # Serial multiplication
        print("\nRunning serial multiplication...")
        serial_time = self.serial_multiply()
        print(f"Serial time: {serial_time:.2f}s")
        # Calculate the speedup
        speedup = serial_time / parallel_time
        efficiency = speedup / num_processes * 100
        print("\nPerformance metrics:")
        print(f"  Speedup: {speedup:.2f}x")
        print(f"  Efficiency: {efficiency:.1f}%")
        # Verify correctness
        if self.verify_result():
            print("  Result verified correct!")
        else:
            print("  Result verification failed!")

    def cleanup(self):
        # Clean up shared memory
        self.shm_a.close()
        self.shm_a.unlink()
        self.shm_b.close()
        self.shm_b.unlink()
        self.shm_c.close()
        self.shm_c.unlink()

# Test the system!
if __name__ == "__main__":
    # Try different sizes and process counts
    for size in [500, 1000]:
        print(f"\n{'='*60}")
        multiplier = ParallelMatrixMultiplier(size=size)
        multiplier.initialize_matrices()
        multiplier.benchmark(num_processes=4)
        multiplier.cleanup()
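The per-element np.dot loop keeps the row partitioning explicit, which is the point of the exercise, but each worker could also hand its entire row block to NumPy in one call. A sketch of that variant (same shared buffers; only the inner computation changes, and keep in mind that BLAS may itself use several threads, which skews the speedup comparison):

    def multiply_block_vectorized(self, process_id, start_row, end_row, progress_queue):
        # Attach to the shared matrices exactly as multiply_block does
        shm_a = shared_memory.SharedMemory(name=self.shm_a.name)
        shm_b = shared_memory.SharedMemory(name=self.shm_b.name)
        shm_c = shared_memory.SharedMemory(name=self.shm_c.name)
        a = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_a.buf)
        b = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_b.buf)
        c = np.ndarray((self.size, self.size), dtype=np.float64, buffer=shm_c.buf)
        # ...but let BLAS compute the whole row block in a single call
        c[start_row:end_row] = a[start_row:end_row] @ b
        progress_queue.put((process_id, 100))
        shm_a.close()
        shm_b.close()
        shm_c.close()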
Key Takeaways
You've learned a lot! Here's what you can now do:
- Create shared memory blocks for inter-process communication
- Share NumPy arrays efficiently between processes
- Avoid common pitfalls like memory leaks and race conditions
- Build high-performance parallel applications
- Use advanced features like ShareableList and memory-mapped files
Remember: shared memory is powerful but requires careful handling. Always synchronize access and clean up resources!
Next Steps
Congratulations! You've mastered shared memory in Python!
Here's what to do next:
- Practice with the matrix multiplication exercise
- Build a real-time data processing pipeline using shared memory
- Move on to our next tutorial: Async/Await: Coroutines Basics
- Experiment with combining shared memory with other IPC methods
Remember: with great power comes great responsibility. Use shared memory wisely, and your parallel programs will fly!
Happy coding!