Part 341 of 365

🚀 Concurrency Project: Web Scraper

Master building a concurrent web scraper in Python with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand the fundamentals of concurrent web scraping 🎯
  • Apply concurrency in real scraping projects 🏗️
  • Debug common issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting tutorial on building a concurrent web scraper! 🎉 In this guide, we'll explore how to harness the power of Python's concurrency tools to build a blazing-fast web scraper that can handle multiple websites simultaneously.

You'll discover how concurrency can transform your web scraping from a slow, sequential process into a lightning-fast parallel operation. Whether you're collecting data for analysis 📊, monitoring prices 💰, or gathering research information 📚, understanding concurrent web scraping is essential for handling real-world data collection at scale.

By the end of this tutorial, you'll have built a production-ready concurrent web scraper that you can adapt for your own projects! Let's dive in! 🏊‍♂️

📚 Understanding Concurrent Web Scraping

🤔 What is Concurrent Web Scraping?

Concurrent web scraping is like having multiple shopping assistants 🛒. Instead of one person visiting each store sequentially, you send multiple assistants to different stores simultaneously, dramatically reducing your total shopping time!

In Python terms, concurrent web scraping allows you to fetch data from multiple URLs at the same time using threading, asyncio, or multiprocessing. This means you can:

  • ✨ Scrape 100 pages in the time it takes to scrape 10
  • 🚀 Handle I/O-bound operations efficiently
  • 🛡️ Build resilient scrapers with proper error handling

💡 Why Use Concurrent Web Scraping?

Here's why developers love concurrent scrapers:

  1. Speed ⚡: Scrape multiple pages simultaneously
  2. Efficiency 💻: Better resource utilization
  3. Scalability 📈: Handle large-scale data collection
  4. Responsiveness 🔄: Non-blocking operations

Real-world example: Imagine monitoring prices on 1000 products across 10 e-commerce sites 🛍️. With sequential scraping, this might take hours. With concurrency, you can do it in minutes!
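
To make this concrete, here is a minimal sketch (not part of the original tutorial) that fetches a handful of pages in parallel using the standard library's concurrent.futures.ThreadPoolExecutor; the URLs are placeholders:

from concurrent.futures import ThreadPoolExecutor
import requests

# 🌐 Placeholder URLs - swap in real pages you are allowed to scrape
urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3']

def fetch(url):
    # Each call runs in its own worker thread; the thread simply waits on network I/O
    return url, requests.get(url, timeout=10).status_code

with ThreadPoolExecutor(max_workers=5) as executor:
    for url, status in executor.map(fetch, urls):
        print(f"{url} -> {status}")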

🔧 Basic Syntax and Usage

📝 Threading Approach

Let's start with a simple threaded scraper:

import threading
import requests
from bs4 import BeautifulSoup
import time
from queue import Queue

# ๐Ÿ‘‹ Hello, concurrent scraping!
class ThreadedScraper:
    def __init__(self, num_threads=5):
        self.num_threads = num_threads
        self.url_queue = Queue()
        self.results = []
        self.lock = threading.Lock()
    
    def fetch_url(self):
        """๐Ÿ”„ Worker thread function"""
        while True:
            url = self.url_queue.get()
            if url is None:  # ๐Ÿ›‘ Poison pill
                break
            
            try:
                # ๐ŸŒ Fetch the page
                response = requests.get(url, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')
                
                # ๐ŸŽฏ Extract data (example: page title)
                title = soup.find('title').text if soup.find('title') else 'No title'
                
                # ๐Ÿ”’ Thread-safe result storage
                with self.lock:
                    self.results.append({
                        'url': url,
                        'title': title,
                        'status': response.status_code
                    })
                print(f"โœ… Scraped: {url}")
                
            except Exception as e:
                print(f"โŒ Error scraping {url}: {e}")
            
            finally:
                self.url_queue.task_done()

💡 Explanation: We use a Queue for thread-safe URL distribution and a Lock to protect shared data. Each thread pulls URLs from the queue and processes them independently!
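
The class above only defines the worker loop; the original snippet never shows how the threads are started. Below is a minimal driver sketch (the run_threaded_scraper helper and the sample URLs are illustrative, not from the original): start the workers, enqueue the URLs, wait for the queue to drain, then send one poison pill per worker.

def run_threaded_scraper(scraper, urls):
    """Start workers, feed the queue, and return the collected results"""
    threads = [threading.Thread(target=scraper.fetch_url) for _ in range(scraper.num_threads)]
    for t in threads:
        t.start()
    for url in urls:
        scraper.url_queue.put(url)
    scraper.url_queue.join()          # Wait until every URL has been processed
    for _ in threads:
        scraper.url_queue.put(None)   # One poison pill per worker thread
    for t in threads:
        t.join()
    return scraper.results

# Hypothetical usage
scraper = ThreadedScraper(num_threads=5)
print(run_threaded_scraper(scraper, ['https://example.com', 'https://example.org']))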

🎯 AsyncIO Approach

Here's the modern async approach:

import asyncio
import aiohttp
from bs4 import BeautifulSoup

# ๐Ÿš€ Async scraper for maximum speed!
class AsyncScraper:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
    
    async def fetch_url(self, session, url):
        """โšก Async fetch with rate limiting"""
        async with self.semaphore:  # ๐Ÿšฆ Control concurrency
            try:
                async with session.get(url) as response:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    
                    # ๐ŸŽจ Extract data
                    title = soup.find('title').text if soup.find('title') else 'No title'
                    
                    result = {
                        'url': url,
                        'title': title,
                        'status': response.status
                    }
                    self.results.append(result)
                    print(f"โœจ Async scraped: {url}")
                    return result
                    
            except Exception as e:
                print(f"๐Ÿ’ฅ Error: {url} - {e}")
                return {'url': url, 'error': str(e)}
    
    async def scrape_all(self, urls):
        """๐ŸŽฏ Scrape all URLs concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)
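
To actually run the async scraper, wrap the coroutine in asyncio.run(). A short usage sketch (the URLs are placeholders):

async def demo():
    scraper = AsyncScraper(max_concurrent=10)
    urls = ['https://example.com', 'https://example.org', 'https://example.net']  # placeholder URLs
    results = await scraper.scrape_all(urls)
    print(f"Scraped {len(results)} pages")

asyncio.run(demo())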

💡 Practical Examples

🛒 Example 1: E-commerce Price Monitor

Let's build a real price monitoring system:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from datetime import datetime
import json

# ๐Ÿ›๏ธ Price monitoring scraper
class PriceMonitor:
    def __init__(self):
        self.products = []
        self.price_history = {}
    
    async def scrape_product(self, session, product_info):
        """๐Ÿ’ฐ Scrape product price"""
        url = product_info['url']
        selector = product_info['price_selector']
        
        try:
            async with session.get(url) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                
                # ๐ŸŽฏ Extract price
                price_element = soup.select_one(selector)
                if price_element:
                    price_text = price_element.text.strip()
                    # ๐Ÿ’ต Clean price (remove $, commas, etc.)
                    price = float(''.join(c for c in price_text if c.isdigit() or c == '.'))
                    
                    # ๐Ÿ“Š Track price history
                    product_id = product_info['id']
                    if product_id not in self.price_history:
                        self.price_history[product_id] = []
                    
                    self.price_history[product_id].append({
                        'price': price,
                        'timestamp': datetime.now().isoformat(),
                        'name': product_info['name']
                    })
                    
                    # ๐Ÿ”” Check for price drops
                    if len(self.price_history[product_id]) > 1:
                        prev_price = self.price_history[product_id][-2]['price']
                        if price < prev_price:
                            print(f"๐ŸŽ‰ PRICE DROP! {product_info['name']}: ${prev_price} โ†’ ${price}")
                    
                    return {'product': product_info['name'], 'price': price, 'status': 'success'}

                # Price element missing - return an error dict instead of None so the report loop doesn't break
                return {'product': product_info['name'], 'error': 'price element not found'}

        except Exception as e:
            print(f"โŒ Error monitoring {product_info['name']}: {e}")
            return {'product': product_info['name'], 'error': str(e)}
    
    async def monitor_all_prices(self, products):
        """๐Ÿ“Š Monitor all product prices"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_product(session, product) for product in products]
            results = await asyncio.gather(*tasks)
            
            # ๐Ÿ“ˆ Generate report
            print("\n๐Ÿ“Š Price Monitoring Report:")
            print("=" * 50)
            for result in results:
                if 'price' in result:
                    print(f"โœ… {result['product']}: ${result['price']}")
                else:
                    print(f"โŒ {result['product']}: Failed")
            
            return results

# ๐ŸŽฎ Let's use it!
async def main():
    monitor = PriceMonitor()
    
    # ๐Ÿ›๏ธ Products to monitor
    products = [
        {
            'id': 'laptop-1',
            'name': 'Gaming Laptop',
            'url': 'https://example.com/laptop',
            'price_selector': '.price-now'
        },
        {
            'id': 'phone-1',
            'name': 'Smartphone',
            'url': 'https://example.com/phone',
            'price_selector': '.product-price'
        }
    ]
    
    # ๐Ÿš€ Start monitoring
    await monitor.monitor_all_prices(products)
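
The example defines main() but never invokes it; following the usual asyncio pattern, you would start it like this (with real product URLs and CSS selectors substituted for the placeholders above):

if __name__ == "__main__":
    asyncio.run(main())  # 🚀 Start the event loop and run one monitoring pass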

🎮 Example 2: News Aggregator

Let's build a concurrent news scraper:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from datetime import datetime
import hashlib

# ๐Ÿ“ฐ News aggregator with duplicate detection
class NewsAggregator:
    def __init__(self):
        self.articles = []
        self.seen_hashes = set()  # ๐Ÿ” Duplicate detection
    
    def generate_hash(self, title, url):
        """๐Ÿ” Generate unique hash for article"""
        content = f"{title}{url}".encode('utf-8')
        return hashlib.md5(content).hexdigest()
    
    async def scrape_news_site(self, session, site_config):
        """๐Ÿ“ฐ Scrape articles from a news site"""
        site_name = site_config['name']
        url = site_config['url']
        
        try:
            async with session.get(url) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                
                # ๐ŸŽฏ Find articles
                articles = soup.select(site_config['article_selector'])
                scraped_count = 0
                
                for article in articles[:10]:  # ๐Ÿ“Š Limit to 10 per site
                    # ๐Ÿ“ Extract article data
                    title_elem = article.select_one(site_config['title_selector'])
                    link_elem = article.select_one(site_config['link_selector'])
                    
                    if title_elem and link_elem:
                        title = title_elem.text.strip()
                        link = link_elem.get('href', '')
                        
                        # ๐Ÿ”— Make absolute URL
                        if link.startswith('/'):
                            link = f"{site_config['base_url']}{link}"
                        
                        # ๐Ÿ” Check for duplicates
                        article_hash = self.generate_hash(title, link)
                        if article_hash not in self.seen_hashes:
                            self.seen_hashes.add(article_hash)
                            
                            article_data = {
                                'title': title,
                                'url': link,
                                'source': site_name,
                                'scraped_at': datetime.now().isoformat(),
                                'emoji': site_config['emoji']
                            }
                            
                            self.articles.append(article_data)
                            scraped_count += 1
                
                print(f"{site_config['emoji']} Scraped {scraped_count} articles from {site_name}")
                return {'site': site_name, 'count': scraped_count}
                
        except Exception as e:
            print(f"โŒ Error scraping {site_name}: {e}")
            return {'site': site_name, 'error': str(e)}
    
    async def aggregate_news(self, news_sites):
        """๐ŸŒ Aggregate news from multiple sites"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_news_site(session, site) for site in news_sites]
            results = await asyncio.gather(*tasks)
            
            # ๐Ÿ“Š Sort by time and display
            self.articles.sort(key=lambda x: x['scraped_at'], reverse=True)
            
            print(f"\n๐Ÿ“ฐ Aggregated {len(self.articles)} unique articles!")
            print("=" * 60)
            
            for article in self.articles[:20]:  # ๐Ÿ“‹ Show top 20
                print(f"{article['emoji']} [{article['source']}] {article['title'][:60]}...")
            
            return self.articles

# ๐ŸŽฎ Example configuration
news_sites = [
    {
        'name': 'TechNews',
        'url': 'https://example-tech.com',
        'base_url': 'https://example-tech.com',
        'article_selector': '.article-item',
        'title_selector': 'h2',
        'link_selector': 'a',
        'emoji': '๐Ÿ’ป'
    },
    {
        'name': 'ScienceDaily',
        'url': 'https://example-science.com',
        'base_url': 'https://example-science.com',
        'article_selector': '.news-item',
        'title_selector': '.headline',
        'link_selector': 'a.read-more',
        'emoji': '๐Ÿ”ฌ'
    }
]
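
With the configuration above (placeholder sites and selectors), running the aggregator follows the same pattern as the earlier examples; a minimal sketch:

async def run_aggregator():
    aggregator = NewsAggregator()
    await aggregator.aggregate_news(news_sites)

if __name__ == "__main__":
    asyncio.run(run_aggregator())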

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Intelligent Rate Limiting

When you're ready to level up, implement smart rate limiting:

import asyncio
import time

# ๐ŸŽฏ Advanced rate limiter with burst support
class RateLimiter:
    def __init__(self, rate=10, per=1.0, burst=20):
        self.rate = rate  # ๐Ÿ“Š Requests per period
        self.per = per    # โฑ๏ธ Time period in seconds
        self.burst = burst  # ๐Ÿš€ Burst capacity
        self.tokens = burst
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        """๐ŸŽซ Acquire permission to make request"""
        async with self.lock:
            while self.tokens <= 0:
                # ๐Ÿ”„ Refill tokens
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens += elapsed * (self.rate / self.per)
                self.tokens = min(self.tokens, self.burst)
                self.updated_at = now
                
                if self.tokens <= 0:
                    # ๐Ÿ˜ด Wait for tokens
                    sleep_time = (1 - self.tokens) * (self.per / self.rate)
                    await asyncio.sleep(sleep_time)
            
            self.tokens -= 1

# ๐Ÿช„ Using the rate limiter
class SmartScraper:
    def __init__(self):
        # ๐ŸŽฏ Different rate limits per domain
        self.rate_limiters = {
            'fast-site.com': RateLimiter(rate=50, per=1.0),
            'slow-site.com': RateLimiter(rate=5, per=1.0),
            'default': RateLimiter(rate=10, per=1.0)
        }
    
    def get_rate_limiter(self, url):
        """๐Ÿ” Get appropriate rate limiter for URL"""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        return self.rate_limiters.get(domain, self.rate_limiters['default'])
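
Neither class above shows the limiter being applied to an actual request; here is a hedged sketch of how the pieces could fit together (the rate_limited_fetch helper is illustrative, not part of the original):

import aiohttp

async def rate_limited_fetch(scraper, session, url):
    """Wait for a token for this URL's domain, then fetch"""
    limiter = scraper.get_rate_limiter(url)
    await limiter.acquire()          # Blocks until the token bucket allows another request
    async with session.get(url) as response:
        return await response.text()

async def demo_rate_limited(urls):
    scraper = SmartScraper()
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(rate_limited_fetch(scraper, session, url) for url in urls))
        print(f"Fetched {len(pages)} pages within the per-domain limits")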

🏗️ Advanced Topic 2: Distributed Scraping

For the brave developers, here's distributed scraping:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from functools import partial

# ๐Ÿš€ Distributed scraper with process pool
class DistributedScraper:
    def __init__(self, num_processes=None):
        self.num_processes = num_processes or mp.cpu_count()
        self.results_queue = mp.Queue()
    
    @staticmethod
    def scrape_batch(urls, batch_id):
        """๐Ÿ”ง Scrape a batch of URLs in a separate process"""
        import requests
        from bs4 import BeautifulSoup
        
        results = []
        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')
                
                # ๐ŸŽจ Extract data
                data = {
                    'url': url,
                    'title': soup.find('title').text if soup.find('title') else '',
                    'batch_id': batch_id,
                    'process_id': mp.current_process().pid
                }
                results.append(data)
                
            except Exception as e:
                results.append({'url': url, 'error': str(e), 'batch_id': batch_id})
        
        return results
    
    async def scrape_distributed(self, urls):
        """๐ŸŒ Distribute scraping across processes"""
        # ๐Ÿ“Š Split URLs into batches
        batch_size = len(urls) // self.num_processes + 1
        batches = [urls[i:i+batch_size] for i in range(0, len(urls), batch_size)]
        
        # ๐Ÿš€ Create process pool
        loop = asyncio.get_running_loop()  # Preferred over get_event_loop() inside a coroutine
        with ProcessPoolExecutor(max_workers=self.num_processes) as executor:
            # ๐ŸŽฏ Submit batches to processes
            futures = []
            for i, batch in enumerate(batches):
                future = loop.run_in_executor(
                    executor,
                    partial(self.scrape_batch, batch, i)
                )
                futures.append(future)
            
            # ๐Ÿ”„ Gather results
            results = await asyncio.gather(*futures)
            
            # ๐Ÿ“ˆ Flatten results
            all_results = []
            for batch_results in results:
                all_results.extend(batch_results)
            
            print(f"๐ŸŽ‰ Scraped {len(all_results)} URLs using {self.num_processes} processes!")
            return all_results
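
Like the async examples, the distributed scraper is driven from asyncio.run(); a minimal sketch with placeholder URLs (the __main__ guard matters here because worker processes re-import the module):

if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(100)]  # placeholder URLs
    scraper = DistributedScraper()
    results = asyncio.run(scraper.scrape_distributed(urls))
    print(f"Collected {len(results)} results")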

⚠️ Common Pitfalls and Solutions

😱 Pitfall 1: No Error Handling

# โŒ Wrong way - crashes on first error!
async def bad_scraper(urls):
    async with aiohttp.ClientSession() as session:
        results = []
        for url in urls:
            response = await session.get(url)  # ๐Ÿ’ฅ Crashes if site is down!
            results.append(await response.text())
        return results

# โœ… Correct way - graceful error handling!
async def good_scraper(urls):
    async with aiohttp.ClientSession() as session:
        results = []
        for url in urls:
            try:
                response = await session.get(url, timeout=10)
                if response.status == 200:
                    results.append({
                        'url': url,
                        'content': await response.text(),
                        'status': 'success'
                    })
                else:
                    results.append({
                        'url': url,
                        'status': f'HTTP {response.status}'
                    })
            except asyncio.TimeoutError:
                results.append({'url': url, 'error': 'Timeout โฑ๏ธ'})
            except Exception as e:
                results.append({'url': url, 'error': str(e)})
        
        return results
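
The corrected version handles errors but fetches URLs one at a time; to keep both concurrency and graceful failures, you can let asyncio.gather collect exceptions instead of raising them. A brief sketch (assumes the same imports as the examples above):

async def concurrent_but_safe(urls):
    async def fetch(session, url):
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return {'url': url, 'status': response.status, 'content': await response.text()}

    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(
            *(fetch(session, url) for url in urls),
            return_exceptions=True,  # Exceptions come back as results instead of aborting the batch
        )
    # Separate successes from failures after the fact
    return [o if isinstance(o, dict) else {'error': str(o)} for o in outcomes]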

🤯 Pitfall 2: Overwhelming Target Servers

# ❌ Dangerous - DDoS-like behavior!
async def aggressive_scraper(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:  # 💥 1000 simultaneous requests!
            tasks.append(session.get(url))
        return await asyncio.gather(*tasks)

# ✅ Respectful - controlled concurrency!
async def polite_scraper(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_limit(session, url):
        async with semaphore:  # 🚦 Limit concurrent requests
            await asyncio.sleep(0.1)  # 😴 Small delay
            async with session.get(url) as response:
                return await response.text()  # Read the body before the session closes
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url) for url in urls]
        return await asyncio.gather(*tasks)

🛠️ Best Practices

  1. 🎯 Respect robots.txt: Always check and follow site rules
  2. ⏱️ Implement delays: Add reasonable delays between requests
  3. 🛡️ Handle errors gracefully: Expect and handle failures
  4. 📊 Monitor performance: Track success rates and timing
  5. 🔄 Use retry logic: Implement exponential backoff (see the sketch after this list)
  6. 💾 Cache when possible: Avoid re-scraping unchanged data
  7. 🔒 Respect rate limits: Don't overwhelm servers
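
For point 5, a minimal retry helper with exponential backoff might look like this (a sketch, not from the original; the attempt count and delays are arbitrary choices):

import asyncio
import aiohttp

async def fetch_with_retries(session, url, attempts=4, base_delay=0.5):
    """Retry transient failures, doubling the wait between attempts"""
    for attempt in range(attempts):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()      # Treat 4xx/5xx responses as errors
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == attempts - 1:
                raise                            # Out of retries - let the caller decide
            delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s, ...
            print(f"Retry {attempt + 1} for {url} after {delay:.1f}s ({e})")
            await asyncio.sleep(delay)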

🧪 Hands-On Exercise

🎯 Challenge: Build a Job Listing Aggregator

Create a concurrent scraper that aggregates job listings:

📋 Requirements:

  • ✅ Scrape from at least 3 different job sites
  • 🏷️ Extract job title, company, location, and salary
  • 🔍 Remove duplicate listings
  • 📊 Generate statistics (jobs per location, salary ranges)
  • 💾 Save results to JSON with timestamps
  • 🎨 Each job site needs its own emoji!

🚀 Bonus Points:

  • Add keyword filtering
  • Implement resume matching score
  • Create email alerts for new matches
  • Build a simple web interface

💡 Solution

🔍 Click to see solution
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from datetime import datetime
import json
import re

# ๐ŸŽฏ Job aggregator with all features!
class JobAggregator:
    def __init__(self):
        self.jobs = []
        self.seen_jobs = set()  # ๐Ÿ” Duplicate detection
        self.stats = {
            'by_location': {},
            'by_company': {},
            'salary_ranges': []
        }
    
    def extract_salary(self, salary_text):
        """๐Ÿ’ฐ Extract salary numbers from text"""
        if not salary_text:
            return None
        
        # ๐Ÿ” Find salary patterns ($XX,XXX - $XX,XXX)
        numbers = re.findall(r'\$?(\d{1,3},?\d{3,})', salary_text)
        if numbers:
            cleaned = [int(n.replace(',', '')) for n in numbers]
            return {
                'min': min(cleaned),
                'max': max(cleaned),
                'text': salary_text
            }
        return None
    
    def job_hash(self, job):
        """๐Ÿ” Generate unique hash for job"""
        key = f"{job['title']}{job['company']}{job['location']}"
        return hash(key)
    
    async def scrape_job_site(self, session, site_config):
        """๐Ÿ’ผ Scrape jobs from a site"""
        site_name = site_config['name']
        
        try:
            async with session.get(site_config['url']) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                
                jobs_found = 0
                job_elements = soup.select(site_config['job_selector'])
                
                for job_elem in job_elements[:20]:  # ๐Ÿ“Š Limit per site
                    try:
                        # ๐Ÿ“ Extract job details
                        title = job_elem.select_one(site_config['title_selector'])
                        company = job_elem.select_one(site_config['company_selector'])
                        location = job_elem.select_one(site_config['location_selector'])
                        salary = job_elem.select_one(site_config['salary_selector'])
                        
                        if title and company:
                            job_data = {
                                'title': title.text.strip(),
                                'company': company.text.strip(),
                                'location': location.text.strip() if location else 'Remote',
                                'salary': self.extract_salary(salary.text if salary else ''),
                                'source': site_name,
                                'emoji': site_config['emoji'],
                                'scraped_at': datetime.now().isoformat()
                            }
                            
                            # ๐Ÿ” Check for duplicates
                            job_id = self.job_hash(job_data)
                            if job_id not in self.seen_jobs:
                                self.seen_jobs.add(job_id)
                                self.jobs.append(job_data)
                                jobs_found += 1
                                
                                # ๐Ÿ“Š Update statistics
                                loc = job_data['location']
                                self.stats['by_location'][loc] = self.stats['by_location'].get(loc, 0) + 1
                                
                                comp = job_data['company']
                                self.stats['by_company'][comp] = self.stats['by_company'].get(comp, 0) + 1
                                
                                if job_data['salary']:
                                    self.stats['salary_ranges'].append(job_data['salary'])
                    
                    except Exception as e:
                        print(f"โš ๏ธ Error parsing job: {e}")
                        continue
                
                print(f"{site_config['emoji']} Found {jobs_found} jobs on {site_name}")
                return {'site': site_name, 'count': jobs_found}
                
        except Exception as e:
            print(f"โŒ Error scraping {site_name}: {e}")
            return {'site': site_name, 'error': str(e)}
    
    async def aggregate_jobs(self, job_sites):
        """๐ŸŒ Aggregate jobs from all sites"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_job_site(session, site) for site in job_sites]
            results = await asyncio.gather(*tasks)
            
            # ๐Ÿ“Š Generate report
            print(f"\n๐Ÿ’ผ Job Aggregation Complete!")
            print(f"๐Ÿ“Š Total unique jobs: {len(self.jobs)}")
            print(f"\n๐Ÿ“ Top locations:")
            for loc, count in sorted(self.stats['by_location'].items(), 
                                   key=lambda x: x[1], reverse=True)[:5]:
                print(f"  {loc}: {count} jobs")
            
            # ๐Ÿ’ฐ Salary analysis
            if self.stats['salary_ranges']:
                avg_min = sum(s['min'] for s in self.stats['salary_ranges']) / len(self.stats['salary_ranges'])
                avg_max = sum(s['max'] for s in self.stats['salary_ranges']) / len(self.stats['salary_ranges'])
                print(f"\n๐Ÿ’ฐ Average salary range: ${avg_min:,.0f} - ${avg_max:,.0f}")
            
            # ๐Ÿ’พ Save to file
            self.save_results()
            
            return self.jobs
    
    def save_results(self):
        """๐Ÿ’พ Save results to JSON"""
        filename = f"jobs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump({
                'jobs': self.jobs,
                'stats': self.stats,
                'scraped_at': datetime.now().isoformat()
            }, f, indent=2)
        print(f"\n๐Ÿ’พ Results saved to {filename}")
    
    def filter_jobs(self, keywords):
        """๐Ÿ” Filter jobs by keywords"""
        filtered = []
        for job in self.jobs:
            text = f"{job['title']} {job['company']}".lower()
            if any(keyword.lower() in text for keyword in keywords):
                filtered.append(job)
        return filtered

# ๐ŸŽฎ Test it out!
async def main():
    aggregator = JobAggregator()
    
    # ๐Ÿ’ผ Job sites configuration
    job_sites = [
        {
            'name': 'TechJobs',
            'url': 'https://example-tech-jobs.com',
            'emoji': '๐Ÿ’ป',
            'job_selector': '.job-card',
            'title_selector': '.job-title',
            'company_selector': '.company-name',
            'location_selector': '.location',
            'salary_selector': '.salary'
        },
        {
            'name': 'StartupJobs',
            'url': 'https://example-startup-jobs.com',
            'emoji': '๐Ÿš€',
            'job_selector': '.listing-item',
            'title_selector': 'h3.title',
            'company_selector': '.startup-name',
            'location_selector': '.loc',
            'salary_selector': '.compensation'
        },
        {
            'name': 'RemoteWork',
            'url': 'https://example-remote-jobs.com',
            'emoji': '๐Ÿ ',
            'job_selector': '.remote-job',
            'title_selector': '.position',
            'company_selector': '.employer',
            'location_selector': '.timezone',
            'salary_selector': '.pay-range'
        }
    ]
    
    # ๐Ÿš€ Start aggregation
    await aggregator.aggregate_jobs(job_sites)
    
    # ๐Ÿ” Filter for Python jobs
    python_jobs = aggregator.filter_jobs(['Python', 'Django', 'FastAPI'])
    print(f"\n๐Ÿ Found {len(python_jobs)} Python-related jobs!")

# Run it!
if __name__ == "__main__":
    asyncio.run(main())

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Build concurrent scrapers using threading and asyncio 💪
  • ✅ Handle multiple websites simultaneously 🌐
  • ✅ Implement rate limiting to be respectful 🛡️
  • ✅ Process large-scale data efficiently 🚀
  • ✅ Handle errors gracefully in concurrent environments 🐛

Remember: With great scraping power comes great responsibility! Always respect website terms of service and rate limits. 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered concurrent web scraping!

Here's what to do next:

  1. 💻 Build your own scraper using the patterns you learned
  2. 🏗️ Add features like proxy rotation and user-agent switching
  3. 📚 Learn about Scrapy for more advanced scraping needs
  4. 🌟 Share your scraping projects with the community!

Remember: Every expert scraper started with their first concurrent request. Keep experimenting, keep learning, and most importantly, scrape responsibly! 🚀


Happy scraping! 🎉🕷️✨