Part 311 of 365

📘 FP Project: Data Processing Pipeline

Master building a functional data processing pipeline in Python with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand the fundamentals of functional data pipelines 🎯
  • Apply functional pipelines in real projects 🏗️
  • Debug common issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting tutorial on building a functional data processing pipeline in Python! 🎉 In this guide, we'll explore how to create powerful, composable data transformation pipelines using functional programming principles.

You'll discover how functional programming can transform your data processing tasks into elegant, maintainable pipelines. Whether you're analyzing logs 📊, transforming datasets 📈, or building ETL systems 🏗️, understanding functional pipelines is essential for writing robust, scalable code.

By the end of this tutorial, you'll feel confident building your own data processing pipelines using functional programming techniques! Let's dive in! 🏊‍♂️

📚 Understanding Functional Data Pipelines

🤔 What is a Functional Data Pipeline?

A functional data pipeline is like a factory assembly line 🏭. Think of it as a series of stations where each station performs one specific transformation on your data, passing the result to the next station.

In Python terms, it's a chain of pure functions that transform data step by step. This means you can:

  • ✨ Build complex transformations from simple functions
  • 🚀 Process data efficiently with lazy evaluation
  • 🛡️ Create predictable, testable data flows

💡 Why Use Functional Pipelines?

Here's why developers love functional pipelines:

  1. Composability 🧩: Build complex operations from simple pieces
  2. Reusability ♻️: Use the same transformations in multiple pipelines
  3. Testability 🧪: Test each function independently
  4. Clarity 🔍: Read pipelines like a recipe

Real-world example: Imagine processing web server logs 📋. With functional pipelines, you can filter errors, extract timestamps, aggregate by hour, and generate reports - all with composable functions!
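
Here's a rough sketch of that log workflow (the log format and field positions are made up for illustration), built from nothing but filter, map, and Counter:

# 📋 Sketch: composable log analysis on a hypothetical log format
from collections import Counter

logs = [
    "2024-01-15 10:02:11 ERROR disk full",
    "2024-01-15 10:17:45 INFO request ok",
    "2024-01-15 11:03:02 ERROR timeout",
]

def is_error(line: str) -> bool:
    return " ERROR " in line

def extract_hour(line: str) -> str:
    return line.split()[1][:2]  # "10", "11", ...

errors_by_hour = Counter(map(extract_hour, filter(is_error, logs)))
print(errors_by_hour)  # Counter({'10': 1, '11': 1})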

🔧 Basic Syntax and Usage

📝 Simple Pipeline Example

Let's start with a friendly example:

# ๐Ÿ‘‹ Hello, Functional Pipeline!
from functools import reduce
from typing import List, Callable, Any

# ๐ŸŽจ Create simple transformation functions
def add_exclamation(text: str) -> str:
    """Add excitement to text! ๐ŸŽ‰"""
    return f"{text}!"

def uppercase(text: str) -> str:
    """Make text LOUD! ๐Ÿ“ข"""
    return text.upper()

def add_emoji(text: str) -> str:
    """Add some personality! ๐Ÿ˜Š"""
    return f"{text} ๐Ÿš€"

# ๐Ÿ”„ Compose functions into a pipeline
def pipeline(*functions: Callable) -> Callable:
    """Create a pipeline from functions"""
    def pipe(data: Any) -> Any:
        return reduce(lambda result, func: func(result), functions, data)
    return pipe

# ๐ŸŽฎ Let's use it!
text_pipeline = pipeline(
    add_exclamation,
    uppercase,
    add_emoji
)

result = text_pipeline("hello world")
print(result)  # HELLO WORLD! ๐Ÿš€

💡 Explanation: Notice how we chain functions together! Each function takes the output of the previous one, creating a smooth data flow.

🎯 Common Pipeline Patterns

Here are patterns you'll use daily:

# ๐Ÿ—๏ธ Pattern 1: Filter-Map-Reduce
from typing import Iterable, TypeVar, Optional

T = TypeVar('T')

class Pipeline:
    """Fluent pipeline builder ๐Ÿ—๏ธ"""
    
    def __init__(self, data: Iterable[T]):
        self.data = data
    
    def filter(self, predicate: Callable[[T], bool]) -> 'Pipeline':
        """Filter items that match condition ๐Ÿ”"""
        self.data = filter(predicate, self.data)
        return self
    
    def map(self, transform: Callable[[T], Any]) -> 'Pipeline':
        """Transform each item ๐ŸŽจ"""
        self.data = map(transform, self.data)
        return self
    
    def reduce(self, reducer: Callable[[Any, T], Any], initial: Any) -> Any:
        """Combine all items ๐Ÿ”„"""
        return reduce(reducer, self.data, initial)
    
    def collect(self) -> List[Any]:
        """Get final results ๐Ÿ“ฆ"""
        return list(self.data)

# ๐ŸŽจ Pattern 2: Lazy evaluation with generators
def read_large_file(filename: str):
    """Read file line by line (memory efficient!) ๐Ÿ’พ"""
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()

# ๐Ÿ”„ Pattern 3: Function composition
def compose(*functions):
    """Compose functions right to left ๐Ÿ”—"""
    def inner(data):
        return reduce(lambda x, f: f(x), reversed(functions), data)
    return inner
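
These patterns stay lazy until you consume them, so here's a quick usage sketch (assuming the Pipeline class and compose function defined above):

# 🎮 Quick usage sketch for the patterns above
numbers = [1, 2, 3, 4, 5, 6]

# Filter-map-reduce with the fluent Pipeline
total_of_even_squares = (
    Pipeline(numbers)
    .filter(lambda n: n % 2 == 0)   # keep 2, 4, 6
    .map(lambda n: n * n)           # 4, 16, 36
    .reduce(lambda acc, n: acc + n, 0)
)
print(total_of_even_squares)  # 56

# Function composition, applied right to left
shout = compose(str.upper, str.strip)  # strip first, then upper
print(shout("  hello  "))  # HELLO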

💡 Practical Examples

🛒 Example 1: E-commerce Sales Analytics

Let's build a real data processing pipeline:

# ๐Ÿ›๏ธ Define our data structures
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict
import json

@dataclass
class Sale:
    """Single sale record ๐Ÿ’ฐ"""
    id: str
    product: str
    amount: float
    category: str
    timestamp: datetime
    emoji: str  # Every product needs an emoji!

class SalesAnalysisPipeline:
    """Process sales data functionally ๐Ÿ“Š"""
    
    def __init__(self, sales: List[Sale]):
        self.pipeline = Pipeline(sales)
    
    # ๐Ÿ” Filter functions
    @staticmethod
    def is_high_value(sale: Sale) -> bool:
        """Find big sales! ๐Ÿ’Ž"""
        return sale.amount > 100
    
    @staticmethod
    def is_category(category: str):
        """Filter by category ๐Ÿท๏ธ"""
        return lambda sale: sale.category == category
    
    # ๐ŸŽจ Transform functions
    @staticmethod
    def to_summary(sale: Sale) -> Dict:
        """Create sale summary ๐Ÿ“‹"""
        return {
            'product': f"{sale.emoji} {sale.product}",
            'amount': sale.amount,
            'date': sale.timestamp.strftime('%Y-%m-%d')
        }
    
    @staticmethod
    def add_tax(tax_rate: float):
        """Add tax calculation ๐Ÿ’ธ"""
        return lambda sale: Sale(
            **{**sale.__dict__, 'amount': sale.amount * (1 + tax_rate)}
        )
    
    # ๐Ÿ“Š Aggregation functions
    def analyze_by_category(self) -> Dict[str, float]:
        """Group sales by category ๐Ÿ“ˆ"""
        from itertools import groupby
        from operator import attrgetter
        
        # Sort by category first
        sorted_sales = sorted(self.pipeline.data, key=attrgetter('category'))
        
        result = {}
        for category, group in groupby(sorted_sales, key=attrgetter('category')):
            total = sum(sale.amount for sale in group)
            result[category] = total
            print(f"๐Ÿ“ฆ {category}: ${total:,.2f}")
        
        return result
    
    def top_products(self, n: int = 5) -> List[Dict]:
        """Find best sellers ๐Ÿ†"""
        product_sales = {}
        
        for sale in self.pipeline.data:
            key = f"{sale.emoji} {sale.product}"
            product_sales[key] = product_sales.get(key, 0) + sale.amount
        
        # Sort and get top N
        top = sorted(product_sales.items(), key=lambda x: x[1], reverse=True)[:n]
        
        print("๐Ÿ† Top Products:")
        for i, (product, total) in enumerate(top, 1):
            print(f"  {i}. {product}: ${total:,.2f}")
        
        return [{'product': p, 'total': t} for p, t in top]

# ๐ŸŽฎ Let's use it!
sales_data = [
    Sale("1", "Laptop", 1200, "Electronics", datetime.now(), "๐Ÿ’ป"),
    Sale("2", "Coffee Maker", 89, "Appliances", datetime.now(), "โ˜•"),
    Sale("3", "Smartphone", 899, "Electronics", datetime.now(), "๐Ÿ“ฑ"),
    Sale("4", "Book", 29, "Media", datetime.now(), "๐Ÿ“š"),
    Sale("5", "Headphones", 199, "Electronics", datetime.now(), "๐ŸŽง"),
]

# Create analysis pipeline
analyzer = SalesAnalysisPipeline(sales_data)

# Filter high-value electronics
high_value_electronics = (
    Pipeline(sales_data)
    .filter(SalesAnalysisPipeline.is_high_value)
    .filter(SalesAnalysisPipeline.is_category("Electronics"))
    .map(SalesAnalysisPipeline.to_summary)
    .collect()
)

print("๐Ÿ’Ž High-value electronics:", high_value_electronics)

🎯 Try it yourself: Add a time-based filter to analyze sales by hour or day!
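
One possible starting point (a sketch that reuses the Pipeline class and Sale records from above; the 24-hour cutoff is arbitrary):

# ⏰ Sketch: keep only recent sales, then summarize
from datetime import timedelta

def within_last_hours(hours: int):
    """Build a predicate that keeps recent sales"""
    cutoff = datetime.now() - timedelta(hours=hours)
    return lambda sale: sale.timestamp >= cutoff

recent_summaries = (
    Pipeline(sales_data)
    .filter(within_last_hours(24))              # last 24 hours only
    .map(SalesAnalysisPipeline.to_summary)
    .collect()
)
print("🕐 Recent sales:", recent_summaries)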

🎮 Example 2: Log Processing Pipeline

Let's process server logs functionally:

# ๐Ÿ† Advanced log processing pipeline
import re
from enum import Enum
from collections import Counter

class LogLevel(Enum):
    """Log severity levels ๐Ÿšฆ"""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

@dataclass
class LogEntry:
    """Parsed log entry ๐Ÿ“"""
    timestamp: datetime
    level: LogLevel
    message: str
    source: str
    emoji: str

class LogProcessingPipeline:
    """Functional log analysis ๐Ÿ”"""
    
    # ๐ŸŽจ Parser functions
    @staticmethod
    def parse_log_line(line: str) -> Optional[LogEntry]:
        """Parse raw log line ๐Ÿ“‹"""
        pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] \[(\w+)\] (.+)'
        match = re.match(pattern, line)
        
        if not match:
            return None
        
        timestamp_str, level_str, source, message = match.groups()
        
        # Pick emoji based on level
        emoji_map = {
            "DEBUG": "๐Ÿ›",
            "INFO": "โ„น๏ธ",
            "WARNING": "โš ๏ธ",
            "ERROR": "โŒ",
            "CRITICAL": "๐Ÿšจ"
        }
        
        return LogEntry(
            timestamp=datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S'),
            level=LogLevel(level_str),
            message=message,
            source=source,
            emoji=emoji_map.get(level_str, "๐Ÿ“")
        )
    
    # ๐Ÿ” Filter predicates
    @staticmethod
    def is_error_or_above(entry: LogEntry) -> bool:
        """Find problems! ๐Ÿšจ"""
        return entry.level in [LogLevel.ERROR, LogLevel.CRITICAL]
    
    @staticmethod
    def contains_keyword(keyword: str):
        """Search for specific terms ๐Ÿ”Ž"""
        return lambda entry: keyword.lower() in entry.message.lower()
    
    @staticmethod
    def within_timeframe(start: datetime, end: datetime):
        """Filter by time window โฐ"""
        return lambda entry: start <= entry.timestamp <= end
    
    # ๐ŸŽจ Transform functions
    @staticmethod
    def anonymize_ips(entry: LogEntry) -> LogEntry:
        """Remove IP addresses for privacy ๐Ÿ›ก๏ธ"""
        ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
        anonymized_message = re.sub(ip_pattern, 'XXX.XXX.XXX.XXX', entry.message)
        
        return LogEntry(
            timestamp=entry.timestamp,
            level=entry.level,
            message=anonymized_message,
            source=entry.source,
            emoji=entry.emoji
        )
    
    @staticmethod
    def extract_metrics(entry: LogEntry) -> Dict:
        """Extract performance metrics ๐Ÿ“Š"""
        # Look for response time patterns
        time_pattern = r'response_time=(\d+)ms'
        match = re.search(time_pattern, entry.message)
        
        return {
            'timestamp': entry.timestamp,
            'source': entry.source,
            'response_time': int(match.group(1)) if match else None,
            'level': entry.level.value,
            'emoji': entry.emoji
        }
    
    # ๐Ÿ“Š Analysis functions
    @staticmethod
    def analyze_error_patterns(entries: List[LogEntry]) -> Dict:
        """Find common error patterns ๐Ÿ”"""
        error_messages = [
            entry.message for entry in entries 
            if entry.level in [LogLevel.ERROR, LogLevel.CRITICAL]
        ]
        
        # Extract error types
        error_types = []
        for msg in error_messages:
            if "timeout" in msg.lower():
                error_types.append("โฑ๏ธ Timeout")
            elif "connection" in msg.lower():
                error_types.append("๐Ÿ”Œ Connection")
            elif "memory" in msg.lower():
                error_types.append("๐Ÿ’พ Memory")
            elif "permission" in msg.lower():
                error_types.append("๐Ÿ”’ Permission")
            else:
                error_types.append("โ“ Other")
        
        return dict(Counter(error_types))
    
    @staticmethod
    def generate_summary_report(entries: List[LogEntry]) -> str:
        """Create readable summary ๐Ÿ“‘"""
        total = len(entries)
        by_level = Counter(entry.level.value for entry in entries)
        
        report = f"""
๐Ÿ“Š Log Analysis Summary
====================
๐Ÿ“ Total Entries: {total}
โฐ Time Range: {entries[0].timestamp} to {entries[-1].timestamp if entries else 'N/A'}

๐Ÿ“ˆ By Level:
"""
        for level, count in by_level.most_common():
            emoji = {"DEBUG": "๐Ÿ›", "INFO": "โ„น๏ธ", "WARNING": "โš ๏ธ", 
                    "ERROR": "โŒ", "CRITICAL": "๐Ÿšจ"}.get(level, "๐Ÿ“")
            percentage = (count / total * 100) if total > 0 else 0
            report += f"  {emoji} {level}: {count} ({percentage:.1f}%)\n"
        
        return report

# ๐ŸŽฎ Example usage
sample_logs = [
    "2024-01-15 10:30:45 [INFO] [API] Request processed response_time=45ms",
    "2024-01-15 10:31:02 [ERROR] [DB] Connection timeout to 192.168.1.100",
    "2024-01-15 10:31:15 [WARNING] [API] High memory usage detected",
    "2024-01-15 10:31:30 [CRITICAL] [AUTH] Multiple failed login attempts from 10.0.0.5",
    "2024-01-15 10:32:00 [INFO] [API] Health check passed",
]

# Process logs through pipeline
parsed_logs = [
    LogProcessingPipeline.parse_log_line(line) 
    for line in sample_logs
]
parsed_logs = [log for log in parsed_logs if log]  # Remove None values

# Create pipeline for error analysis
error_pipeline = (
    Pipeline(parsed_logs)
    .filter(LogProcessingPipeline.is_error_or_above)
    .map(LogProcessingPipeline.anonymize_ips)
    .collect()
)

print("๐Ÿšจ Anonymized Errors:")
for entry in error_pipeline:
    print(f"  {entry.emoji} [{entry.level.value}] {entry.message}")

# Generate summary
summary = LogProcessingPipeline.generate_summary_report(parsed_logs)
print(summary)

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Lazy Evaluation with Generators

When you're ready to level up, try this advanced pattern:

# ๐ŸŽฏ Advanced generator-based pipeline
from typing import Generator, Iterator
import itertools

class LazyPipeline:
    """Memory-efficient lazy pipeline ๐Ÿ’ซ"""
    
    def __init__(self, source: Iterator):
        self.source = source
    
    def filter(self, predicate: Callable) -> 'LazyPipeline':
        """Lazy filter - no computation yet! ๐Ÿฆฅ"""
        self.source = (item for item in self.source if predicate(item))
        return self
    
    def map(self, transform: Callable) -> 'LazyPipeline':
        """Lazy map - still no computation! ๐ŸŽจ"""
        self.source = (transform(item) for item in self.source)
        return self
    
    def take(self, n: int) -> 'LazyPipeline':
        """Take only first n items โœ‚๏ธ"""
        self.source = itertools.islice(self.source, n)
        return self
    
    def batch(self, size: int) -> 'LazyPipeline':
        """Process in batches ๐Ÿ“ฆ"""
        def make_batches():
            batch = []
            for item in self.source:
                batch.append(item)
                if len(batch) == size:
                    yield batch
                    batch = []
            if batch:  # Don't forget the last batch!
                yield batch
        
        self.source = make_batches()
        return self
    
    def window(self, size: int) -> 'LazyPipeline':
        """Sliding window over data ๐ŸชŸ"""
        def make_windows():
            window = []
            for item in self.source:
                window.append(item)
                if len(window) == size:
                    yield tuple(window)
                    window.pop(0)
        
        self.source = make_windows()
        return self
    
    def execute(self) -> List:
        """Force evaluation - computation happens here! โšก"""
        return list(self.source)
    
    def stream(self) -> Generator:
        """Stream results one by one ๐ŸŒŠ"""
        for item in self.source:
            yield item

# ๐Ÿช„ Using the lazy pipeline
def infinite_numbers():
    """Generate infinite sequence ๐Ÿ”ข"""
    n = 0
    while True:
        yield n
        n += 1

# Process infinite stream lazily!
result = (
    LazyPipeline(infinite_numbers())
    .filter(lambda x: x % 2 == 0)  # Even numbers
    .map(lambda x: x ** 2)  # Square them
    .filter(lambda x: x < 1000)  # Less than 1000
    .take(10)  # Only first 10
    .execute()
)

print("๐ŸŽฏ First 10 even squares < 1000:", result)

๐Ÿ—๏ธ Advanced Topic 2: Functional Error Handling

For the brave developers - monadic error handling:

# ๐Ÿš€ Railway-oriented programming
from typing import Union, Callable, Generic, TypeVar
from dataclasses import dataclass

T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Success(Generic[T]):
    """Successful result ๐ŸŽ‰"""
    value: T

@dataclass
class Failure(Generic[E]):
    """Error result ๐Ÿ˜ข"""
    error: E

Result = Union[Success[T], Failure[E]]

class ResultPipeline:
    """Pipeline with error handling ๐Ÿ›ก๏ธ"""
    
    def __init__(self, result: Result):
        self.result = result
    
    def then(self, func: Callable[[T], Result]) -> 'ResultPipeline':
        """Chain operations that might fail ๐Ÿ”—"""
        if isinstance(self.result, Failure):
            return self  # Skip if already failed
        
        try:
            new_result = func(self.result.value)
            return ResultPipeline(new_result)
        except Exception as e:
            return ResultPipeline(Failure(str(e)))
    
    def map(self, func: Callable[[T], Any]) -> 'ResultPipeline':
        """Transform if successful ๐ŸŽจ"""
        if isinstance(self.result, Failure):
            return self
        
        try:
            new_value = func(self.result.value)
            return ResultPipeline(Success(new_value))
        except Exception as e:
            return ResultPipeline(Failure(str(e)))
    
    def recover(self, handler: Callable[[E], T]) -> 'ResultPipeline':
        """Recover from errors ๐Ÿฅ"""
        if isinstance(self.result, Success):
            return self
        
        try:
            recovered = handler(self.result.error)
            return ResultPipeline(Success(recovered))
        except Exception as e:
            return ResultPipeline(Failure(str(e)))
    
    def unwrap_or(self, default: T) -> T:
        """Get value or default ๐Ÿ“ฆ"""
        if isinstance(self.result, Success):
            return self.result.value
        return default

# ๐ŸŽฎ Example: Safe division pipeline
def safe_divide(a: float, b: float) -> Result:
    """Divide safely โž—"""
    if b == 0:
        return Failure("Division by zero! ๐Ÿšซ")
    return Success(a / b)

def validate_positive(n: float) -> Result:
    """Ensure positive number โœ…"""
    if n < 0:
        return Failure(f"Negative number: {n} ๐Ÿ‘Ž")
    return Success(n)

# Chain operations safely
result = (
    ResultPipeline(Success(100))
    .then(lambda x: safe_divide(x, 2))
    .then(lambda x: validate_positive(x))
    .map(lambda x: f"Result: {x} ๐ŸŽฏ")
    .unwrap_or("Operation failed! ๐Ÿ˜ข")
)

print(result)

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Mutating Data in Pipelines

# โŒ Wrong way - mutating shared state!
data = [1, 2, 3, 4, 5]

def bad_transform(lst):
    lst.append(6)  # ๐Ÿ’ฅ Mutates original!
    return lst

pipeline_result = Pipeline([data]).map(bad_transform).collect()
print(data)  # [1, 2, 3, 4, 5, 6] - Original changed! ๐Ÿ˜ฐ

# โœ… Correct way - create new data!
def good_transform(lst):
    return lst + [6]  # Creates new list! ๐Ÿ›ก๏ธ

data = [1, 2, 3, 4, 5]
pipeline_result = Pipeline([data]).map(good_transform).collect()
print(data)  # [1, 2, 3, 4, 5] - Original unchanged! โœจ

🤯 Pitfall 2: Eager Evaluation Memory Issues

# โŒ Dangerous - loads entire file into memory!
def process_huge_file_bad(filename):
    with open(filename) as f:
        lines = f.readlines()  # ๐Ÿ’ฅ Loads everything!
    
    return Pipeline(lines).filter(lambda x: "ERROR" in x).collect()

# โœ… Safe - processes line by line!
def process_huge_file_good(filename):
    def line_generator():
        with open(filename) as f:
            for line in f:
                yield line.strip()
    
    return LazyPipeline(line_generator()).filter(
        lambda x: "ERROR" in x
    ).execute()

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Keep Functions Pure: No side effects - same input, same output!
  2. ๐Ÿ“ Name Functions Clearly: filter_active_users not f1
  3. ๐Ÿ›ก๏ธ Handle Errors Gracefully: Use Result types or try-except
  4. ๐ŸŽจ Compose Small Functions: Each does one thing well
  5. โœจ Use Type Hints: Makes pipelines self-documenting
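
Putting a few of these together, here's a small sketch (the names and data are illustrative) of pure, typed, clearly named pipeline steps that fail loudly instead of silently:

# ✨ Sketch: pure, typed, clearly named pipeline steps
from typing import Dict, List

def filter_active_users(users: List[Dict]) -> List[Dict]:
    """Keep only active users; never mutates the input"""
    return [user for user in users if user.get("active")]

def average_age(users: List[Dict]) -> float:
    """Pure aggregation with an explicit error for empty input"""
    if not users:
        raise ValueError("average_age called with no users")
    return sum(user["age"] for user in users) / len(users)

users = [
    {"name": "Ada", "age": 36, "active": True},
    {"name": "Bob", "age": 41, "active": False},
]
print(average_age(filter_active_users(users)))  # 36.0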

🧪 Hands-On Exercise

🎯 Challenge: Build a Real-Time Stream Processor

Create a functional pipeline for processing streaming data:

📋 Requirements:

  • ✅ Process streaming sensor data (temperature, humidity, pressure)
  • 🏷️ Filter anomalies (values outside normal range)
  • 👤 Group by sensor location
  • 📅 Calculate rolling averages
  • 🎨 Generate alerts for critical values!

🚀 Bonus Points:

  • Add time-window aggregations
  • Implement backpressure handling
  • Create real-time dashboard data

💡 Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Real-time sensor stream processor!
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import deque
from typing import Deque, Dict, Iterator, List, Optional
import random
import time

@dataclass
class SensorReading:
    """Single sensor measurement ๐ŸŒก๏ธ"""
    sensor_id: str
    location: str
    temperature: float
    humidity: float
    pressure: float
    timestamp: datetime
    emoji: str

class StreamProcessor:
    """Functional stream processing pipeline ๐ŸŒŠ"""
    
    def __init__(self, window_size: int = 10):
        self.windows: Dict[str, Deque[SensorReading]] = {}
        self.window_size = window_size
        self.alerts: List[str] = []
    
    # ๐Ÿ” Filter functions
    @staticmethod
    def is_valid_reading(reading: SensorReading) -> bool:
        """Validate sensor data ๐Ÿ›ก๏ธ"""
        return (
            -50 <= reading.temperature <= 100 and
            0 <= reading.humidity <= 100 and
            900 <= reading.pressure <= 1100
        )
    
    @staticmethod
    def is_anomaly(reading: SensorReading) -> bool:
        """Detect anomalies ๐Ÿšจ"""
        return (
            reading.temperature > 40 or
            reading.temperature < -10 or
            reading.humidity > 90 or
            reading.pressure < 950 or
            reading.pressure > 1050
        )
    
    # ๐ŸŽจ Transform functions
    def add_to_window(self, reading: SensorReading) -> SensorReading:
        """Maintain sliding window ๐ŸชŸ"""
        key = f"{reading.location}_{reading.sensor_id}"
        
        if key not in self.windows:
            self.windows[key] = deque(maxlen=self.window_size)
        
        self.windows[key].append(reading)
        return reading
    
    def calculate_rolling_stats(self, reading: SensorReading) -> Dict:
        """Calculate rolling statistics ๐Ÿ“Š"""
        key = f"{reading.location}_{reading.sensor_id}"
        window = self.windows.get(key, [])
        
        if not window:
            return {}
        
        temps = [r.temperature for r in window]
        humids = [r.humidity for r in window]
        pressures = [r.pressure for r in window]
        
        return {
            'sensor': f"{reading.emoji} {reading.sensor_id}",
            'location': reading.location,
            'current': {
                'temperature': reading.temperature,
                'humidity': reading.humidity,
                'pressure': reading.pressure
            },
            'rolling_avg': {
                'temperature': sum(temps) / len(temps),
                'humidity': sum(humids) / len(humids),
                'pressure': sum(pressures) / len(pressures)
            },
            'rolling_min': {
                'temperature': min(temps),
                'humidity': min(humids),
                'pressure': min(pressures)
            },
            'rolling_max': {
                'temperature': max(temps),
                'humidity': max(humids),
                'pressure': max(pressures)
            }
        }
    
    def generate_alert(self, reading: SensorReading) -> Optional[str]:
        """Create alerts for critical values ๐Ÿšจ"""
        alerts = []
        
        if reading.temperature > 45:
            alerts.append(f"๐Ÿ”ฅ CRITICAL: High temperature {reading.temperature}ยฐC")
        elif reading.temperature < -20:
            alerts.append(f"๐ŸงŠ CRITICAL: Low temperature {reading.temperature}ยฐC")
        
        if reading.humidity > 95:
            alerts.append(f"๐Ÿ’ง CRITICAL: High humidity {reading.humidity}%")
        
        if reading.pressure < 920 or reading.pressure > 1080:
            alerts.append(f"๐ŸŒช๏ธ CRITICAL: Abnormal pressure {reading.pressure} hPa")
        
        if alerts:
            alert_msg = f"{reading.emoji} Sensor {reading.sensor_id} @ {reading.location}:\n"
            alert_msg += "\n".join(f"  {a}" for a in alerts)
            self.alerts.append(alert_msg)
            return alert_msg
        
        return None
    
    def process_stream(self, readings: Iterator[SensorReading]):
        """Main processing pipeline ๐Ÿ—๏ธ"""
        pipeline = (
            LazyPipeline(readings)
            .filter(self.is_valid_reading)
            .map(self.add_to_window)
            .map(lambda r: (r, self.calculate_rolling_stats(r)))
            .map(lambda pair: {
                'reading': pair[0],
                'stats': pair[1],
                'alert': self.generate_alert(pair[0])
            })
        )
        
        return pipeline

# ๐ŸŽฎ Simulate sensor stream
def generate_sensor_stream():
    """Generate realistic sensor data ๐Ÿ“ก"""
    sensors = [
        ("TEMP-001", "Factory Floor", "๐Ÿญ"),
        ("TEMP-002", "Server Room", "๐Ÿ–ฅ๏ธ"),
        ("TEMP-003", "Warehouse", "๐Ÿ“ฆ"),
        ("TEMP-004", "Office", "๐Ÿข")
    ]
    
    while True:
        for sensor_id, location, emoji in sensors:
            # Generate realistic data with occasional anomalies
            base_temp = 20 + random.gauss(0, 5)
            if random.random() < 0.1:  # 10% chance of anomaly
                base_temp += random.choice([-30, 30])
            
            yield SensorReading(
                sensor_id=sensor_id,
                location=location,
                temperature=round(base_temp, 1),
                humidity=round(50 + random.gauss(0, 15), 1),
                pressure=round(1013 + random.gauss(0, 20), 1),
                timestamp=datetime.now(),
                emoji=emoji
            )
        
        time.sleep(0.1)  # Simulate real-time delay

# ๐Ÿš€ Run the processor!
processor = StreamProcessor(window_size=5)
stream = generate_sensor_stream()

# Process first 20 readings
for i, result in enumerate(processor.process_stream(stream).stream()):
    if i >= 20:
        break
    
    reading = result['reading']
    stats = result['stats']
    alert = result['alert']
    
    print(f"\n๐Ÿ“Š Reading {i+1}:")
    print(f"  {reading.emoji} {reading.sensor_id} @ {reading.location}")
    print(f"  ๐ŸŒก๏ธ Temp: {reading.temperature}ยฐC")
    
    if stats and 'rolling_avg' in stats:
        avg_temp = stats['rolling_avg']['temperature']
        print(f"  ๐Ÿ“ˆ 5-reading avg: {avg_temp:.1f}ยฐC")
    
    if alert:
        print(f"  {alert}")

# Show final alerts summary
if processor.alerts:
    print("\n๐Ÿšจ ALERTS SUMMARY:")
    for alert in processor.alerts[-5:]:  # Last 5 alerts
        print(alert)

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Build functional pipelines with confidence 💪
  • ✅ Process data lazily for memory efficiency 🛡️
  • ✅ Compose complex transformations from simple functions 🎯
  • ✅ Handle errors functionally with Result types 🐛
  • ✅ Create real-time stream processors with Python! 🚀

Remember: Functional pipelines make your code more modular, testable, and maintainable! 🤝

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered functional data processing pipelines!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the streaming exercise above
  2. ๐Ÿ—๏ธ Build a pipeline for your own data processing needs
  3. ๐Ÿ“š Explore libraries like toolz and fn.py for more functional tools
  4. ๐ŸŒŸ Share your functional pipelines with the community!
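
To give you a taste of toolz, here's a minimal sketch of the same pipeline idea using its pipe and compose helpers (install with pip install toolz):

# 📚 Sketch: the pipeline idea with the toolz library
from toolz import compose, pipe

result = pipe(
    "hello world",
    lambda s: f"{s}!",   # add excitement
    str.upper,           # make it LOUD
)
print(result)  # HELLO WORLD!

shout = compose(str.upper, str.strip)  # applies right to left, like our compose()
print(shout("  quiet  "))  # QUIET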

Remember: Every data engineering expert started with their first pipeline. Keep building, keep learning, and most importantly, have fun! 🚀


Happy functional programming! 🎉🚀✨