Part 338 of 365

📘 Parallel Algorithms: MapReduce Pattern

Master the MapReduce pattern for parallel algorithms in Python, with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand the concept fundamentals 🎯
  • Apply the concept in real projects 🏗️
  • Debug common issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting tutorial on the MapReduce pattern in Python! 🎉 In this guide, we'll explore how to process massive datasets efficiently using parallel algorithms.

You'll discover how MapReduce can transform your data processing capabilities. Whether you're analyzing logs 📊, processing text data 📝, or crunching numbers 🔢, understanding MapReduce is essential for handling big data challenges!

By the end of this tutorial, you'll feel confident implementing MapReduce patterns in your own projects! Let's dive in! 🏊‍♂️

📚 Understanding MapReduce

🤔 What is MapReduce?

MapReduce is like a factory assembly line 🏭. Think of it as breaking down a huge task into smaller pieces (Map), then combining the results (Reduce) to get your final answer!

In Python terms, MapReduce is a programming model that processes large datasets in parallel (you'll see the idea in miniature in the sketch right after this list). This means you can:

  • ✨ Process gigabytes of data efficiently
  • 🚀 Utilize multiple CPU cores simultaneously
  • 🛡️ Handle failures gracefully with built-in resilience

💡 Why Use MapReduce?

Here's why developers love MapReduce:

  1. Scalability 🔒: Process terabytes of data across multiple machines
  2. Simplicity 💻: Focus on your logic, not parallel programming complexities
  3. Fault Tolerance 📖: Automatic handling of failures
  4. Performance 🔧: Dramatic speedups for data-intensive tasks

Real-world example: Imagine counting words in millions of documents 📚. With MapReduce, you can process them all in parallel, making a week-long task finish in hours!

🔧 Basic Syntax and Usage

📝 Simple Example

Let's start with a friendly example:

# 👋 Hello, MapReduce!
from multiprocessing import Pool
from collections import defaultdict

# ๐ŸŽจ Map function: process each chunk
def map_word_count(text_chunk):
    """Count words in a text chunk"""
    word_counts = defaultdict(int)
    for word in text_chunk.split():
        word = word.lower().strip('.,!?";')  # ๐Ÿงน Clean the word
        if word:
            word_counts[word] += 1
    return dict(word_counts)

# ๐Ÿ”„ Reduce function: combine results
def reduce_word_counts(count_list):
    """Combine word counts from all chunks"""
    total_counts = defaultdict(int)
    for counts in count_list:
        for word, count in counts.items():
            total_counts[word] += count
    return dict(total_counts)

# ๐Ÿš€ Let's use it!
if __name__ == "__main__":
    # ๐Ÿ“ Sample data
    documents = [
        "Python is amazing! ๐Ÿ",
        "MapReduce makes Python even more amazing!",
        "Parallel processing is the future ๐Ÿš€",
        "Python powers data science ๐Ÿ“Š"
    ]
    
    # ๐ŸŽฏ Map phase
    with Pool(processes=4) as pool:
        mapped_results = pool.map(map_word_count, documents)
    
    # ๐ŸŽจ Reduce phase
    final_counts = reduce_word_counts(mapped_results)
    
    print("๐Ÿ“Š Word counts:", final_counts)

💡 Explanation: Notice how we split the work (Map) across multiple processes, then combine the results (Reduce)!

🎯 Common Patterns

Here are patterns you'll use daily:

# ๐Ÿ—๏ธ Pattern 1: Generic MapReduce class
class MapReduce:
    def __init__(self, num_workers=None):
        self.num_workers = num_workers or Pool()._processes
    
    def __call__(self, map_func, reduce_func, data):
        # ๐Ÿ—บ๏ธ Map phase
        with Pool(self.num_workers) as pool:
            mapped = pool.map(map_func, data)
        
        # ๐Ÿ”„ Reduce phase
        return reduce_func(mapped)

# ๐ŸŽจ Pattern 2: Chunking large datasets
def chunk_data(data, chunk_size):
    """Split data into processable chunks"""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

# ๐Ÿ”„ Pattern 3: Key-based reduction
def group_by_key(mapped_data):
    """Group mapped data by key for reduction"""
    grouped = defaultdict(list)
    for result in mapped_data:
        for key, value in result.items():
            grouped[key].append(value)
    return dict(grouped)
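
To see how these patterns fit together, here's a minimal usage sketch. It assumes the MapReduce class and chunk_data above, plus map_word_count and reduce_word_counts from the first example, all live in the same module:

# 🎮 Combining the patterns: chunk a corpus, map word counts, reduce the totals
if __name__ == "__main__":
    corpus = ["python is fun"] * 1000 + ["mapreduce scales well"] * 1000

    # 🧩 Split the documents into chunks, one text blob per worker task
    chunks = [" ".join(chunk) for chunk in chunk_data(corpus, 500)]

    # 🚀 Run the generic engine with the map/reduce functions from the first example
    engine = MapReduce(num_workers=4)
    totals = engine(map_word_count, reduce_word_counts, chunks)

    print("📊 'python' appears", totals.get("python", 0), "times")  # expect 1000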

💡 Practical Examples

🛒 Example 1: Sales Analysis System

Let's build something real:

# ๐Ÿ›๏ธ Analyze sales data across stores
import json
from datetime import datetime
from multiprocessing import Pool
from collections import defaultdict

# ๐Ÿ“Š Sales record structure
class SalesRecord:
    def __init__(self, store_id, product, amount, date):
        self.store_id = store_id
        self.product = product
        self.amount = amount
        self.date = date

# ๐Ÿ—บ๏ธ Map: Calculate sales per store
def map_sales_by_store(records_chunk):
    """Calculate total sales per store in chunk"""
    store_sales = defaultdict(float)
    product_sales = defaultdict(float)
    
    for record in records_chunk:
        store_sales[record.store_id] += record.amount
        product_sales[record.product] += record.amount
    
    return {
        'store_sales': dict(store_sales),
        'product_sales': dict(product_sales)
    }

# ๐Ÿ”„ Reduce: Combine all results
def reduce_sales_analysis(mapped_results):
    """Combine sales analysis from all chunks"""
    total_store_sales = defaultdict(float)
    total_product_sales = defaultdict(float)
    
    for result in mapped_results:
        # ๐Ÿช Combine store sales
        for store, amount in result['store_sales'].items():
            total_store_sales[store] += amount
        
        # ๐Ÿ“ฆ Combine product sales
        for product, amount in result['product_sales'].items():
            total_product_sales[product] += amount
    
    return {
        'top_stores': sorted(total_store_sales.items(), 
                           key=lambda x: x[1], reverse=True)[:5],
        'top_products': sorted(total_product_sales.items(), 
                             key=lambda x: x[1], reverse=True)[:5],
        'total_revenue': sum(total_store_sales.values())
    }

# ๐ŸŽฎ Let's analyze!
if __name__ == "__main__":
    # ๐Ÿ“ Generate sample data
    import random
    
    stores = ['Store A 🏪', 'Store B 🏬', 'Store C 🏢', 'Store D 🏭']
    products = ['Widget 🔧', 'Gadget 📱', 'Doohickey 🎮', 'Thingamajig 🎨']
    
    # Generate 10000 sales records
    all_records = []
    for _ in range(10000):
        record = SalesRecord(
            random.choice(stores),
            random.choice(products),
            random.uniform(10, 1000),
            datetime.now()
        )
        all_records.append(record)
    
    # ๐Ÿš€ Process in parallel
    chunk_size = 1000
    chunks = [all_records[i:i+chunk_size] 
              for i in range(0, len(all_records), chunk_size)]
    
    print(f"๐ŸŽฏ Processing {len(all_records)} records in {len(chunks)} chunks...")
    
    start_time = time.time()
    
    # MapReduce magic! โœจ
    with Pool() as pool:
        mapped = pool.map(map_sales_by_store, chunks)
    
    results = reduce_sales_analysis(mapped)
    
    elapsed = time.time() - start_time
    
    print(f"\n๐Ÿ“Š Analysis complete in {elapsed:.2f} seconds!")
    print(f"๐Ÿ’ฐ Total Revenue: ${results['total_revenue']:,.2f}")
    print("\n๐Ÿ† Top 5 Stores:")
    for store, revenue in results['top_stores']:
        print(f"  {store}: ${revenue:,.2f}")
    print("\n๐ŸŒŸ Top 5 Products:")
    for product, revenue in results['top_products']:
        print(f"  {product}: ${revenue:,.2f}")

🎯 Try it yourself: Add a feature to find the best-selling product per store! (One possible approach is sketched below.)
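
The sketch keys the map phase on (store, product) pairs; map_store_product_sales and reduce_best_sellers are our own illustrative names, not part of the example above:

# 🏅 Sketch: best-selling product per store (hypothetical extension)
def map_store_product_sales(records_chunk):
    """Total sales per (store, product) pair in this chunk"""
    pair_sales = defaultdict(float)
    for record in records_chunk:
        pair_sales[(record.store_id, record.product)] += record.amount
    return dict(pair_sales)

def reduce_best_sellers(mapped_results):
    """Merge the pairs, then pick the top product for each store"""
    totals = defaultdict(float)
    for result in mapped_results:
        for pair, amount in result.items():
            totals[pair] += amount

    best = {}  # store -> (product, revenue)
    for (store, product), amount in totals.items():
        if store not in best or amount > best[store][1]:
            best[store] = (product, amount)
    return best

# Usage mirrors the example above:
#   with Pool() as pool:
#       mapped = pool.map(map_store_product_sales, chunks)
#   print(reduce_best_sellers(mapped))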

🎮 Example 2: Log Analysis System

Let's make it fun:

# ๐Ÿ† Analyze server logs for patterns
import re
from multiprocessing import Pool
from collections import defaultdict, Counter
from datetime import datetime

# ๐Ÿ“ Log entry parser
def parse_log_entry(line):
    """Parse a server log entry"""
    # Example: 2024-01-15 10:23:45 ERROR /api/users 500 "Database timeout"
    pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) ([\w/]+) (\d+) "(.*)"'
    match = re.match(pattern, line)
    
    if match:
        return {
            'timestamp': match.group(1),
            'level': match.group(2),
            'endpoint': match.group(3),
            'status_code': int(match.group(4)),
            'message': match.group(5)
        }
    return None

# ๐Ÿ—บ๏ธ Map: Analyze log chunk
def map_log_analysis(log_chunk):
    """Analyze patterns in log chunk"""
    stats = {
        'error_count': 0,
        'warning_count': 0,
        'endpoints': Counter(),
        'status_codes': Counter(),
        'error_messages': []
    }
    
    for line in log_chunk:
        entry = parse_log_entry(line.strip())
        if not entry:
            continue
        
        # ๐Ÿ“Š Count by level
        if entry['level'] == 'ERROR':
            stats['error_count'] += 1
            stats['error_messages'].append(entry['message'])
        elif entry['level'] == 'WARNING':
            stats['warning_count'] += 1
        
        # ๐ŸŽฏ Track endpoints and status codes
        stats['endpoints'][entry['endpoint']] += 1
        stats['status_codes'][entry['status_code']] += 1
    
    return stats

# ๐Ÿ”„ Reduce: Combine analysis results
def reduce_log_analysis(mapped_results):
    """Combine log analysis from all chunks"""
    total_stats = {
        'total_errors': 0,
        'total_warnings': 0,
        'endpoint_hits': Counter(),
        'status_distribution': Counter(),
        'common_errors': Counter()
    }
    
    for stats in mapped_results:
        total_stats['total_errors'] += stats['error_count']
        total_stats['total_warnings'] += stats['warning_count']
        total_stats['endpoint_hits'].update(stats['endpoints'])
        total_stats['status_distribution'].update(stats['status_codes'])
        total_stats['common_errors'].update(stats['error_messages'])
    
    # ๐Ÿ† Get top items
    total_stats['top_endpoints'] = total_stats['endpoint_hits'].most_common(5)
    total_stats['top_errors'] = total_stats['common_errors'].most_common(5)
    
    return total_stats

# ๐Ÿš€ Advanced: Real-time log processing
class LogProcessor:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.results_cache = {}
    
    def process_logs(self, log_file_path, chunk_size=1000):
        """Process logs with MapReduce"""
        print(f"๐Ÿ“‚ Processing logs from {log_file_path}...")
        
        # Read and chunk logs
        chunks = []
        current_chunk = []
        
        with open(log_file_path, 'r') as f:
            for line in f:
                current_chunk.append(line)
                if len(current_chunk) >= chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
        
        if current_chunk:  # Don't forget the last chunk!
            chunks.append(current_chunk)
        
        print(f"๐ŸŽฏ Created {len(chunks)} chunks for processing")
        
        # MapReduce! ๐Ÿš€
        start_time = time.time()
        
        with Pool(self.num_workers) as pool:
            mapped = pool.map(map_log_analysis, chunks)
        
        results = reduce_log_analysis(mapped)
        elapsed = time.time() - start_time
        
        print(f"โœจ Analysis complete in {elapsed:.2f} seconds!")
        
        return results
    
    def generate_report(self, results):
        """Generate a beautiful report"""
        print("\n" + "="*50)
        print("๐Ÿ“Š LOG ANALYSIS REPORT")
        print("="*50)
        
        print(f"\n๐Ÿšจ Alert Summary:")
        print(f"  โŒ Errors: {results['total_errors']}")
        print(f"  โš ๏ธ  Warnings: {results['total_warnings']}")
        
        print(f"\n๐ŸŽฏ Top 5 Endpoints:")
        for endpoint, count in results['top_endpoints']:
            print(f"  {endpoint}: {count} hits")
        
        print(f"\n๐Ÿ“ˆ Status Code Distribution:")
        for code, count in sorted(results['status_distribution'].items()):
            emoji = "โœ…" if code < 400 else "โš ๏ธ" if code < 500 else "โŒ"
            print(f"  {emoji} {code}: {count} responses")
        
        if results['top_errors']:
            print(f"\n๐Ÿ’ฅ Most Common Errors:")
            for error, count in results['top_errors']:
                print(f"  '{error}': {count} occurrences")
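
A minimal usage sketch for the class above; the log file path is purely illustrative and assumes lines in the format parse_log_entry expects:

# 🧪 Hypothetical usage (server.log is an illustrative path)
if __name__ == "__main__":
    processor = LogProcessor(num_workers=4)
    results = processor.process_logs("server.log", chunk_size=2000)
    processor.generate_report(results)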

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Custom Partitioning

When you're ready to level up, try this advanced pattern:

# 🎯 Advanced: Custom partitioning for better load balancing
import hashlib
from multiprocessing import Pool, cpu_count
from collections import defaultdict

class AdvancedMapReduce:
    def __init__(self, num_partitions=None):
        self.num_partitions = num_partitions or cpu_count()
    
    def partition_by_key(self, data, key_func):
        """Partition data by key for better distribution"""
        partitions = [[] for _ in range(self.num_partitions)]
        
        for item in data:
            key = key_func(item)
            # ๐ŸŽฒ Use hash to determine partition
            partition_idx = int(hashlib.md5(
                str(key).encode()
            ).hexdigest(), 16) % self.num_partitions
            partitions[partition_idx].append(item)
        
        return partitions
    
    def map_reduce_with_key(self, map_func, reduce_func, data, key_func):
        """MapReduce with custom partitioning"""
        # ๐Ÿ—‚๏ธ Partition data
        partitions = self.partition_by_key(data, key_func)
        
        # ๐Ÿ—บ๏ธ Map phase
        with Pool(self.num_partitions) as pool:
            mapped = pool.map(map_func, partitions)
        
        # ๐Ÿ”„ Shuffle and sort
        shuffled = defaultdict(list)
        for partition_result in mapped:
            for key, values in partition_result.items():
                shuffled[key].extend(values)
        
        # ๐Ÿ“Š Reduce phase
        final_results = {}
        for key, values in shuffled.items():
            final_results[key] = reduce_func(key, values)
        
        return final_results

# ๐Ÿช„ Example: Word frequency with custom partitioning
def advanced_word_mapper(text_partition):
    """Map words with frequency"""
    word_freq = defaultdict(list)
    for text in text_partition:
        words = text.lower().split()
        for word in words:
            word = word.strip('.,!?";')
            if word:
                word_freq[word].append(1)
    return dict(word_freq)

def advanced_word_reducer(word, counts):
    """Reduce word counts"""
    return sum(counts)
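
Here's a minimal usage sketch of the custom-partitioning engine with the word mapper and reducer above; the sample texts and the choice of key_func are just illustrative:

# 🧪 Hypothetical usage of AdvancedMapReduce
if __name__ == "__main__":
    texts = [
        "python makes mapreduce easy",
        "custom partitioning balances the load",
        "python workers stay busy",
    ] * 100

    engine = AdvancedMapReduce(num_partitions=4)
    word_counts = engine.map_reduce_with_key(
        advanced_word_mapper,
        advanced_word_reducer,
        texts,
        key_func=lambda text: text[0],  # 🎲 partition by first letter (arbitrary choice)
    )
    print("📊 'python' count:", word_counts.get("python", 0))  # expect 200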

๐Ÿ—๏ธ Advanced Topic 2: Streaming MapReduce

For the brave developers:

# 🚀 Streaming MapReduce for infinite data
import queue
import threading
import time
from collections import defaultdict

class StreamingMapReduce:
    def __init__(self, map_func, reduce_func, num_workers=4):
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.num_workers = num_workers
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        self.results = defaultdict(list)
        self.running = False
    
    def mapper_worker(self):
        """Worker thread for mapping"""
        while self.running:
            try:
                data = self.input_queue.get(timeout=1)
                if data is None:  # Poison pill
                    break
                
                # ๐Ÿ—บ๏ธ Apply map function
                result = self.map_func(data)
                self.output_queue.put(result)
                
            except queue.Empty:
                continue
    
    def reducer_worker(self):
        """Worker thread for reducing"""
        while self.running:
            try:
                mapped_data = self.output_queue.get(timeout=1)
                if mapped_data is None:  # Poison pill
                    break
                
                # ๐Ÿ”„ Accumulate results
                for key, value in mapped_data.items():
                    self.results[key].append(value)
                
            except queue.Empty:
                continue
    
    def start(self):
        """Start streaming processing"""
        self.running = True
        
        # ๐Ÿš€ Start mapper threads
        self.mapper_threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.mapper_worker)
            t.start()
            self.mapper_threads.append(t)
        
        # ๐ŸŽฏ Start reducer thread
        self.reducer_thread = threading.Thread(target=self.reducer_worker)
        self.reducer_thread.start()
        
        print("๐ŸŒŠ Streaming MapReduce started!")
    
    def process(self, data):
        """Add data to processing pipeline"""
        self.input_queue.put(data)
    
    def get_results(self):
        """Get current results"""
        final_results = {}
        for key, values in self.results.items():
            final_results[key] = self.reduce_func(key, values)
        return final_results
    
    def stop(self):
        """Stop streaming processing"""
        self.running = False
        
        # Send poison pills
        for _ in range(self.num_workers):
            self.input_queue.put(None)
        self.output_queue.put(None)
        
        # Wait for threads
        for t in self.mapper_threads:
            t.join()
        self.reducer_thread.join()
        
        print("๐Ÿ›‘ Streaming MapReduce stopped!")

# ๐ŸŽฎ Example usage
def stream_mapper(tweet):
    """Extract hashtags from tweet"""
    hashtags = {}
    for word in tweet.split():
        if word.startswith('#'):
            hashtags[word.lower()] = 1
    return hashtags

def stream_reducer(hashtag, counts):
    """Count hashtag occurrences"""
    return sum(counts)

# ๐ŸŒŠ Process tweets in real-time!
streamer = StreamingMapReduce(stream_mapper, stream_reducer)
streamer.start()

# Simulate incoming tweets
tweets = [
    "Love #Python programming! ๐Ÿ #coding",
    "MapReduce is amazing! #bigdata #Python",
    "Building cool stuff with #Python #MapReduce ๐Ÿš€"
]

for tweet in tweets:
    streamer.process(tweet)
    time.sleep(0.1)  # Simulate real-time

# Get results
time.sleep(1)  # Let processing finish
results = streamer.get_results()
print("๐Ÿ“Š Trending hashtags:", results)

streamer.stop()

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Memory Overflow

# โŒ Wrong way - loading everything into memory!
def bad_mapper(huge_file_path):
    with open(huge_file_path, 'r') as f:
        all_data = f.read()  # ๐Ÿ’ฅ Memory explosion!
    return process(all_data)

# โœ… Correct way - process in chunks!
def good_mapper(huge_file_path):
    results = {}
    with open(huge_file_path, 'r') as f:
        for line in f:  # ๐ŸŽฏ Process line by line
            partial_result = process_line(line)
            merge_results(results, partial_result)
    return results

🤯 Pitfall 2: Unbalanced Work Distribution

# ❌ Dangerous - uneven chunks!
def bad_partition(data, num_partitions):
    # One "partition" gets nearly everything, and the remainder is dropped! 😰
    chunk_size = len(data) // num_partitions
    return [data[:chunk_size * num_partitions]]

# ✅ Safe - balanced distribution!
def good_partition(data, num_partitions):
    # Everyone gets a fair share, and the last partition picks up the remainder! 🎯
    chunk_size = len(data) // num_partitions
    partitions = []
    for i in range(num_partitions):
        start = i * chunk_size
        end = start + chunk_size if i < num_partitions - 1 else len(data)
        partitions.append(data[start:end])
    return partitions

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Choose Right Chunk Size: Not too small (overhead), not too large (memory)
  2. ๐Ÿ“ Handle Failures Gracefully: Use try-except in mappers and reducers
  3. ๐Ÿ›ก๏ธ Validate Input Data: Check data before processing
  4. ๐ŸŽจ Keep Functions Pure: No side effects in map/reduce functions
  5. โœจ Monitor Performance: Track processing time and memory usage
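
Best practice 2 is worth showing concretely. Here's a small sketch of a fault-tolerant map step; safe_map is our own illustrative helper, not part of the examples above:

# 🛡️ Sketch: catch failures per chunk instead of letting one bad chunk kill the job
def safe_map(map_func, chunk):
    """Apply map_func to one chunk, recording errors instead of raising."""
    try:
        return {'ok': True, 'result': map_func(chunk)}
    except Exception as exc:
        return {'ok': False, 'error': str(exc)}

# Usage with a Pool (functools.partial keeps it picklable; map_word_count is from the first example):
#   with Pool() as pool:
#       mapped = pool.map(partial(safe_map, map_word_count), chunks)
#   good = [r['result'] for r in mapped if r['ok']]
#   failed = [r['error'] for r in mapped if not r['ok']]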

🧪 Hands-On Exercise

🎯 Challenge: Build a Web Crawler Analysis System

Create a MapReduce system to analyze web pages:

📋 Requirements:

  • ✅ Crawl multiple URLs in parallel
  • 🏷️ Extract and count all links per domain
  • 👤 Find most common words across all pages
  • 📅 Calculate average page load time
  • 🎨 Extract all images and their sizes

🚀 Bonus Points:

  • Add caching to avoid re-crawling
  • Implement rate limiting
  • Create a visualization of link relationships

💡 Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Web crawler with MapReduce!
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from multiprocessing import Pool
from collections import defaultdict, Counter
import time

# ๐Ÿ—บ๏ธ Map: Analyze a single URL
def map_web_page(url):
    """Analyze a single web page"""
    stats = {
        'url': url,
        'links': [],
        'words': Counter(),
        'images': [],
        'load_time': 0,
        'error': None
    }
    
    try:
        # ๐Ÿ“ฅ Fetch the page
        start_time = time.time()
        response = requests.get(url, timeout=10)
        stats['load_time'] = time.time() - start_time
        
        # ๐Ÿฒ Parse HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # ๐Ÿ”— Extract links
        for link in soup.find_all('a', href=True):
            absolute_url = urljoin(url, link['href'])
            domain = urlparse(absolute_url).netloc
            if domain:
                stats['links'].append({
                    'url': absolute_url,
                    'domain': domain,
                    'text': link.get_text(strip=True)
                })
        
        # ๐Ÿ“ Extract words
        text = soup.get_text()
        words = text.lower().split()
        for word in words:
            word = word.strip('.,!?";:')
            if len(word) > 3:  # Skip short words
                stats['words'][word] += 1
        
        # ๐Ÿ–ผ๏ธ Extract images
        for img in soup.find_all('img'):
            img_url = urljoin(url, img.get('src', ''))
            stats['images'].append({
                'url': img_url,
                'alt': img.get('alt', ''),
                'width': img.get('width', 'unknown'),
                'height': img.get('height', 'unknown')
            })
        
    except Exception as e:
        stats['error'] = str(e)
        print(f"โŒ Error crawling {url}: {e}")
    
    return stats

# ๐Ÿ”„ Reduce: Combine all results
def reduce_web_analysis(mapped_results):
    """Combine analysis from all pages"""
    analysis = {
        'total_pages': len(mapped_results),
        'successful_pages': 0,
        'total_links': 0,
        'domains': Counter(),
        'common_words': Counter(),
        'all_images': [],
        'avg_load_time': 0,
        'link_graph': defaultdict(set)
    }
    
    total_load_time = 0
    
    for result in mapped_results:
        if not result['error']:
            analysis['successful_pages'] += 1
            total_load_time += result['load_time']
            
            # ๐Ÿ“Š Aggregate links
            for link in result['links']:
                analysis['total_links'] += 1
                analysis['domains'][link['domain']] += 1
                
                # Build link graph
                from_domain = urlparse(result['url']).netloc
                to_domain = link['domain']
                if from_domain != to_domain:
                    analysis['link_graph'][from_domain].add(to_domain)
            
            # ๐Ÿ“ Aggregate words
            analysis['common_words'].update(result['words'])
            
            # ๐Ÿ–ผ๏ธ Collect images
            analysis['all_images'].extend(result['images'])
    
    # Calculate averages
    if analysis['successful_pages'] > 0:
        analysis['avg_load_time'] = total_load_time / analysis['successful_pages']
    
    # Get top items
    analysis['top_domains'] = analysis['domains'].most_common(10)
    analysis['top_words'] = analysis['common_words'].most_common(20)
    
    return analysis

# ๐Ÿš€ Web Crawler class
class WebCrawler:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.visited_urls = set()
    
    def crawl(self, start_urls, max_depth=2):
        """Crawl websites using MapReduce"""
        print(f"๐Ÿ•ท๏ธ Starting web crawl with {len(start_urls)} seed URLs...")
        
        urls_to_crawl = list(start_urls)
        all_results = []
        
        for depth in range(max_depth):
            print(f"\n๐Ÿ” Crawling depth {depth + 1}...")
            
            # Filter out already visited URLs
            new_urls = [url for url in urls_to_crawl 
                       if url not in self.visited_urls]
            
            if not new_urls:
                print("โœ… No new URLs to crawl!")
                break
            
            # MapReduce magic! โœจ
            with Pool(self.num_workers) as pool:
                results = pool.map(map_web_page, new_urls)
            
            all_results.extend(results)
            self.visited_urls.update(new_urls)
            
            # Extract new URLs for next depth
            urls_to_crawl = []
            for result in results:
                if not result['error']:
                    for link in result['links'][:10]:  # Limit to 10 per page
                        if link['url'] not in self.visited_urls:
                            urls_to_crawl.append(link['url'])
        
        # Final analysis
        final_analysis = reduce_web_analysis(all_results)
        return final_analysis
    
    def generate_report(self, analysis):
        """Generate a beautiful crawl report"""
        print("\n" + "="*60)
        print("๐Ÿ•ธ๏ธ WEB CRAWL ANALYSIS REPORT")
        print("="*60)
        
        print(f"\n๐Ÿ“Š Crawl Statistics:")
        print(f"  ๐Ÿ“„ Total pages crawled: {analysis['total_pages']}")
        print(f"  โœ… Successful: {analysis['successful_pages']}")
        print(f"  ๐Ÿ”— Total links found: {analysis['total_links']}")
        print(f"  โฑ๏ธ Average load time: {analysis['avg_load_time']:.2f}s")
        
        print(f"\n๐ŸŒ Top 10 Linked Domains:")
        for domain, count in analysis['top_domains']:
            print(f"  {domain}: {count} links")
        
        print(f"\n๐Ÿ“ Top 20 Common Words:")
        for i, (word, count) in enumerate(analysis['top_words']):
            if i % 4 == 0:
                print()
            print(f"  {word}({count})", end="")
        
        print(f"\n\n๐Ÿ–ผ๏ธ Images Found: {len(analysis['all_images'])}")
        
        print(f"\n๐Ÿ”— Link Network:")
        for from_domain, to_domains in list(analysis['link_graph'].items())[:5]:
            print(f"  {from_domain} โ†’ {', '.join(list(to_domains)[:3])}")

# ๐ŸŽฎ Test it out!
if __name__ == "__main__":
    crawler = WebCrawler(num_workers=4)
    
    # Start with some seed URLs
    seed_urls = [
        "https://python.org",
        "https://docs.python.org",
        "https://pypi.org"
    ]
    
    # Crawl! ๐Ÿ•ท๏ธ
    start_time = time.time()
    results = crawler.crawl(seed_urls, max_depth=1)
    elapsed = time.time() - start_time
    
    # Show results
    crawler.generate_report(results)
    print(f"\nโฑ๏ธ Total crawl time: {elapsed:.2f} seconds")
    print(f"๐Ÿš€ Pages per second: {results['total_pages'] / elapsed:.2f}")
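
The solution above skips the bonus items. For the caching idea, one possible shape is a small on-disk cache keyed by URL hash; cache_page and CACHE_DIR are our own illustrative names:

# 💾 Bonus sketch: cache crawled pages so re-runs skip already-fetched URLs
import hashlib
import json
import os

CACHE_DIR = "crawl_cache"

def cache_page(url, fetch_func):
    """Return cached stats for url if present, otherwise fetch and store them."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    key = hashlib.md5(url.encode()).hexdigest()
    path = os.path.join(CACHE_DIR, key + ".json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    stats = fetch_func(url)               # e.g. map_web_page
    with open(path, "w") as f:
        json.dump(stats, f, default=str)  # Counters serialize as plain dicts
    return stats

# One way to plug it in (needs functools.partial):
#   mapped = pool.map(partial(cache_page, fetch_func=map_web_page), new_urls)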

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Implement MapReduce patterns with confidence 💪
  • ✅ Process large datasets in parallel 🛡️
  • ✅ Build scalable data pipelines 🎯
  • ✅ Debug parallel processing issues like a pro 🐛
  • ✅ Create efficient big data solutions with Python! 🚀

Remember: MapReduce is your friend for big data challenges! It's here to help you process data at scale. 🤝

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered the MapReduce pattern!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the exercises above
  2. ๐Ÿ—๏ธ Build a MapReduce system for your own data
  3. ๐Ÿ“š Explore frameworks like Apache Spark or Dask
  4. ๐ŸŒŸ Share your MapReduce projects with the community!

Remember: Every big data expert started with simple map and reduce functions. Keep coding, keep learning, and most importantly, have fun! ๐Ÿš€


Happy parallel processing! ๐ŸŽ‰๐Ÿš€โœจ