Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the concept fundamentals
- Apply the concept in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to the world of advanced CSV operations in Python! Ever wondered how to handle massive CSV files, work with complex data transformations, or build professional data pipelines? You're in the right place.
CSV files are everywhere - from Excel exports to database dumps, from sensor data to financial records. While basic CSV reading and writing is straightforward, real-world applications demand more sophisticated techniques. Today, we'll explore strategies for processing large files efficiently, validating messy data, and building reliable pipelines.
By the end of this tutorial, you'll confidently handle gigabyte-sized CSV files, perform complex transformations, and build robust data processing pipelines. Let's get started!
Understanding Advanced CSV Operations
What Makes CSV Operations "Advanced"?
Think of basic CSV operations like cooking instant noodles - quick and simple. Advanced CSV operations are like being a master chef: you have specialized tools and techniques, and you can handle complex recipes with ease.
In Python terms, advanced CSV operations involve:
- Memory-efficient processing of large files
- High-performance data transformations
- Robust error handling and data validation
- Complex filtering and aggregation
- Integration with data analysis libraries
Why Master Advanced CSV Operations?
Here's why developers need these skills:
- Handle Big Data: Process files that don't fit in memory
- Performance Optimization: Speed up data processing significantly
- Data Quality: Validate and clean data automatically
- Professional Integration: Work seamlessly with pandas, databases, and APIs
Real-world example: imagine processing sales data from 1000 stores. With advanced techniques, you can process millions of records in seconds, validate data integrity, and generate insights automatically.
Basic Syntax and Usage
DictReader and DictWriter
Let's start with the foundation of advanced CSV operations:
import csv
from collections import defaultdict

# Reading CSV with DictReader
def process_sales_data():
    with open('sales.csv', 'r') as file:
        reader = csv.DictReader(file)

        # Process each row as a dictionary
        total_by_category = defaultdict(float)

        for row in reader:
            category = row['category']     # Access by column name
            amount = float(row['amount'])  # Convert to number
            total_by_category[category] += amount

    return dict(total_by_category)

# Writing with DictWriter
def write_summary(summary_data):
    with open('summary.csv', 'w', newline='') as file:
        fieldnames = ['category', 'total_sales', 'emoji']
        writer = csv.DictWriter(file, fieldnames=fieldnames)

        # Write the header
        writer.writeheader()

        # Write the data rows
        for category, total in summary_data.items():
            writer.writerow({
                'category': category,
                'total_sales': round(total, 2),
                'emoji': get_category_emoji(category)  # Add a little fun
            })

def get_category_emoji(category):
    emojis = {
        'Electronics': '📱',
        'Clothing': '👕',
        'Food': '🍔',
        'Books': '📚'
    }
    return emojis.get(category, '📦')
Explanation: DictReader and DictWriter make CSV handling intuitive by using dictionaries instead of lists - no more counting column indices!
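For a side-by-side feel of why this matters, here is a minimal sketch (assuming a hypothetical products.csv with name and price columns) comparing index-based access with DictReader access:

import csv

# Index-based access: fragile, breaks silently if the column order changes
with open('products.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    next(reader)  # skip the header row manually
    for row in reader:
        print(row[0], row[1])

# Dictionary-based access: readable and independent of column order
with open('products.csv', 'r', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['name'], row['price'])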
Streaming Large Files
Here's how to process huge CSV files efficiently:
import csv
from itertools import islice

# Memory-efficient CSV processing
def process_large_csv(filename, chunk_size=1000):
    """Process a CSV file in chunks to save memory"""
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        # Process in chunks
        while True:
            # Get the next chunk
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break  # No more data

            # Process the chunk
            process_chunk(chunk)
            print(f"Processed {len(chunk)} records")

def process_chunk(chunk):
    """Process a chunk of CSV records"""
    # Your processing logic here
    valid_records = []

    for record in chunk:
        # Validate the data
        if validate_record(record):
            valid_records.append(record)
        else:
            print(f"Invalid record: {record.get('id', 'unknown')}")

    # Save or process valid records
    return valid_records

def validate_record(record):
    """Validate a single record"""
    try:
        # Check that required fields exist
        required = ['id', 'date', 'amount']
        for field in required:
            if not record.get(field):
                return False

        # Validate that amount is numeric
        float(record['amount'])
        return True
    except ValueError:
        return False
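A minimal way to drive these functions (the filename is just an example; it assumes a file with the id, date, and amount columns that validate_record expects):

# Process a hypothetical large file in chunks of 5,000 rows instead of the default 1,000
process_large_csv('large_sales.csv', chunk_size=5000)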
Practical Examples
Example 1: E-commerce Order Processing
Let's build a real-world order processing system:
import csv
from datetime import datetime
from collections import defaultdict

# Advanced order processing system
class OrderProcessor:
    def __init__(self):
        self.orders = []
        self.stats = {
            'total_revenue': 0.0,
            'orders_by_status': defaultdict(int),
            'top_products': defaultdict(int),
            'customer_totals': defaultdict(float)
        }

    # Load and process orders
    def process_orders_file(self, filename):
        """Process orders with validation and statistics"""
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)

            for row_num, row in enumerate(reader, start=2):  # Track line numbers (header is line 1)
                try:
                    # Validate and process the order
                    order = self.validate_order(row, row_num)
                    if order:
                        self.process_order(order)
                        print(f"Processed order {order['order_id']}")
                except Exception as e:
                    print(f"Error on line {row_num}: {e}")

    # Validate order data
    def validate_order(self, row, row_num):
        """Validate an order with detailed error checking"""
        # Required fields
        required_fields = ['order_id', 'customer_id', 'product', 'quantity', 'price', 'status']

        # Check that all required fields exist
        for field in required_fields:
            if not row.get(field):
                raise ValueError(f"Missing required field: {field}")

        # Validate numeric fields
        try:
            quantity = int(row['quantity'])
            price = float(row['price'])
            if quantity <= 0:
                raise ValueError("Quantity must be positive")
            if price < 0:
                raise ValueError("Price cannot be negative")
        except ValueError as e:
            raise ValueError(f"Invalid numeric value: {e}")

        # Validate the date if present
        if row.get('order_date'):
            try:
                datetime.strptime(row['order_date'], '%Y-%m-%d')
            except ValueError:
                raise ValueError(f"Invalid date format: {row['order_date']}")

        # Return the validated order
        return {
            'order_id': row['order_id'],
            'customer_id': row['customer_id'],
            'product': row['product'],
            'quantity': quantity,
            'price': price,
            'total': quantity * price,
            'status': row['status'],
            'order_date': row.get('order_date', ''),
            'emoji': self.get_product_emoji(row['product'])
        }

    # Process a validated order
    def process_order(self, order):
        """Update statistics and store the order"""
        # Update revenue
        self.stats['total_revenue'] += order['total']

        # Update order status counts
        self.stats['orders_by_status'][order['status']] += 1

        # Track top products
        self.stats['top_products'][order['product']] += order['quantity']

        # Track customer spending
        self.stats['customer_totals'][order['customer_id']] += order['total']

        # Store the order
        self.orders.append(order)

    # Add fun emojis
    def get_product_emoji(self, product):
        """Get an emoji for the product category"""
        emojis = {
            'laptop': '💻', 'phone': '📱', 'tablet': '📱',
            'shirt': '👕', 'pants': '👖', 'shoes': '👟',
            'pizza': '🍕', 'burger': '🍔', 'coffee': '☕',
            'book': '📚', 'game': '🎮', 'toy': '🧸'
        }

        # Find a matching emoji
        product_lower = product.lower()
        for key, emoji in emojis.items():
            if key in product_lower:
                return emoji
        return '📦'  # Default package emoji

    # Generate reports
    def generate_report(self, output_file):
        """Generate a comprehensive CSV report"""
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            # Write summary statistics
            writer = csv.writer(file)
            writer.writerow(['Order Processing Report'])
            writer.writerow([])
            writer.writerow(['Metric', 'Value'])
            writer.writerow(['Total Revenue', f"${self.stats['total_revenue']:,.2f}"])
            writer.writerow(['Total Orders', len(self.orders)])
            writer.writerow([])

            # Order status breakdown
            writer.writerow(['Status', 'Count', 'Percentage'])
            total_orders = len(self.orders)
            for status, count in self.stats['orders_by_status'].items():
                percentage = (count / total_orders * 100) if total_orders > 0 else 0
                emoji = self.get_status_emoji(status)
                writer.writerow([f"{status} {emoji}", count, f"{percentage:.1f}%"])

            # Top products
            writer.writerow([])
            writer.writerow(['Top 5 Products'])
            writer.writerow(['Product', 'Quantity Sold', 'Emoji'])
            top_products = sorted(self.stats['top_products'].items(),
                                  key=lambda x: x[1], reverse=True)[:5]
            for product, quantity in top_products:
                emoji = self.get_product_emoji(product)
                writer.writerow([product, quantity, emoji])

    # Status emojis
    def get_status_emoji(self, status):
        status_emojis = {
            'pending': '⏳',
            'processing': '🔄',
            'shipped': '📦',
            'delivered': '✅',
            'cancelled': '❌'
        }
        return status_emojis.get(status.lower(), '❓')

# Let's use it!
processor = OrderProcessor()
processor.process_orders_file('orders.csv')
processor.generate_report('order_report.csv')
print("Order processing complete!")
Try it yourself: Add a feature to detect and flag potentially fraudulent orders based on unusual patterns!
Example 2: Game Analytics Pipeline
Let's analyze player data from a mobile game:
import csv
import json
from datetime import datetime
from collections import defaultdict

# Advanced game analytics system
class GameAnalytics:
    def __init__(self):
        self.player_sessions = defaultdict(list)
        self.achievements = defaultdict(set)
        self.daily_metrics = defaultdict(lambda: {
            'active_players': set(),
            'total_playtime': 0,
            'sessions': 0,
            'revenue': 0.0,
            'new_players': 0
        })

    # Process game event logs
    def process_event_stream(self, event_file):
        """Stream-process game events for near real-time analytics"""
        with open(event_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)

            # Process events in a streaming fashion
            event_buffer = []
            buffer_size = 100

            for event in reader:
                event_buffer.append(event)

                # Process the buffer when it is full
                if len(event_buffer) >= buffer_size:
                    self.process_event_batch(event_buffer)
                    event_buffer = []

            # Process any remaining events
            if event_buffer:
                self.process_event_batch(event_buffer)

    # Process a batch of events
    def process_event_batch(self, events):
        """Process a batch of game events"""
        for event in events:
            try:
                # Parse the event
                event_data = self.parse_event(event)

                # Route to the appropriate handler
                if event_data['type'] == 'session_start':
                    self.handle_session_start(event_data)
                elif event_data['type'] == 'session_end':
                    self.handle_session_end(event_data)
                elif event_data['type'] == 'achievement':
                    self.handle_achievement(event_data)
                elif event_data['type'] == 'purchase':
                    self.handle_purchase(event_data)
            except Exception as e:
                print(f"Error processing event: {e}")

    # Parse and validate an event
    def parse_event(self, event):
        """Parse an event with validation"""
        # Parse the timestamp
        timestamp = datetime.strptime(event['timestamp'], '%Y-%m-%d %H:%M:%S')
        date_key = timestamp.date().isoformat()

        return {
            'type': event['event_type'],
            'player_id': event['player_id'],
            'timestamp': timestamp,
            'date_key': date_key,
            'data': json.loads(event.get('event_data', '{}'))
        }

    # Handle session events
    def handle_session_start(self, event):
        """Track a session start"""
        player_id = event['player_id']

        # Track the active player
        self.daily_metrics[event['date_key']]['active_players'].add(player_id)
        self.daily_metrics[event['date_key']]['sessions'] += 1

        # Check whether this is a new player
        if event['data'].get('first_session'):
            self.daily_metrics[event['date_key']]['new_players'] += 1
            print(f"Welcome new player: {player_id}")

        # Store the session start
        self.player_sessions[player_id].append({
            'start': event['timestamp'],
            'end': None
        })

    def handle_session_end(self, event):
        """Track a session end and calculate its duration"""
        player_id = event['player_id']

        # Find the matching open session
        if player_id in self.player_sessions:
            sessions = self.player_sessions[player_id]
            for session in reversed(sessions):
                if session['end'] is None:
                    session['end'] = event['timestamp']

                    # Calculate the duration in seconds
                    duration = int((session['end'] - session['start']).total_seconds())
                    self.daily_metrics[event['date_key']]['total_playtime'] += duration

                    print(f"Player {player_id} played for {duration // 60} minutes")
                    break

    # Handle achievements
    def handle_achievement(self, event):
        """Track player achievements"""
        player_id = event['player_id']
        achievement = event['data']['achievement_name']

        # Award the achievement
        self.achievements[player_id].add(achievement)

        # Announce it
        emoji = self.get_achievement_emoji(achievement)
        print(f"{emoji} Player {player_id} earned: {achievement}!")

    # Handle purchases
    def handle_purchase(self, event):
        """Track in-game purchases"""
        amount = float(event['data']['amount'])
        self.daily_metrics[event['date_key']]['revenue'] += amount
        print(f"Purchase: ${amount:.2f} from player {event['player_id']}")

    # Achievement emojis
    def get_achievement_emoji(self, achievement):
        achievement_emojis = {
            'first_win': '🏆',
            'speed_demon': '⚡',
            'collector': '📦',
            'social_butterfly': '🦋',
            'marathon': '🏃',
            'perfectionist': '🎯',
            'explorer': '🗺️'
        }

        # Match the achievement type
        for key, emoji in achievement_emojis.items():
            if key in achievement.lower():
                return emoji
        return '⭐'  # Default star

    # Export analytics
    def export_analytics(self, output_file):
        """Export a comprehensive analytics report"""
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            fieldnames = ['date', 'active_players', 'new_players', 'sessions',
                          'avg_session_minutes', 'revenue', 'emoji']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()

            # Write daily metrics
            for date, metrics in sorted(self.daily_metrics.items()):
                active_count = len(metrics['active_players'])
                avg_session = (metrics['total_playtime'] / metrics['sessions'] / 60
                               if metrics['sessions'] > 0 else 0)

                # Pick a day emoji based on performance
                if metrics['revenue'] > 1000:
                    emoji = '💎'  # Diamond day!
                elif active_count > 1000:
                    emoji = '🔥'  # Hot day!
                elif metrics['new_players'] > 100:
                    emoji = '🚀'  # Growth day!
                else:
                    emoji = '📊'  # Normal day

                writer.writerow({
                    'date': date,
                    'active_players': active_count,
                    'new_players': metrics['new_players'],
                    'sessions': metrics['sessions'],
                    'avg_session_minutes': round(avg_session, 1),
                    'revenue': round(metrics['revenue'], 2),
                    'emoji': emoji
                })

# Run the analytics
analytics = GameAnalytics()
analytics.process_event_stream('game_events.csv')
analytics.export_analytics('game_analytics.csv')
print("Game analytics complete!")
Advanced Concepts
Custom CSV Dialects
When dealing with non-standard CSV formats:
import csv

# Define a custom CSV dialect
class PipeDelimitedDialect(csv.Dialect):
    delimiter = '|'
    quotechar = '"'
    doublequote = True
    skipinitialspace = True
    lineterminator = '\n'
    quoting = csv.QUOTE_MINIMAL

# Register the dialect
csv.register_dialect('pipe', PipeDelimitedDialect)

# Use the custom dialect
def process_pipe_delimited(filename):
    """Process pipe-delimited files"""
    with open(filename, 'r') as file:
        reader = csv.DictReader(file, dialect='pipe')
        for row in reader:
            # Process normally!
            print(f"Processing: {row}")

# Alternative: inline dialect specification
def process_custom_format(filename):
    """Process with inline dialect settings"""
    with open(filename, 'r') as file:
        reader = csv.DictReader(
            file,
            delimiter=';',    # Semicolon delimiter
            quotechar="'",    # Single quotes
            escapechar='\\'   # Backslash escape
        )
        for row in reader:
            process_row(row)
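If you receive files whose format you don't control and don't know in advance, the standard library's csv.Sniffer can often detect the dialect from a small sample. Here is a minimal sketch (the filename is hypothetical, and sniffing can fail on ambiguous samples, so keep a fallback):

import csv

def sniff_and_read(filename):
    """Detect the dialect from a sample of the file, then read with it"""
    with open(filename, 'r', newline='') as file:
        sample = file.read(4096)  # read a small sample for detection
        try:
            dialect = csv.Sniffer().sniff(sample)
        except csv.Error:
            dialect = csv.excel    # fall back to the default comma dialect
        file.seek(0)               # rewind before the real read

        reader = csv.DictReader(file, dialect=dialect)
        for row in reader:
            print(f"Processing: {row}")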
CSV-to-Database Pipeline
For production-grade data pipelines:
import csv
import sqlite3
from contextlib import contextmanager

# High-performance CSV-to-database loader
class CSVDatabaseLoader:
    def __init__(self, db_path):
        self.db_path = db_path
        self.batch_size = 1000
        self.total_loaded = 0

    @contextmanager
    def get_connection(self):
        """Database connection context manager"""
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    # Load a CSV file into the database
    def load_csv_to_table(self, csv_file, table_name, create_table=True):
        """Load CSV data into a SQLite table"""
        with open(csv_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)

            # Create the table if needed
            if create_table:
                first_row = next(reader)
                self.create_table_from_row(table_name, first_row)

                # Reset the reader to the start of the file
                file.seek(0)
                next(reader)  # Skip the header row

            # Process in batches
            with self.get_connection() as conn:
                batch = []

                for row in reader:
                    batch.append(row)

                    if len(batch) >= self.batch_size:
                        self.insert_batch(conn, table_name, batch)
                        batch = []
                        print(f"Loaded {self.total_loaded} records...")

                # Insert any remaining records
                if batch:
                    self.insert_batch(conn, table_name, batch)

                print(f"Successfully loaded {self.total_loaded} records!")

    # Create the table dynamically
    def create_table_from_row(self, table_name, sample_row):
        """Create a table based on the CSV columns"""
        with self.get_connection() as conn:
            # Build the CREATE TABLE statement
            columns = []
            for key, value in sample_row.items():
                # Infer the column type
                col_type = self.infer_type(value)
                columns.append(f"{key} {col_type}")

            create_sql = f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    {', '.join(columns)}
                )
            """
            conn.execute(create_sql)
            conn.commit()
            print(f"Created table: {table_name}")

    # Type inference
    def infer_type(self, value):
        """Infer a SQL type from a sample value"""
        try:
            int(value)
            return "INTEGER"
        except ValueError:
            try:
                float(value)
                return "REAL"
            except ValueError:
                return "TEXT"

    # Batch insert
    def insert_batch(self, conn, table_name, batch):
        """Insert a batch of records"""
        if not batch:
            return

        # Prepare the insert statement
        columns = list(batch[0].keys())
        placeholders = ', '.join(['?' for _ in columns])
        insert_sql = f"""
            INSERT INTO {table_name} ({', '.join(columns)})
            VALUES ({placeholders})
        """

        # Execute the batch insert
        values = [[row.get(col, '') for col in columns] for row in batch]
        conn.executemany(insert_sql, values)
        conn.commit()
        self.total_loaded += len(batch)

# Usage example
loader = CSVDatabaseLoader('analytics.db')
loader.load_csv_to_table('large_dataset.csv', 'sales_data')
Common Pitfalls and Solutions
Pitfall 1: Memory Overload
# Wrong way - loads the entire file into memory!
def bad_process_csv(filename):
    with open(filename, 'r') as file:
        all_data = list(csv.DictReader(file))  # Blows up for large files!
        for row in all_data:
            process_row(row)

# Correct way - stream processing!
def good_process_csv(filename):
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:  # Streams one row at a time
            process_row(row)
    print("Processed without loading the entire file!")
Pitfall 2: Encoding Issues
# Dangerous - assumes the platform's default encoding!
def risky_read_csv(filename):
    with open(filename, 'r') as file:  # May fail on special characters!
        reader = csv.DictReader(file)
        return list(reader)

# Safe - handles encodings properly!
def safe_read_csv(filename):
    # Try different encodings
    encodings = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']

    for encoding in encodings:
        try:
            with open(filename, 'r', encoding=encoding) as file:
                reader = csv.DictReader(file)
                data = list(reader)
                print(f"Successfully read with {encoding} encoding")
                return data
        except UnicodeDecodeError:
            continue

    print("Could not read the file with any of the tried encodings")
    return []
Best Practices
- Use DictReader/DictWriter: More readable and maintainable than index-based access
- Stream Large Files: Process row by row instead of loading everything into memory
- Validate Everything: Never trust input data - validate types and ranges
- Batch Operations: Group database inserts and API calls for performance (see the sketch below)
- Handle Encodings: Always specify an encoding and have fallbacks
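To make the batching point concrete, here is a minimal sketch (the function and file names are illustrative) that writes rows with writerows in groups instead of one writerow call per record:

import csv
from itertools import islice

def write_in_batches(rows, output_file, fieldnames, batch_size=1000):
    """Write dictionaries to CSV in batches rather than one row at a time"""
    with open(output_file, 'w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()

        rows = iter(rows)
        while True:
            batch = list(islice(rows, batch_size))
            if not batch:
                break
            writer.writerows(batch)  # one call per batch of rows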
Hands-On Exercise
Challenge: Build a Data Quality Analyzer
Create a comprehensive CSV data quality analyzer.
Requirements:
- Detect data types for each column automatically
- Find missing values and calculate a completeness percentage
- Identify duplicate records
- Validate date formats and ranges
- Generate a quality report
Bonus Points:
- Add outlier detection for numeric columns
- Suggest data cleaning operations
- Export quality metrics to a new CSV
Solution
Click to see the solution
import csv
from collections import defaultdict
from datetime import datetime
import re
import statistics

# Comprehensive data quality analyzer
class DataQualityAnalyzer:
    def __init__(self):
        self.column_stats = defaultdict(lambda: {
            'total_rows': 0,
            'non_empty_rows': 0,
            'unique_values': set(),
            'data_types': defaultdict(int),
            'numeric_values': [],
            'date_formats': defaultdict(int),
            'max_length': 0
        })
        self.duplicate_rows = []
        self.seen_rows = set()

    # Analyze a CSV file
    def analyze_csv(self, filename):
        """Perform a comprehensive data quality analysis"""
        print("Starting data quality analysis...")

        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            row_count = 0

            for row_num, row in enumerate(reader, start=2):
                row_count += 1

                # Check for duplicates
                row_tuple = tuple(row.values())
                if row_tuple in self.seen_rows:
                    self.duplicate_rows.append(row_num)
                else:
                    self.seen_rows.add(row_tuple)

                # Analyze each column
                for column, value in row.items():
                    self.analyze_column_value(column, value)

                # Progress update
                if row_count % 1000 == 0:
                    print(f"Analyzed {row_count} rows...")

        print(f"Analysis complete! Analyzed {row_count} total rows")
        return self.generate_report()

    # Analyze an individual column value
    def analyze_column_value(self, column, value):
        """Analyze a single column value"""
        stats = self.column_stats[column]
        stats['total_rows'] += 1

        # Check whether the value is non-empty
        if value and value.strip():
            stats['non_empty_rows'] += 1
            stats['unique_values'].add(value)
            stats['max_length'] = max(stats['max_length'], len(value))

            # Detect the data type
            data_type = self.detect_data_type(value)
            stats['data_types'][data_type] += 1

            # Collect numeric values
            if data_type in ('integer', 'float', 'numeric'):
                try:
                    stats['numeric_values'].append(float(value.replace(',', '')))
                except ValueError:
                    pass

            # Detect the date format
            elif data_type == 'date':
                date_format = self.detect_date_format(value)
                if date_format:
                    stats['date_formats'][date_format] += 1

    # Detect the data type
    def detect_data_type(self, value):
        """Detect the data type of a value"""
        # Check numeric types
        if re.match(r'^-?\d+$', value):
            return 'integer'
        elif re.match(r'^-?\d+\.\d+$', value):
            return 'float'
        elif self.detect_data_type_numeric(value):
            return 'numeric'
        # Check date
        elif self.is_date(value):
            return 'date'
        # Check email
        elif re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', value):
            return 'email'
        # Check phone
        elif re.match(r'^[\d\s\-\+\(\)]+$', value) and len(value) >= 10:
            return 'phone'
        # Check URL
        elif value.startswith(('http://', 'https://', 'www.')):
            return 'url'
        # Default to text
        else:
            return 'text'

    def detect_data_type_numeric(self, value):
        """Check whether a value can be converted to a number"""
        try:
            float(value.replace(',', ''))
            return True
        except ValueError:
            return False

    # Detect the date format
    def detect_date_format(self, value):
        """Detect common date formats"""
        date_formats = [
            ('%Y-%m-%d', 'YYYY-MM-DD'),
            ('%d/%m/%Y', 'DD/MM/YYYY'),
            ('%m/%d/%Y', 'MM/DD/YYYY'),
            ('%Y/%m/%d', 'YYYY/MM/DD'),
            ('%d-%m-%Y', 'DD-MM-YYYY'),
            ('%m-%d-%Y', 'MM-DD-YYYY'),
            ('%Y%m%d', 'YYYYMMDD'),
            ('%d %b %Y', 'DD Mon YYYY'),
            ('%d %B %Y', 'DD Month YYYY')
        ]

        for fmt, name in date_formats:
            try:
                datetime.strptime(value, fmt)
                return name
            except ValueError:
                continue
        return None

    # Check whether a value is a date
    def is_date(self, value):
        """Check whether a value is likely a date"""
        return self.detect_date_format(value) is not None

    # Generate the quality report
    def generate_report(self):
        """Generate a comprehensive quality report"""
        report = {
            'summary': {},
            'columns': {},
            'issues': []
        }

        # Overall summary
        total_columns = len(self.column_stats)
        report['summary'] = {
            'total_columns': total_columns,
            'duplicate_rows': len(self.duplicate_rows),
            'quality_score': 0  # Calculated below
        }

        quality_scores = []

        # Analyze each column
        for column, stats in self.column_stats.items():
            completeness = (stats['non_empty_rows'] / stats['total_rows'] * 100
                            if stats['total_rows'] > 0 else 0)
            uniqueness = (len(stats['unique_values']) / stats['non_empty_rows'] * 100
                          if stats['non_empty_rows'] > 0 else 0)

            # Determine the primary data type
            if stats['data_types']:
                primary_type = max(stats['data_types'].items(), key=lambda x: x[1])
                primary_type_name = primary_type[0]
                type_consistency = (primary_type[1] / stats['non_empty_rows'] * 100
                                    if stats['non_empty_rows'] > 0 else 0)
            else:
                primary_type_name = 'empty'
                type_consistency = 0

            # Calculate statistics for numeric columns
            numeric_stats = {}
            if stats['numeric_values'] and len(stats['numeric_values']) > 1:
                numeric_stats = {
                    'min': min(stats['numeric_values']),
                    'max': max(stats['numeric_values']),
                    'mean': statistics.mean(stats['numeric_values']),
                    'median': statistics.median(stats['numeric_values']),
                    'std_dev': statistics.stdev(stats['numeric_values'])
                }

            # Assign a quality emoji
            if completeness >= 95 and type_consistency >= 95:
                quality_emoji = '✅'
                quality_scores.append(100)
            elif completeness >= 80 and type_consistency >= 80:
                quality_emoji = '⚡'
                quality_scores.append(80)
            elif completeness >= 60:
                quality_emoji = '⚠️'
                quality_scores.append(60)
            else:
                quality_emoji = '❌'
                quality_scores.append(40)

            # Column report
            report['columns'][column] = {
                'completeness': round(completeness, 2),
                'uniqueness': round(uniqueness, 2),
                'primary_type': primary_type_name,
                'type_consistency': round(type_consistency, 2),
                'unique_count': len(stats['unique_values']),
                'max_length': stats['max_length'],
                'quality_emoji': quality_emoji,
                'numeric_stats': numeric_stats
            }

            # Identify issues
            if completeness < 80:
                report['issues'].append(f"Column '{column}' has low completeness: {completeness:.1f}%")
            if type_consistency < 80 and stats['non_empty_rows'] > 0:
                report['issues'].append(f"Column '{column}' has mixed data types")

        # Calculate the overall quality score
        if quality_scores:
            report['summary']['quality_score'] = round(statistics.mean(quality_scores), 1)

        # Add a quality emoji to the summary
        score = report['summary']['quality_score']
        if score >= 90:
            report['summary']['quality_emoji'] = '🏆'
        elif score >= 75:
            report['summary']['quality_emoji'] = '✅'
        elif score >= 60:
            report['summary']['quality_emoji'] = '⚡'
        else:
            report['summary']['quality_emoji'] = '⚠️'

        return report

    # Export the report to CSV
    def export_report(self, report, output_file):
        """Export the quality report to CSV"""
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)

            # Write the summary
            writer.writerow(['Data Quality Report'])
            writer.writerow([])
            writer.writerow(['Overall Quality Score', f"{report['summary']['quality_score']}% {report['summary']['quality_emoji']}"])
            writer.writerow(['Total Columns', report['summary']['total_columns']])
            writer.writerow(['Duplicate Rows', report['summary']['duplicate_rows']])
            writer.writerow([])

            # Write the column analysis
            writer.writerow(['Column', 'Completeness %', 'Type', 'Consistency %', 'Unique Values', 'Quality'])
            for column, stats in report['columns'].items():
                writer.writerow([
                    column,
                    f"{stats['completeness']}%",
                    stats['primary_type'],
                    f"{stats['type_consistency']}%",
                    stats['unique_count'],
                    stats['quality_emoji']
                ])

            # Write the issues
            if report['issues']:
                writer.writerow([])
                writer.writerow(['Quality Issues'])
                for issue in report['issues']:
                    writer.writerow([issue])

        print(f"Report exported to {output_file}")

# Test the analyzer!
analyzer = DataQualityAnalyzer()
report = analyzer.analyze_csv('sample_data.csv')
analyzer.export_report(report, 'quality_report.csv')

# Print a summary
print(f"\nOverall Quality Score: {report['summary']['quality_score']}% {report['summary']['quality_emoji']}")
print(f"Columns analyzed: {report['summary']['total_columns']}")
print(f"Duplicate rows found: {report['summary']['duplicate_rows']}")

if report['issues']:
    print("\nTop Issues:")
    for issue in report['issues'][:5]:
        print(f"  {issue}")
Key Takeaways
You've mastered advanced CSV operations! Here's what you can now do:
- Process gigabyte-sized CSV files efficiently without memory issues
- Validate and clean data automatically with robust error handling
- Build production-grade pipelines for data processing
- Analyze data quality and generate comprehensive reports
- Handle any CSV format with custom dialects and encodings
Remember: CSV files might seem simple, but with these advanced techniques, you can handle any data challenge that comes your way!
Next Steps
Congratulations! You've become a CSV processing expert!
Here's what to do next:
- Practice with the data quality analyzer exercise
- Build a CSV processing pipeline for your own data
- Move on to our next tutorial: Working with JSON Files
- Share your CSV processing success stories!
Remember: Every data scientist and engineer started where you are now. Keep practicing, keep learning, and most importantly, have fun with data!
Happy data processing!