Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or another preferred IDE
What you'll learn
- Understand the fundamentals of functional data pipelines
- Apply them in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this tutorial on building functional data processing pipelines in Python! In this guide, we'll explore how to create powerful, composable data transformations using functional programming principles.
You'll discover how functional programming can turn your data processing tasks into elegant, maintainable pipelines. Whether you're analyzing logs, transforming datasets, or building ETL systems, understanding functional pipelines is essential for writing robust, scalable code.
By the end of this tutorial, you'll feel confident building your own data processing pipelines using functional programming techniques. Let's dive in!
Understanding Functional Data Pipelines
What is a Functional Data Pipeline?
A functional data pipeline is like a factory assembly line: a series of stations where each station performs one specific transformation on your data and passes the result to the next station.
In Python terms, it's a chain of pure functions that transform data step by step. This means you can:
- Build complex transformations from simple functions
- Process data efficiently with lazy evaluation
- Create predictable, testable data flows
Why Use Functional Pipelines?
Here's why developers love functional pipelines:
- Composability: build complex operations from simple pieces
- Reusability: use the same transformations in multiple pipelines
- Testability: test each function independently
- Clarity: read pipelines like a recipe
Real-world example: imagine processing web server logs. With functional pipelines you can filter errors, extract timestamps, aggregate by hour, and generate reports, all with composable functions.
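As a quick taste of the idea, here is a tiny sketch using only Python built-ins; the sample log lines are made up, and the full log-processing example comes later in this tutorial.
# A minimal taste using only built-ins (made-up sample data)
log_lines = [
    "2024-01-15 10:30:45 ERROR disk full",
    "2024-01-15 10:31:02 INFO request ok",
    "2024-01-15 10:31:30 ERROR timeout",
]

errors = filter(lambda line: "ERROR" in line, log_lines)        # station 1: keep error lines
timestamps = map(lambda line: line.split(" ERROR")[0], errors)  # station 2: extract timestamps
print(list(timestamps))  # ['2024-01-15 10:30:45', '2024-01-15 10:31:30']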
Basic Syntax and Usage
Simple Pipeline Example
Let's start with a friendly example:
# Hello, Functional Pipeline!
from functools import reduce
from typing import List, Callable, Any

# Create simple transformation functions
def add_exclamation(text: str) -> str:
    """Add excitement to text."""
    return f"{text}!"

def uppercase(text: str) -> str:
    """Make text LOUD."""
    return text.upper()

def add_emoji(text: str) -> str:
    """Add some personality."""
    return f"{text} 🎉"

# Compose functions into a pipeline
def pipeline(*functions: Callable) -> Callable:
    """Create a pipeline that applies the functions left to right."""
    def pipe(data: Any) -> Any:
        return reduce(lambda result, func: func(result), functions, data)
    return pipe

# Let's use it!
text_pipeline = pipeline(
    add_exclamation,
    uppercase,
    add_emoji
)

result = text_pipeline("hello world")
print(result)  # HELLO WORLD! 🎉
Explanation: notice how the functions are chained together. Each function takes the output of the previous one, creating a smooth left-to-right data flow.
Common Pipeline Patterns
Here are patterns you'll use daily:
# Pattern 1: Filter-Map-Reduce with a fluent builder
from typing import Iterable, TypeVar, Optional

T = TypeVar('T')

class Pipeline:
    """Fluent pipeline builder."""

    def __init__(self, data: Iterable[T]):
        self.data = data

    def filter(self, predicate: Callable[[T], bool]) -> 'Pipeline':
        """Keep only items that match the condition."""
        self.data = filter(predicate, self.data)
        return self

    def map(self, transform: Callable[[T], Any]) -> 'Pipeline':
        """Transform each item."""
        self.data = map(transform, self.data)
        return self

    def reduce(self, reducer: Callable[[Any, T], Any], initial: Any) -> Any:
        """Combine all items into a single value."""
        return reduce(reducer, self.data, initial)

    def collect(self) -> List[Any]:
        """Materialize the final results."""
        return list(self.data)

# Pattern 2: Lazy evaluation with generators
def read_large_file(filename: str):
    """Read a file line by line (memory efficient)."""
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()

# Pattern 3: Function composition
def compose(*functions):
    """Compose functions right to left."""
    def inner(data):
        return reduce(lambda x, f: f(x), reversed(functions), data)
    return inner
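None of the three patterns is exercised yet, so here is a small usage sketch of my own, assuming the Pipeline class and compose function defined above are in scope:
# A quick usage sketch for the patterns above
numbers = [1, 2, 3, 4, 5, 6]

# Filter-Map-Reduce: sum the squares of the even numbers
total = (
    Pipeline(numbers)
    .filter(lambda n: n % 2 == 0)        # keep 2, 4, 6
    .map(lambda n: n * n)                # 4, 16, 36
    .reduce(lambda acc, n: acc + n, 0)   # 56
)
print(total)  # 56

# Function composition: right to left, so strip runs before lower
normalize = compose(str.lower, str.strip)
print(normalize("  Hello Pipeline  "))  # "hello pipeline"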
Practical Examples
Example 1: E-commerce Sales Analytics
Let's build a real data processing pipeline:
# Define our data structures
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict

@dataclass
class Sale:
    """Single sale record."""
    id: str
    product: str
    amount: float
    category: str
    timestamp: datetime
    emoji: str  # Every product needs an emoji!

class SalesAnalysisPipeline:
    """Process sales data functionally."""

    def __init__(self, sales: List[Sale]):
        self.pipeline = Pipeline(sales)

    # Filter functions
    @staticmethod
    def is_high_value(sale: Sale) -> bool:
        """Find the big sales."""
        return sale.amount > 100

    @staticmethod
    def is_category(category: str):
        """Build a predicate that filters by category."""
        return lambda sale: sale.category == category

    # Transform functions
    @staticmethod
    def to_summary(sale: Sale) -> Dict:
        """Create a sale summary."""
        return {
            'product': f"{sale.emoji} {sale.product}",
            'amount': sale.amount,
            'date': sale.timestamp.strftime('%Y-%m-%d')
        }

    @staticmethod
    def add_tax(tax_rate: float):
        """Build a transform that adds tax to the amount."""
        return lambda sale: Sale(
            **{**sale.__dict__, 'amount': sale.amount * (1 + tax_rate)}
        )

    # Aggregation functions
    def analyze_by_category(self) -> Dict[str, float]:
        """Group sales totals by category."""
        from itertools import groupby
        from operator import attrgetter

        # Sort by category first (groupby needs sorted input)
        sorted_sales = sorted(self.pipeline.data, key=attrgetter('category'))
        result = {}
        for category, group in groupby(sorted_sales, key=attrgetter('category')):
            total = sum(sale.amount for sale in group)
            result[category] = total
            print(f"{category}: ${total:,.2f}")
        return result

    def top_products(self, n: int = 5) -> List[Dict]:
        """Find the best sellers."""
        product_sales = {}
        for sale in self.pipeline.data:
            key = f"{sale.emoji} {sale.product}"
            product_sales[key] = product_sales.get(key, 0) + sale.amount

        # Sort and take the top N
        top = sorted(product_sales.items(), key=lambda x: x[1], reverse=True)[:n]
        print("Top Products:")
        for i, (product, total) in enumerate(top, 1):
            print(f"  {i}. {product}: ${total:,.2f}")
        return [{'product': p, 'total': t} for p, t in top]

# Let's use it!
sales_data = [
    Sale("1", "Laptop", 1200, "Electronics", datetime.now(), "💻"),
    Sale("2", "Coffee Maker", 89, "Appliances", datetime.now(), "☕"),
    Sale("3", "Smartphone", 899, "Electronics", datetime.now(), "📱"),
    Sale("4", "Book", 29, "Media", datetime.now(), "📚"),
    Sale("5", "Headphones", 199, "Electronics", datetime.now(), "🎧"),
]

# Create the analysis pipeline
analyzer = SalesAnalysisPipeline(sales_data)

# Filter high-value electronics
high_value_electronics = (
    Pipeline(sales_data)
    .filter(SalesAnalysisPipeline.is_high_value)
    .filter(SalesAnalysisPipeline.is_category("Electronics"))
    .map(SalesAnalysisPipeline.to_summary)
    .collect()
)
print("High-value electronics:", high_value_electronics)
Try it yourself: add a time-based filter to analyze sales by hour or day!
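One possible take on that exercise, as a rough sketch: the within_hours helper below is my own, hypothetical addition, not part of the SalesAnalysisPipeline class above.
# Sketch of the time-based filter exercise (within_hours is a hypothetical helper)
def within_hours(start_hour: int, end_hour: int):
    """Build a predicate keeping sales whose timestamp falls in [start_hour, end_hour)."""
    return lambda sale: start_hour <= sale.timestamp.hour < end_hour

morning_sales = (
    Pipeline(sales_data)
    .filter(within_hours(9, 12))              # sales between 09:00 and 11:59
    .map(SalesAnalysisPipeline.to_summary)
    .collect()
)
print("Morning sales:", morning_sales)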
Example 2: Log Processing Pipeline
Let's process server logs functionally:
# Advanced log processing pipeline
import re
from enum import Enum
from collections import Counter

class LogLevel(Enum):
    """Log severity levels."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

@dataclass
class LogEntry:
    """Parsed log entry."""
    timestamp: datetime
    level: LogLevel
    message: str
    source: str
    emoji: str

class LogProcessingPipeline:
    """Functional log analysis."""

    # Parser functions
    @staticmethod
    def parse_log_line(line: str) -> Optional[LogEntry]:
        """Parse a raw log line into a LogEntry, or None if it doesn't match."""
        pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] \[(\w+)\] (.+)'
        match = re.match(pattern, line)
        if not match:
            return None
        timestamp_str, level_str, source, message = match.groups()

        # Pick an emoji based on the level
        emoji_map = {
            "DEBUG": "🔍",
            "INFO": "ℹ️",
            "WARNING": "⚠️",
            "ERROR": "❌",
            "CRITICAL": "🚨"
        }
        return LogEntry(
            timestamp=datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S'),
            level=LogLevel(level_str),
            message=message,
            source=source,
            emoji=emoji_map.get(level_str, "📝")
        )
    # Filter predicates
    @staticmethod
    def is_error_or_above(entry: LogEntry) -> bool:
        """Find the problems."""
        return entry.level in [LogLevel.ERROR, LogLevel.CRITICAL]

    @staticmethod
    def contains_keyword(keyword: str):
        """Build a predicate that searches for a specific term."""
        return lambda entry: keyword.lower() in entry.message.lower()

    @staticmethod
    def within_timeframe(start: datetime, end: datetime):
        """Build a predicate that filters by time window."""
        return lambda entry: start <= entry.timestamp <= end

    # Transform functions
    @staticmethod
    def anonymize_ips(entry: LogEntry) -> LogEntry:
        """Mask IP addresses for privacy."""
        ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
        anonymized_message = re.sub(ip_pattern, 'XXX.XXX.XXX.XXX', entry.message)
        return LogEntry(
            timestamp=entry.timestamp,
            level=entry.level,
            message=anonymized_message,
            source=entry.source,
            emoji=entry.emoji
        )

    @staticmethod
    def extract_metrics(entry: LogEntry) -> Dict:
        """Extract performance metrics from the message."""
        # Look for response time patterns
        time_pattern = r'response_time=(\d+)ms'
        match = re.search(time_pattern, entry.message)
        return {
            'timestamp': entry.timestamp,
            'source': entry.source,
            'response_time': int(match.group(1)) if match else None,
            'level': entry.level.value,
            'emoji': entry.emoji
        }
    # Analysis functions
    @staticmethod
    def analyze_error_patterns(entries: List[LogEntry]) -> Dict:
        """Find common error patterns."""
        error_messages = [
            entry.message for entry in entries
            if entry.level in [LogLevel.ERROR, LogLevel.CRITICAL]
        ]
        # Bucket the errors into rough types
        error_types = []
        for msg in error_messages:
            if "timeout" in msg.lower():
                error_types.append("Timeout")
            elif "connection" in msg.lower():
                error_types.append("Connection")
            elif "memory" in msg.lower():
                error_types.append("Memory")
            elif "permission" in msg.lower():
                error_types.append("Permission")
            else:
                error_types.append("Other")
        return dict(Counter(error_types))

    @staticmethod
    def generate_summary_report(entries: List[LogEntry]) -> str:
        """Create a readable summary."""
        total = len(entries)
        by_level = Counter(entry.level.value for entry in entries)
        time_range = (
            f"{entries[0].timestamp} to {entries[-1].timestamp}" if entries else "N/A"
        )
        report = f"""
Log Analysis Summary
====================
Total Entries: {total}
Time Range: {time_range}
By Level:
"""
        for level, count in by_level.most_common():
            emoji = {"DEBUG": "🔍", "INFO": "ℹ️", "WARNING": "⚠️",
                     "ERROR": "❌", "CRITICAL": "🚨"}.get(level, "📝")
            percentage = (count / total * 100) if total > 0 else 0
            report += f"  {emoji} {level}: {count} ({percentage:.1f}%)\n"
        return report
# Example usage
sample_logs = [
    "2024-01-15 10:30:45 [INFO] [API] Request processed response_time=45ms",
    "2024-01-15 10:31:02 [ERROR] [DB] Connection timeout to 192.168.1.100",
    "2024-01-15 10:31:15 [WARNING] [API] High memory usage detected",
    "2024-01-15 10:31:30 [CRITICAL] [AUTH] Multiple failed login attempts from 10.0.0.5",
    "2024-01-15 10:32:00 [INFO] [API] Health check passed",
]

# Parse the logs through the pipeline
parsed_logs = [
    LogProcessingPipeline.parse_log_line(line)
    for line in sample_logs
]
parsed_logs = [log for log in parsed_logs if log]  # Remove None values

# Create a pipeline for error analysis
error_pipeline = (
    Pipeline(parsed_logs)
    .filter(LogProcessingPipeline.is_error_or_above)
    .map(LogProcessingPipeline.anonymize_ips)
    .collect()
)

print("Anonymized Errors:")
for entry in error_pipeline:
    print(f"  {entry.emoji} [{entry.level.value}] {entry.message}")

# Generate the summary
summary = LogProcessingPipeline.generate_summary_report(parsed_logs)
print(summary)
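The contains_keyword and extract_metrics helpers are defined above but never used in the example run, so here is a small follow-on sketch of my own that combines them to compute an average response time:
# Follow-on sketch: average response time from the parsed logs
api_metrics = (
    Pipeline(parsed_logs)
    .filter(LogProcessingPipeline.contains_keyword("response_time"))
    .map(LogProcessingPipeline.extract_metrics)
    .collect()
)
times = [m['response_time'] for m in api_metrics if m['response_time'] is not None]
if times:
    print(f"Average response time: {sum(times) / len(times):.1f}ms")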
Advanced Concepts
Advanced Topic 1: Lazy Evaluation with Generators
When you're ready to level up, try this advanced pattern:
# Advanced generator-based pipeline
from typing import Generator, Iterator
import itertools

class LazyPipeline:
    """Memory-efficient lazy pipeline."""

    def __init__(self, source: Iterator):
        self.source = source

    def filter(self, predicate: Callable) -> 'LazyPipeline':
        """Lazy filter - no computation yet."""
        self.source = (item for item in self.source if predicate(item))
        return self

    def map(self, transform: Callable) -> 'LazyPipeline':
        """Lazy map - still no computation."""
        self.source = (transform(item) for item in self.source)
        return self

    def take(self, n: int) -> 'LazyPipeline':
        """Take only the first n items."""
        self.source = itertools.islice(self.source, n)
        return self

    def batch(self, size: int) -> 'LazyPipeline':
        """Group items into fixed-size batches."""
        source = self.source  # bind now: the generator body runs after self.source is reassigned
        def make_batches():
            batch = []
            for item in source:
                batch.append(item)
                if len(batch) == size:
                    yield batch
                    batch = []
            if batch:  # Don't forget the last, partial batch
                yield batch
        self.source = make_batches()
        return self

    def window(self, size: int) -> 'LazyPipeline':
        """Slide a fixed-size window over the data."""
        source = self.source  # same binding trick as in batch()
        def make_windows():
            window = []
            for item in source:
                window.append(item)
                if len(window) == size:
                    yield tuple(window)
                    window.pop(0)
        self.source = make_windows()
        return self

    def execute(self) -> List:
        """Force evaluation - computation happens here!"""
        return list(self.source)

    def stream(self) -> Generator:
        """Stream results one by one."""
        for item in self.source:
            yield item

# Using the lazy pipeline
def infinite_numbers():
    """Generate an infinite sequence of integers."""
    n = 0
    while True:
        yield n
        n += 1

# Process an infinite stream lazily!
result = (
    LazyPipeline(infinite_numbers())
    .filter(lambda x: x % 2 == 0)   # Even numbers
    .map(lambda x: x ** 2)          # Square them
    .filter(lambda x: x < 1000)     # Less than 1000
    .take(10)                       # Only the first 10 (all below 1000, so this terminates)
    .execute()
)
print("First 10 even squares < 1000:", result)
Advanced Topic 2: Functional Error Handling
For the brave developers - monadic error handling:
# Railway-oriented programming
from typing import Union, Callable, Generic, TypeVar
from dataclasses import dataclass

T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Success(Generic[T]):
    """Successful result."""
    value: T

@dataclass
class Failure(Generic[E]):
    """Error result."""
    error: E

Result = Union[Success[T], Failure[E]]

class ResultPipeline:
    """Pipeline with built-in error handling."""

    def __init__(self, result: Result):
        self.result = result

    def then(self, func: Callable[[T], Result]) -> 'ResultPipeline':
        """Chain operations that might fail."""
        if isinstance(self.result, Failure):
            return self  # Skip if already failed
        try:
            new_result = func(self.result.value)
            return ResultPipeline(new_result)
        except Exception as e:
            return ResultPipeline(Failure(str(e)))

    def map(self, func: Callable[[T], Any]) -> 'ResultPipeline':
        """Transform the value if successful."""
        if isinstance(self.result, Failure):
            return self
        try:
            new_value = func(self.result.value)
            return ResultPipeline(Success(new_value))
        except Exception as e:
            return ResultPipeline(Failure(str(e)))

    def recover(self, handler: Callable[[E], T]) -> 'ResultPipeline':
        """Recover from an error."""
        if isinstance(self.result, Success):
            return self
        try:
            recovered = handler(self.result.error)
            return ResultPipeline(Success(recovered))
        except Exception as e:
            return ResultPipeline(Failure(str(e)))

    def unwrap_or(self, default: T) -> T:
        """Get the value, or a default on failure."""
        if isinstance(self.result, Success):
            return self.result.value
        return default

# Example: safe division pipeline
def safe_divide(a: float, b: float) -> Result:
    """Divide safely."""
    if b == 0:
        return Failure("Division by zero!")
    return Success(a / b)

def validate_positive(n: float) -> Result:
    """Ensure the number is positive."""
    if n < 0:
        return Failure(f"Negative number: {n}")
    return Success(n)

# Chain operations safely
result = (
    ResultPipeline(Success(100))
    .then(lambda x: safe_divide(x, 2))
    .then(lambda x: validate_positive(x))
    .map(lambda x: f"Result: {x}")
    .unwrap_or("Operation failed!")
)
print(result)  # Result: 50.0
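The happy path above never hits a Failure, so here is a short companion sketch of my own showing the failure branch and recover() in action:
# The failure branch: division by zero, then recovery
failed = (
    ResultPipeline(Success(100))
    .then(lambda x: safe_divide(x, 0))       # fails: division by zero
    .then(lambda x: validate_positive(x))    # skipped, already a Failure
    .recover(lambda err: 0.0)                # fall back to a default value
    .map(lambda x: f"Recovered result: {x}")
    .unwrap_or("Operation failed!")
)
print(failed)  # Recovered result: 0.0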
Common Pitfalls and Solutions
Pitfall 1: Mutating Data in Pipelines
# Wrong way - mutating shared state!
data = [1, 2, 3, 4, 5]

def bad_transform(lst):
    lst.append(6)  # Mutates the original!
    return lst

pipeline_result = Pipeline([data]).map(bad_transform).collect()
print(data)  # [1, 2, 3, 4, 5, 6] - Original changed!

# Correct way - create new data!
def good_transform(lst):
    return lst + [6]  # Creates a new list

data = [1, 2, 3, 4, 5]
pipeline_result = Pipeline([data]).map(good_transform).collect()
print(data)  # [1, 2, 3, 4, 5] - Original unchanged!
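A related tip, not part of the pitfall above: frozen dataclasses plus dataclasses.replace make accidental mutation fail loudly instead of silently changing shared state.
# Optional hardening: frozen dataclasses reject in-place mutation
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Record:
    name: str
    value: int

r = Record("cpu_load", 70)
# r.value = 80             # would raise dataclasses.FrozenInstanceError
r2 = replace(r, value=80)  # creates a new, updated record instead
print(r, r2)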
Pitfall 2: Eager Evaluation Memory Issues
# Dangerous - loads the entire file into memory!
def process_huge_file_bad(filename):
    with open(filename) as f:
        lines = f.readlines()  # Loads everything!
    return Pipeline(lines).filter(lambda x: "ERROR" in x).collect()

# Safe - processes line by line!
def process_huge_file_good(filename):
    def line_generator():
        with open(filename) as f:
            for line in f:
                yield line.strip()
    return LazyPipeline(line_generator()).filter(
        lambda x: "ERROR" in x
    ).execute()
Best Practices
- Keep functions pure: no side effects - same input, same output
- Name functions clearly: filter_active_users, not f1
- Handle errors gracefully: use Result types or try/except
- Compose small functions: each should do one thing well
- Use type hints: they make pipelines self-documenting (a short illustration follows below)
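A brief illustration of those practices, as a sketch with made-up sample data:
# Pure, clearly named, fully typed pipeline steps
from typing import Dict, List

def filter_active_users(users: List[Dict]) -> List[Dict]:
    """Pure: the same input always gives the same output, nothing else is touched."""
    return [u for u in users if u.get("active")]

def to_display_name(user: Dict) -> str:
    """Small and single-purpose, so it's easy to test on its own."""
    return user["name"].title()

users = [{"name": "ada", "active": True}, {"name": "bob", "active": False}]
print([to_display_name(u) for u in filter_active_users(users)])  # ['Ada']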
Hands-On Exercise
Challenge: Build a Real-Time Stream Processor
Create a functional pipeline for processing streaming data:
Requirements:
- Process streaming sensor data (temperature, humidity, pressure)
- Filter anomalies (values outside the normal range)
- Group by sensor location
- Calculate rolling averages
- Generate alerts for critical values
Bonus Points:
- Add time-window aggregations
- Implement backpressure handling
- Create real-time dashboard data
Solution
Click to see the solution
# Real-time sensor stream processor!
from dataclasses import dataclass
from datetime import datetime
from collections import deque
from typing import Deque, Dict, Iterator, List, Optional
import random
import time

@dataclass
class SensorReading:
    """Single sensor measurement."""
    sensor_id: str
    location: str
    temperature: float
    humidity: float
    pressure: float
    timestamp: datetime
    emoji: str

class StreamProcessor:
    """Functional stream processing pipeline."""

    def __init__(self, window_size: int = 10):
        self.windows: Dict[str, Deque[SensorReading]] = {}
        self.window_size = window_size
        self.alerts: List[str] = []
    # Filter functions
    @staticmethod
    def is_valid_reading(reading: SensorReading) -> bool:
        """Validate the sensor data."""
        return (
            -50 <= reading.temperature <= 100 and
            0 <= reading.humidity <= 100 and
            900 <= reading.pressure <= 1100
        )

    @staticmethod
    def is_anomaly(reading: SensorReading) -> bool:
        """Detect anomalies."""
        return (
            reading.temperature > 40 or
            reading.temperature < -10 or
            reading.humidity > 90 or
            reading.pressure < 950 or
            reading.pressure > 1050
        )

    # Transform functions
    def add_to_window(self, reading: SensorReading) -> SensorReading:
        """Maintain a sliding window per sensor."""
        key = f"{reading.location}_{reading.sensor_id}"
        if key not in self.windows:
            self.windows[key] = deque(maxlen=self.window_size)
        self.windows[key].append(reading)
        return reading

    def calculate_rolling_stats(self, reading: SensorReading) -> Dict:
        """Calculate rolling statistics."""
        key = f"{reading.location}_{reading.sensor_id}"
        window = self.windows.get(key, [])
        if not window:
            return {}

        temps = [r.temperature for r in window]
        humids = [r.humidity for r in window]
        pressures = [r.pressure for r in window]

        return {
            'sensor': f"{reading.emoji} {reading.sensor_id}",
            'location': reading.location,
            'current': {
                'temperature': reading.temperature,
                'humidity': reading.humidity,
                'pressure': reading.pressure
            },
            'rolling_avg': {
                'temperature': sum(temps) / len(temps),
                'humidity': sum(humids) / len(humids),
                'pressure': sum(pressures) / len(pressures)
            },
            'rolling_min': {
                'temperature': min(temps),
                'humidity': min(humids),
                'pressure': min(pressures)
            },
            'rolling_max': {
                'temperature': max(temps),
                'humidity': max(humids),
                'pressure': max(pressures)
            }
        }
    def generate_alert(self, reading: SensorReading) -> Optional[str]:
        """Create alerts for critical values."""
        alerts = []
        if reading.temperature > 45:
            alerts.append(f"CRITICAL: High temperature {reading.temperature}°C")
        elif reading.temperature < -20:
            alerts.append(f"CRITICAL: Low temperature {reading.temperature}°C")
        if reading.humidity > 95:
            alerts.append(f"CRITICAL: High humidity {reading.humidity}%")
        if reading.pressure < 920 or reading.pressure > 1080:
            alerts.append(f"CRITICAL: Abnormal pressure {reading.pressure} hPa")

        if alerts:
            alert_msg = f"{reading.emoji} Sensor {reading.sensor_id} @ {reading.location}:\n"
            alert_msg += "\n".join(f"  {a}" for a in alerts)
            self.alerts.append(alert_msg)
            return alert_msg
        return None

    def process_stream(self, readings: Iterator[SensorReading]):
        """Main processing pipeline."""
        pipeline = (
            LazyPipeline(readings)
            .filter(self.is_valid_reading)
            .map(self.add_to_window)
            .map(lambda r: (r, self.calculate_rolling_stats(r)))
            .map(lambda pair: {
                'reading': pair[0],
                'stats': pair[1],
                'alert': self.generate_alert(pair[0])
            })
        )
        return pipeline
# Simulate a sensor stream
def generate_sensor_stream():
    """Generate realistic sensor data."""
    sensors = [
        ("TEMP-001", "Factory Floor", "🏭"),
        ("TEMP-002", "Server Room", "🖥️"),
        ("TEMP-003", "Warehouse", "📦"),
        ("TEMP-004", "Office", "🏢")
    ]
    while True:
        for sensor_id, location, emoji in sensors:
            # Generate realistic data with occasional anomalies
            base_temp = 20 + random.gauss(0, 5)
            if random.random() < 0.1:  # 10% chance of an anomaly
                base_temp += random.choice([-30, 30])
            yield SensorReading(
                sensor_id=sensor_id,
                location=location,
                temperature=round(base_temp, 1),
                humidity=round(50 + random.gauss(0, 15), 1),
                pressure=round(1013 + random.gauss(0, 20), 1),
                timestamp=datetime.now(),
                emoji=emoji
            )
        time.sleep(0.1)  # Simulate a real-time delay

# Run the processor!
processor = StreamProcessor(window_size=5)
stream = generate_sensor_stream()

# Process the first 20 readings
for i, result in enumerate(processor.process_stream(stream).stream()):
    if i >= 20:
        break
    reading = result['reading']
    stats = result['stats']
    alert = result['alert']
    print(f"\nReading {i + 1}:")
    print(f"  {reading.emoji} {reading.sensor_id} @ {reading.location}")
    print(f"  Temp: {reading.temperature}°C")
    if stats and 'rolling_avg' in stats:
        avg_temp = stats['rolling_avg']['temperature']
        print(f"  5-reading avg: {avg_temp:.1f}°C")
    if alert:
        print(f"  {alert}")

# Show a summary of the final alerts
if processor.alerts:
    print("\nALERTS SUMMARY:")
    for alert in processor.alerts[-5:]:  # Last 5 alerts
        print(alert)
Key Takeaways
You've covered a lot! Here's what you can now do:
- Build functional pipelines with confidence
- Process data lazily for memory efficiency
- Compose complex transformations from simple functions
- Handle errors functionally with Result types
- Create real-time stream processors in Python
Remember: functional pipelines make your code more modular, testable, and maintainable!
Next Steps
Congratulations! You've mastered functional data processing pipelines!
Here's what to do next:
- Practice with the streaming exercise above
- Build a pipeline for your own data processing needs
- Explore libraries like toolz and fn.py for more functional tools (a quick taste of toolz follows below)
- Share your functional pipelines with the community!
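If you want a quick taste of toolz, here is a minimal sketch; it assumes you have run pip install toolz and uses its pipe helper together with the curried filter and map variants:
# A tiny taste of toolz (sketch; requires: pip install toolz)
from toolz import pipe
from toolz.curried import filter, map  # curried variants compose nicely

result = pipe(
    range(20),
    filter(lambda x: x % 2 == 0),   # keep the even numbers
    map(lambda x: x * x),           # square them
    sum,                            # reduce to a total
)
print(result)  # 1140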
Remember: every data engineering expert started with their first pipeline. Keep building, keep learning, and most importantly, have fun!
Happy functional programming!