Part 310 of 365

📘 Memory Profiling: Finding Leaks

Master memory profiling: finding leaks in Python with practical examples, best practices, and real-world applications 🚀

🚀 Intermediate
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand the concept fundamentals 🎯
  • Apply the concept in real projects 🏗️
  • Debug common issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting tutorial on memory profiling and finding memory leaks in Python! 🎉 In this guide, we'll explore how to detect, analyze, and fix memory issues that can slow down or crash your applications.

You'll discover how proper memory profiling can transform your Python development experience. Whether you're building web applications 🌐, data processing pipelines 📊, or scientific computing tools 🔬, understanding memory management is essential for writing robust, performant code.

By the end of this tutorial, you'll feel confident identifying and fixing memory leaks in your own projects! Let's dive in! 🏊‍♂️

📚 Understanding Memory Profiling

🤔 What is Memory Profiling?

Memory profiling is like being a detective for your program's memory usage 🕵️. Think of it as monitoring your apartment's space usage - you need to know what's taking up room and whether anything is hoarding space unnecessarily!

In Python terms, memory profiling helps you track how your program allocates and releases memory. This means you can:

  • ✨ Identify memory-hungry operations
  • 🚀 Detect memory leaks before they crash your app
  • 🛡️ Optimize memory usage for better performance

💡 Why Use Memory Profiling?

Here's why developers love memory profiling:

  1. Prevent Crashes 💥: Catch memory leaks before production
  2. Improve Performance 🏃: Reduce memory usage for faster execution
  3. Scale Better 📈: Handle more users/data with the same resources
  4. Debug Issues 🐛: Find the root cause of memory problems

Real-world example: Imagine building an image processing app 📸. Without memory profiling, you might accidentally keep all processed images in memory, eventually crashing your server!

🔧 Basic Syntax and Usage

📝 Simple Memory Leak Example

Let's start with a common memory leak pattern:

# 👋 Hello, Memory Profiling!
import tracemalloc

# 🚨 Example of a memory leak
class ImageProcessor:
    def __init__(self):
        self.cache = []  # 📦 This will grow forever!
    
    def process_image(self, image_data):
        # 🖼️ Process the image
        processed = image_data * 2  # Simulate processing
        
        # ❌ Bad: Never clearing the cache!
        self.cache.append(processed)
        
        return processed

# 🔍 Let's track memory usage
tracemalloc.start()

processor = ImageProcessor()
for i in range(1000):
    # 📸 Each iteration adds to memory!
    processor.process_image(f"image_{i}" * 100)

# 📊 Check memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"🧮 Current memory: {current / 1024 / 1024:.2f} MB")
print(f"📈 Peak memory: {peak / 1024 / 1024:.2f} MB")
tracemalloc.stop()

💡 Explanation: Notice how the cache list keeps growing! This is a classic memory leak - data that's no longer needed but never released.
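
💡 One possible fix, as a minimal sketch (the name BoundedImageProcessor and the cache size of 100 are illustrative choices): bound the cache so old entries are evicted automatically instead of accumulating forever.

# ✅ Bounded cache: oldest entries fall out automatically
from collections import deque

class BoundedImageProcessor:
    def __init__(self, cache_size=100):
        self.cache = deque(maxlen=cache_size)  # 📦 Evicts oldest when full
    
    def process_image(self, image_data):
        processed = image_data * 2  # Simulate processing
        self.cache.append(processed)  # ♻️ Memory now stays flat
        return processed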

🎯 Using Memory Profilers

Here are the main tools for memory profiling:

# 🏗️ Method 1: tracemalloc (built-in)
import tracemalloc

tracemalloc.start()

# 🎮 Your code here
data = [i ** 2 for i in range(1000000)]

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

# 📊 Display top memory users
print("🔍 Top 3 memory allocations:")
for stat in top_stats[:3]:
    print(f"  📍 {stat}")

# 🎨 Method 2: memory_profiler (third-party)
# Install: pip install memory-profiler
from memory_profiler import profile

@profile
def memory_hungry_function():
    # 🍕 Create a large list
    big_list = [i for i in range(1000000)]
    
    # 🔄 Create another one
    another_list = big_list.copy()
    
    return sum(another_list)

memory_hungry_function()  # 📊 Prints a line-by-line memory report

# 🔄 Method 3: objgraph for object tracking
# Install: pip install objgraph
import objgraph

# 📊 Show most common types
objgraph.show_most_common_types()
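
💡 Tip: memory_profiler can also be driven from the command line - running "python -m memory_profiler your_script.py" prints the same line-by-line report for decorated functions (the script name here is a placeholder), and its bundled mprof run / mprof plot commands record and chart usage over time.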

💡 Practical Examples

🛒 Example 1: Shopping Cart Memory Leak

Let's build a shopping cart with a memory issue:

# 🛍️ Shopping cart with memory leak
import sys
import time
from memory_profiler import profile

class Product:
    def __init__(self, name, price, image_data):
        self.name = name
        self.price = price
        self.image_data = image_data  # 🖼️ Large image data
        self.emoji = "🛍️"
    
    def __repr__(self):
        return f"{self.emoji} {self.name}: ${self.price}"

class ShoppingCart:
    def __init__(self):
        self.items = []
        self.history = []  # ⚠️ Potential memory leak!
        self.session_data = {}
    
    @profile
    def add_item(self, product):
        # ➕ Add to cart
        self.items.append(product)
        
        # ❌ Bad: Keeping full history forever
        self.history.append({
            'action': 'add',
            'product': product,
            'timestamp': time.time(),
            'full_cart_snapshot': self.items.copy()  # 😱 Duplicating data!
        })
        
        print(f"✅ Added {product.name} to cart!")
    
    def clear_cart(self):
        # 🗑️ Clear the cart
        self.items = []
        # ❌ But history keeps growing!
        print("🛒 Cart cleared!")
    
    def get_memory_usage(self):
        # 📊 Shallow container sizes only (getsizeof doesn't follow references)
        size = sys.getsizeof(self.items) + sys.getsizeof(self.history)
        return size / 1024 / 1024  # MB

# 🎮 Let's test it!
@profile
def shopping_simulation():
    cart = ShoppingCart()
    
    # 🛍️ Simulate shopping spree
    for i in range(100):
        # Create product with "large" image
        image_data = b"x" * 1024 * 100  # 100KB per image
        product = Product(f"Item_{i}", 9.99, image_data)
        cart.add_item(product)
        
        # Every 10 items, clear cart
        if i % 10 == 0:
            print(f"💾 Memory usage: {cart.get_memory_usage():.2f} MB")
            cart.clear_cart()
    
    return cart

# 🏃 Run simulation
cart = shopping_simulation()
print(f"📈 Final history size: {len(cart.history)} items")

🎯 Try it yourself: Fix the memory leak by limiting history size or using weak references! One possible fix is sketched below.
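
💡 Here's one hedged sketch of such a fix (LeanShoppingCart and history_size=50 are illustrative names and values): cap the history with a deque and store only a weak reference to each product, so history entries no longer pin large image data.

# ✅ Bounded history + weak product references (sketch)
import time
import weakref
from collections import deque

class LeanShoppingCart(ShoppingCart):
    def __init__(self, history_size=50):
        super().__init__()
        self.history = deque(maxlen=history_size)  # 📏 Bounded history
    
    def add_item(self, product):
        self.items.append(product)
        self.history.append({
            'action': 'add',
            'product': weakref.ref(product),  # 🔗 Doesn't keep the product alive
            'timestamp': time.time(),
            'cart_size': len(self.items)      # 🧮 A number, not a full copy
        })
        print(f"✅ Added {product.name} to cart!")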

🎮 Example 2: Game Memory Manager

Let's create a game that properly manages memory:

# 🏆 Memory-efficient game manager
import weakref
import tracemalloc
from collections import deque

class GameObject:
    def __init__(self, name, sprite_data):
        self.name = name
        self.sprite_data = sprite_data  # 🎨 Graphics data
        self.position = [0, 0]
        self.active = True
    
    def __repr__(self):
        return f"🎮 {self.name} at {self.position}"

class MemoryEfficientGame:
    def __init__(self, max_objects=1000, history_size=100):
        # 🧠 Smart memory management
        self.active_objects = []
        self.object_pool = []  # ♻️ Reuse objects
        self.history = deque(maxlen=history_size)  # 📏 Limited history
        self.weak_refs = weakref.WeakValueDictionary()  # 🔗 Weak references
        self.max_objects = max_objects
        # 📊 Start tracing once, so stats cover the game's whole lifetime
        if not tracemalloc.is_tracing():
            tracemalloc.start()
    
    def spawn_object(self, name, sprite_data):
        # ♻️ Reuse from pool if available
        if self.object_pool:
            obj = self.object_pool.pop()
            obj.name = name
            obj.sprite_data = sprite_data
            obj.position = [0, 0]  # 🧹 Reset recycled state
            obj.active = True
            print(f"♻️ Reused object for {name}")
        else:
            # 🆕 Create new object
            obj = GameObject(name, sprite_data)
            print(f"✨ Created new {name}")
        
        # 📊 Memory limit check
        if len(self.active_objects) >= self.max_objects:
            self.cleanup_oldest()
        
        self.active_objects.append(obj)
        self.weak_refs[name] = obj
        
        # 📝 Track action (limited history)
        self.history.append({
            'action': 'spawn',
            'object': name,
            'count': len(self.active_objects)
        })
        
        return obj
    
    def destroy_object(self, obj):
        # 🗑️ Move to pool for reuse
        if obj in self.active_objects:
            self.active_objects.remove(obj)
            obj.active = False
            obj.sprite_data = None  # 🧹 Clear heavy data
            self.object_pool.append(obj)
            print(f"🗑️ Destroyed {obj.name}")
    
    def cleanup_oldest(self):
        # 🧹 Remove oldest objects
        if self.active_objects:
            oldest = self.active_objects[0]
            self.destroy_object(oldest)
            print(f"🧹 Auto-cleaned {oldest.name} (memory limit)")
    
    def get_memory_stats(self):
        # 📊 Memory statistics (tracemalloc was started in __init__,
        # so the numbers reflect the whole run, not just this call)
        stats = {
            'active_objects': len(self.active_objects),
            'pooled_objects': len(self.object_pool),
            'history_entries': len(self.history),
            'weak_refs': len(self.weak_refs)
        }
        
        current, peak = tracemalloc.get_traced_memory()
        stats['current_memory_mb'] = current / 1024 / 1024
        stats['peak_memory_mb'] = peak / 1024 / 1024
        
        return stats

# 🎮 Test the game
game = MemoryEfficientGame(max_objects=50, history_size=20)

# 🚀 Spawn many objects
for i in range(100):
    sprite_data = f"sprite_{i}" * 1000  # Simulate sprite data
    game.spawn_object(f"Enemy_{i}", sprite_data)
    
    if i % 20 == 0:
        stats = game.get_memory_stats()
        print(f"\n📊 Memory Stats at iteration {i}:")
        for key, value in stats.items():
            print(f"  {key}: {value}")

🚀 Advanced Concepts

🧙‍♂️ Advanced Memory Leak Detection

When you're ready to level up, try these advanced techniques:

# 🎯 Advanced memory leak detector
import gc
import tracemalloc
from typing import Dict, List, Any

class MemoryLeakDetector:
    def __init__(self):
        self.snapshots: List[Any] = []
        self.growth_tracker: Dict[str, List[int]] = {}
        tracemalloc.start()
    
    def take_snapshot(self, label: str):
        # 📸 Take memory snapshot
        gc.collect()  # Force garbage collection
        
        snapshot = {
            'label': label,
            'tracemalloc': tracemalloc.take_snapshot(),
            'object_counts': self._get_object_counts(),
            'memory_usage': tracemalloc.get_traced_memory()[0]
        }
        
        self.snapshots.append(snapshot)
        print(f"📸 Snapshot '{label}' taken")
        
        return snapshot
    
    def _get_object_counts(self) -> Dict[str, int]:
        # 🔍 Count objects by type
        counts = {}
        for obj in gc.get_objects():
            obj_type = type(obj).__name__
            counts[obj_type] = counts.get(obj_type, 0) + 1
        return counts
    
    def compare_snapshots(self, label1: str, label2: str):
        # 🔄 Compare two snapshots
        snap1 = next((s for s in self.snapshots if s['label'] == label1), None)
        snap2 = next((s for s in self.snapshots if s['label'] == label2), None)
        
        if not snap1 or not snap2:
            print("❌ Snapshots not found!")
            return
        
        # 📊 Memory difference
        mem_diff = snap2['memory_usage'] - snap1['memory_usage']
        print(f"\n📊 Memory change: {mem_diff / 1024 / 1024:.2f} MB")
        
        # 🔍 Object count differences
        print("\n🔍 Object count changes:")
        all_types = set(snap1['object_counts'].keys()) | set(snap2['object_counts'].keys())
        
        for obj_type in sorted(all_types):
            count1 = snap1['object_counts'].get(obj_type, 0)
            count2 = snap2['object_counts'].get(obj_type, 0)
            diff = count2 - count1
            
            if diff != 0:
                emoji = "📈" if diff > 0 else "📉"
                print(f"  {emoji} {obj_type}: {count1} → {count2} ({diff:+d})")
        
        # 🎯 Tracemalloc statistics
        print("\n🎯 Top memory allocations:")
        top_stats = snap2['tracemalloc'].compare_to(snap1['tracemalloc'], 'lineno')
        
        for stat in top_stats[:5]:
            print(f"  📍 {stat}")
    
    def find_growing_types(self, threshold: int = 100):
        # 🚨 Find types that keep growing
        if len(self.snapshots) < 2:
            print("⚠️ Need at least 2 snapshots!")
            return
        
        print(f"\n🚨 Types growing by more than {threshold} objects:")
        
        for i in range(1, len(self.snapshots)):
            prev = self.snapshots[i-1]['object_counts']
            curr = self.snapshots[i]['object_counts']
            
            for obj_type, count in curr.items():
                prev_count = prev.get(obj_type, 0)
                growth = count - prev_count
                
                if growth > threshold:
                    print(f"  ⚠️ {obj_type}: +{growth} objects")
                    
                    # Track growth history
                    if obj_type not in self.growth_tracker:
                        self.growth_tracker[obj_type] = []
                    self.growth_tracker[obj_type].append(count)

# 🪄 Using the leak detector
detector = MemoryLeakDetector()

# 📸 Initial snapshot
detector.take_snapshot("start")

# 🏗️ Create potential memory leak
leaky_list = []
for i in range(1000):
    leaky_list.append([j for j in range(1000)])

detector.take_snapshot("after_allocation")

# 🧹 Try to clean up
del leaky_list
gc.collect()

detector.take_snapshot("after_cleanup")

# 📊 Analyze results
detector.compare_snapshots("start", "after_allocation")
detector.compare_snapshots("after_allocation", "after_cleanup")
detector.find_growing_types(threshold=50)
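
🔍 objgraph (Method 3 from earlier) offers a complementary shortcut: show_growth() prints only the types whose instance counts grew since the previous call, which is often the fastest way to spot a leaking type. A minimal sketch:

# 📈 Quick growth check with objgraph (pip install objgraph)
import objgraph

objgraph.show_growth(limit=5)  # First call prints counts and sets the baseline

leaky = [[j for j in range(1000)] for i in range(100)]

objgraph.show_growth(limit=5)  # 🚨 Now shows only what grew, e.g. 'list'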

๐Ÿ—๏ธ Memory-Efficient Data Structures

For memory-conscious applications:

# ๐Ÿš€ Memory-efficient alternatives
import array
import sys
from collections import namedtuple
from dataclasses import dataclass

# ๐Ÿ“Š Compare memory usage
def compare_memory_usage():
    # ๐ŸŽจ Regular list vs array
    regular_list = [i for i in range(10000)]
    int_array = array.array('i', range(10000))
    
    print("๐Ÿ“Š Memory Comparison:")
    print(f"  List: {sys.getsizeof(regular_list)} bytes")
    print(f"  Array: {sys.getsizeof(int_array)} bytes")
    print(f"  Savings: {sys.getsizeof(regular_list) - sys.getsizeof(int_array)} bytes โœจ")
    
    # ๐Ÿ—๏ธ Class vs NamedTuple vs Slots
    class RegularPoint:
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
    class SlottedPoint:
        __slots__ = ['x', 'y']  # ๐Ÿ’พ Memory optimization
        def __init__(self, x, y):
            self.x = x
            self.y = y
    
    PointTuple = namedtuple('PointTuple', ['x', 'y'])
    
    # ๐Ÿงช Create instances
    regular = RegularPoint(1, 2)
    slotted = SlottedPoint(1, 2)
    tuple_point = PointTuple(1, 2)
    
    print(f"\n๐Ÿ—๏ธ Object Memory Usage:")
    print(f"  Regular class: {sys.getsizeof(regular.__dict__)} bytes")
    print(f"  Slotted class: ~56 bytes (no __dict__)")
    print(f"  NamedTuple: {sys.getsizeof(tuple_point)} bytes")

compare_memory_usage()
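
♻️ Generators deserve a mention here too: when you only need to stream over values once, a generator expression keeps just the iterator state in memory instead of materializing every element. A quick comparison:

# 🪶 Generator vs list: stream instead of materializing
import sys

squares_list = [i ** 2 for i in range(1_000_000)]   # 💾 Holds every element
squares_gen = (i ** 2 for i in range(1_000_000))    # 🪶 Holds only iterator state

print(f"List: {sys.getsizeof(squares_list):,} bytes")
print(f"Generator: {sys.getsizeof(squares_gen):,} bytes")
print(f"Sum: {sum(squares_gen)}")  # Same result, a fraction of the memory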

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Circular References

# โŒ Wrong way - circular reference prevents garbage collection
class Node:
    def __init__(self, value):
        self.value = value
        self.parent = None
        self.children = []
    
    def add_child(self, child):
        child.parent = self  # ๐Ÿ”„ Circular reference!
        self.children.append(child)

# ๐Ÿ˜ฐ This creates a memory leak
root = Node("root")
child = Node("child")
root.add_child(child)
# Even after deleting, memory isn't freed!
del root, child
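
# 🧪 A quick demonstration (sketch): watch when the cycle is actually freed.
# 'probe' is a weak reference we add purely to observe reclamation.
import gc
import weakref

gc.disable()               # Simulate the window between automatic GC runs
root = Node("root")
child = Node("child")
root.add_child(child)
probe = weakref.ref(root)  # 🔗 Doesn't keep root alive

del root, child
print(probe() is None)     # False - refcounting alone can't break the cycle

gc.collect()               # 🧹 The cyclic collector finds and frees it
print(probe() is None)     # True - freed only once GC runs
gc.enable()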

# ✅ Correct way - use weak references
import weakref

class SmartNode:
    def __init__(self, value):
        self.value = value
        self._parent = None  # 🔗 Will be a weak reference
        self.children = []
    
    @property
    def parent(self):
        return self._parent() if self._parent else None
    
    @parent.setter
    def parent(self, node):
        self._parent = weakref.ref(node) if node else None
    
    def add_child(self, child):
        child.parent = self  # ✅ Now uses weak reference
        self.children.append(child)

# 🎉 No cycle - reference counting frees these immediately!
root = SmartNode("root")
child = SmartNode("child")
root.add_child(child)

🤯 Pitfall 2: Global Cache Growth

# ❌ Dangerous - unbounded cache growth
cache = {}  # 📦 Global cache

def perform_calculation(key):
    # 🧮 Stand-in for an expensive computation
    return sum(ord(c) for c in str(key))

def expensive_operation(key):
    if key not in cache:
        # 💥 Cache grows forever!
        cache[key] = perform_calculation(key)
    return cache[key]

# ✅ Safe - bounded cache with LRU
from functools import lru_cache

@lru_cache(maxsize=1000)  # 📏 Limited to 1000 entries
def safe_expensive_operation(key):
    return perform_calculation(key)

# ✅ Even better - manual cache with size limit
from collections import OrderedDict

class BoundedCache:
    def __init__(self, max_size=1000):
        self.cache = OrderedDict()
        self.max_size = max_size
    
    def get(self, key, compute_fn):
        if key in self.cache:
            # 🔄 Move to end (LRU)
            self.cache.move_to_end(key)
            return self.cache[key]
        
        # 🧮 Compute new value
        value = compute_fn(key)
        self.cache[key] = value
        
        # 🧹 Evict oldest if needed
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)
            print("🧹 Evicted oldest cache entry")
        
        return value
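
💡 A short usage sketch of BoundedCache, with a tiny max_size so the eviction is visible:

# 🧪 BoundedCache in action
cache = BoundedCache(max_size=2)

cache.get("a", perform_calculation)
cache.get("b", perform_calculation)
cache.get("a", perform_calculation)  # ♻️ Cache hit - "a" moves to the end
cache.get("c", perform_calculation)  # 🧹 Evicts "b", the least recently used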

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Profile Regularly: Run memory profiling during development
  2. ๐Ÿ“ Set Memory Limits: Use bounded collections and caches
  3. ๐Ÿ”— Use Weak References: For parent-child relationships
  4. โ™ป๏ธ Reuse Objects: Implement object pooling for frequently created objects
  5. ๐Ÿงน Clean Up Explicitly: Donโ€™t rely only on garbage collection
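
💡 To make practice #1 painless, one option is a tiny context manager around tracemalloc (a minimal sketch; the name 'profiled' is ours):

# 🎯 A reusable profiling helper (sketch)
import tracemalloc
from contextlib import contextmanager

@contextmanager
def profiled(label):
    # 📸 Report peak memory used inside the with-block
    tracemalloc.start()
    try:
        yield
    finally:
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print(f"📊 {label}: peak {peak / 1024 / 1024:.2f} MB")

with profiled("build big list"):
    data = [i ** 2 for i in range(500_000)]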

🧪 Hands-On Exercise

🎯 Challenge: Build a Memory-Efficient Data Pipeline

Create a data processing pipeline that handles large datasets without memory issues:

📋 Requirements:

  • ✅ Process CSV files larger than available RAM
  • 🏷️ Track memory usage throughout processing
  • 👤 Implement streaming/chunking for large files
  • 📅 Add progress reporting without memory overhead
  • 🎨 Visualize memory usage over time

🚀 Bonus Points:

  • Add automatic memory cleanup when threshold reached
  • Implement parallel processing with memory limits
  • Create memory usage alerts

💡 Solution

๐Ÿ” Click to see solution
# 🎯 Memory-efficient data pipeline
import csv
import gc
import tracemalloc
from collections import deque
from datetime import datetime
import matplotlib.pyplot as plt

class MemoryEfficientPipeline:
    def __init__(self, memory_limit_mb=500, chunk_size=1000):
        self.memory_limit_mb = memory_limit_mb
        self.chunk_size = chunk_size
        self.memory_history = deque(maxlen=100)  # 📊 Track memory usage
        self.processed_count = 0
        tracemalloc.start()
    
    def process_csv(self, filename, process_fn):
        # 📁 Process CSV in chunks
        print(f"🚀 Starting processing of {filename}")
        
        with open(filename, 'r') as file:
            reader = csv.DictReader(file)
            chunk = []
            
            for row in reader:
                chunk.append(row)
                
                # 📦 Process chunk when full
                if len(chunk) >= self.chunk_size:
                    self._process_chunk(chunk, process_fn)
                    chunk = []  # 🧹 Clear chunk
                    
                    # 🔍 Check memory usage
                    if self._check_memory_limit():
                        self._emergency_cleanup()
            
            # 📦 Process remaining rows
            if chunk:
                self._process_chunk(chunk, process_fn)
        
        print(f"✅ Processed {self.processed_count} rows!")
        self._plot_memory_usage()
    
    def _process_chunk(self, chunk, process_fn):
        # 🔄 Process a chunk of data
        try:
            results = []
            for row in chunk:
                result = process_fn(row)
                if result:
                    results.append(result)
            
            # 💾 Here you would save results
            self.processed_count += len(chunk)
            
            # 📊 Track memory
            self._record_memory_usage()
            
            # 📢 Progress report
            if self.processed_count % 10000 == 0:
                current_mb = self._get_current_memory_mb()
                print(f"📊 Processed: {self.processed_count} rows | Memory: {current_mb:.1f} MB")
            
            return results
            
        finally:
            # 🧹 Drop our reference and nudge the collector
            del chunk
            gc.collect()
    
    def _get_current_memory_mb(self):
        # 📏 Get current memory usage
        current, _ = tracemalloc.get_traced_memory()
        return current / 1024 / 1024
    
    def _check_memory_limit(self):
        # 🚨 Check if memory limit exceeded
        current_mb = self._get_current_memory_mb()
        return current_mb > self.memory_limit_mb
    
    def _emergency_cleanup(self):
        # 🚨 Emergency memory cleanup
        print("🚨 Memory limit reached! Cleaning up...")
        gc.collect()
        # 🧹 This is also the place to clear any application-level caches
        print("✅ Cleanup complete!")
    
    def _record_memory_usage(self):
        # 📊 Record memory usage for visualization
        current_mb = self._get_current_memory_mb()
        timestamp = datetime.now()
        self.memory_history.append({
            'time': timestamp,
            'memory_mb': current_mb,
            'processed': self.processed_count
        })
    
    def _plot_memory_usage(self):
        # 📈 Visualize memory usage
        if not self.memory_history:
            return
        
        times = [h['time'] for h in self.memory_history]
        memory = [h['memory_mb'] for h in self.memory_history]
        
        plt.figure(figsize=(10, 6))
        plt.plot(times, memory, '-', label='Memory Usage')
        plt.axhline(y=self.memory_limit_mb, color='r', linestyle='--', label='Memory Limit')
        plt.xlabel('Time')
        plt.ylabel('Memory (MB)')
        plt.title('Pipeline Memory Usage Over Time')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig('memory_usage.png')
        print("📊 Memory usage plot saved to memory_usage.png")

# 🧪 Example processing function
def analyze_data(row):
    # 🔍 Simulate data analysis
    try:
        # Process the row
        value = float(row.get('value', 0))
        
        # 📊 Return only what's needed
        if value > 100:
            return {
                'id': row.get('id'),
                'high_value': value,
                'category': row.get('category')
            }
    except ValueError:
        pass  # Skip invalid data
    
    return None

# 🎮 Test the pipeline
pipeline = MemoryEfficientPipeline(memory_limit_mb=100, chunk_size=500)

# Create test data
print("📝 Creating test data...")
with open('test_data.csv', 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=['id', 'value', 'category'])
    writer.writeheader()
    
    for i in range(50000):
        writer.writerow({
            'id': i,
            'value': i * 1.5,
            'category': f'cat_{i % 10}'
        })

# Process the data
pipeline.process_csv('test_data.csv', analyze_data)

# 📊 Final memory stats
final_memory = pipeline._get_current_memory_mb()
print(f"\n📊 Final Statistics:")
print(f"  Total processed: {pipeline.processed_count} rows")
print(f"  Final memory usage: {final_memory:.2f} MB")
print(f"  Peak memory recorded in history: {max(h['memory_mb'] for h in pipeline.memory_history):.2f} MB")

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Profile memory usage with confidence 💪
  • ✅ Identify memory leaks before they crash your app 🛡️
  • ✅ Implement memory-efficient data structures 🎯
  • ✅ Debug memory issues like a pro 🐛
  • ✅ Build scalable applications with Python! 🚀

Remember: Memory management is crucial for production applications. Always profile and test! 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered memory profiling and leak detection!

Here's what to do next:

  1. 💻 Practice with the exercises above
  2. 🏗️ Profile your existing projects for memory issues
  3. 📚 Move on to our next tutorial: Performance Profiling
  4. 🌟 Share your memory optimization wins with others!

Remember: Every Python expert knows how to manage memory efficiently. Keep profiling, keep optimizing, and most importantly, have fun! 🚀


Happy coding! 🎉🚀✨