🏭 Industrial IoT Platforms on Alpine Linux: Smart Manufacturing Solutions

Published Jun 18, 2025 · 20 min read

Comprehensive tutorial for engineers to implement Industrial IoT platforms on Alpine Linux. Perfect for manufacturing engineers, IoT developers, and industrial automation teams building connected factory systems!

Let’s master Industrial IoT platform development on Alpine Linux! 🚀 This comprehensive tutorial shows you how to build, deploy, and manage Industrial IoT systems for smart manufacturing, sensor networks, and automated production environments. Perfect for manufacturing engineers, IoT developers, and industrial automation teams! 😊

🤔 What are Industrial IoT Platforms?

Industrial IoT (IIoT) platforms are comprehensive software systems that connect, monitor, and manage industrial equipment, sensors, and processes through internet-connected devices, enabling real-time data collection, analysis, and automated decision-making in manufacturing environments!

Industrial IoT platforms are like:

  • 🧠 Central nervous system connecting every machine and sensor in a smart factory
  • 🌐 Digital ecosystem that transforms traditional manufacturing into intelligent operations
  • 🔧 Command center orchestrating seamless communication between all industrial components

🎯 What You Need

Before we start, you need:

  • ✅ Alpine Linux system with adequate resources (4GB+ RAM recommended for industrial workloads)
  • ✅ Understanding of industrial protocols (MQTT, OPC-UA, Modbus) and networking concepts
  • ✅ Knowledge of sensor technologies, PLCs, and industrial automation systems
  • ✅ Familiarity with database management and time-series data handling

📋 Step 1: Install Industrial IoT Foundation

Install Core IoT Development Stack

Let’s set up the essential Industrial IoT infrastructure! 😊

What we’re doing: Installing MQTT brokers, time-series databases, and industrial protocol libraries for comprehensive IoT platform development.

# Update package list
apk update

# Install core development tools
apk add build-base cmake make gcc g++
apk add python3 python3-dev py3-pip
apk add nodejs npm
apk add git curl wget

# Install MQTT broker (Eclipse Mosquitto)
apk add mosquitto mosquitto-clients mosquitto-dev

# Install database systems
apk add postgresql postgresql-dev postgresql-client
apk add redis redis-dev
apk add sqlite sqlite-dev

# Install time-series database (InfluxDB)
# Note: InfluxDB isn't packaged in the Alpine repos, so install from the
# release binaries. Check https://portal.influxdata.com/downloads/ for the
# current archive names (since v2.1 the influx CLI ships separately from
# the influxd server, so archive layout may vary by release)
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.4-linux-amd64.tar.gz
tar xvfz influxdb2-2.7.4-linux-amd64.tar.gz
mv influxdb2-2.7.4-linux-amd64/influxd /usr/local/bin/
mv influxdb2-2.7.4-linux-amd64/influx /usr/local/bin/

# Install networking and communication libraries
apk add libmodbus-dev
apk add openssl-dev
apk add zeromq zeromq-dev

# Install data processing libraries
apk add librdkafka-dev
apk add protobuf-dev

# Install monitoring and visualization tools
apk add grafana
apk add prometheus prometheus-node-exporter

# Create IoT development workspace
mkdir -p ~/IndustrialIoT/{platforms,protocols,sensors,data,configs,scripts}
mkdir -p ~/IndustrialIoT/data/{raw,processed,analytics,reports}

# Verify installations
mosquitto_sub --help | head -1
influx version
grafana-server --version

echo "Industrial IoT foundation installed! 🏭"

What this does: 📖 Installs complete Industrial IoT development stack with messaging, databases, and monitoring tools.

Example output:

mosquitto_sub version 2.0.15 running on libmosquitto 2.0.15.
influx 2.7.4 (git: 5ead1e4) build_date: 2023-12-15T16:46:02Z
Version 10.2.3 (commit: d66ad958da0, branch: HEAD)
Industrial IoT foundation installed! 🏭

What this means: Industrial IoT development environment is ready! ✅
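
Before wiring up real devices, it helps to confirm the broker actually relays messages. Here's a minimal round-trip sketch using paho-mqtt; it assumes Mosquitto is running locally (e.g. rc-service mosquitto start) and accepts anonymous connections, and the topic name is just for testing:

#!/usr/bin/env python3
"""Minimal MQTT round-trip test against the local Mosquitto broker."""

import json
import time

import paho.mqtt.client as mqtt

received = []

def on_message(client, userdata, msg):
    received.append(json.loads(msg.payload.decode()))

client = mqtt.Client("smoke-test")  # paho-mqtt 1.x client API
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.subscribe("factory/test")
client.loop_start()
time.sleep(0.5)  # Let the subscription settle before publishing

client.publish("factory/test", json.dumps({"value": 42}))
time.sleep(1)  # Give the broker a moment to echo the message back

client.loop_stop()
client.disconnect()
print("Round-trip OK ✅" if received else "No message received - check the broker")

If the script prints "Round-trip OK", the messaging backbone for the rest of this tutorial is working.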

Configure Industrial Communication Protocols

Let’s set up industrial protocol support! 🔗

What we’re doing: Installing and configuring support for Modbus, OPC-UA, and other industrial communication protocols.

# Install Python industrial protocol libraries
# (pymodbus 2.x and paho-mqtt 1.x match the client APIs used below;
#  'opcua' provides the python-opcua client imported by the collector)
pip3 install "pymodbus<3" opcua "paho-mqtt<2" influxdb-client
pip3 install modbus-tk pyserial redis psycopg2-binary

# Analytics dependencies (prebuilt Alpine packages avoid long source builds)
apk add py3-numpy py3-pandas py3-scipy py3-scikit-learn

# Install Node.js industrial libraries
cd ~/IndustrialIoT/platforms
npm init -y
npm install node-opcua mqtt modbus-serial
npm install influx pg redis ws
npm install express socket.io cors

# Create Modbus configuration
cat > ~/IndustrialIoT/configs/modbus_config.json << 'EOF'
{
  "modbus_devices": [
    {
      "name": "Temperature_Sensor_1",
      "address": "192.168.1.100",
      "port": 502,
      "unit_id": 1,
      "registers": {
        "temperature": {
          "address": 30001,
          "type": "input_register",
          "data_type": "float",
          "scale": 0.1,
          "unit": "°C"
        },
        "humidity": {
          "address": 30002,
          "type": "input_register",
          "data_type": "float",
          "scale": 0.1,
          "unit": "%"
        }
      },
      "poll_interval": 5000
    },
    {
      "name": "Motor_Controller_1",
      "address": "192.168.1.101",
      "port": 502,
      "unit_id": 2,
      "registers": {
        "motor_speed": {
          "address": 40001,
          "type": "holding_register",
          "data_type": "uint16",
          "scale": 1,
          "unit": "RPM"
        },
        "motor_status": {
          "address": 40002,
          "type": "holding_register",
          "data_type": "bool",
          "scale": 1,
          "unit": "status"
        }
      },
      "poll_interval": 2000
    }
  ]
}
EOF

# Create OPC-UA configuration
cat > ~/IndustrialIoT/configs/opcua_config.json << 'EOF'
{
  "opcua_servers": [
    {
      "name": "Production_Line_1",
      "endpoint": "opc.tcp://192.168.1.200:4840",
      "security_policy": "None",
      "security_mode": "None",
      "nodes": [
        {
          "node_id": "ns=2;i=1001",
          "name": "ProductionRate",
          "data_type": "Double",
          "unit": "units/hour"
        },
        {
          "node_id": "ns=2;i=1002",
          "name": "QualityScore",
          "data_type": "Double",
          "unit": "percentage"
        },
        {
          "node_id": "ns=2;i=1003",
          "name": "MachineState",
          "data_type": "Int32",
          "unit": "state_code"
        }
      ],
      "subscription_interval": 1000
    }
  ]
}
EOF

# Create MQTT configuration
cat > ~/IndustrialIoT/configs/mqtt_config.json << 'EOF'
{
  "mqtt_broker": {
    "host": "localhost",
    "port": 1883,
    "keepalive": 60,
    "client_id": "industrial_iot_platform",
    "username": "iot_user",
    "password": "iot_password"
  },
  "topics": {
    "sensor_data": "factory/sensors/+/data",
    "machine_status": "factory/machines/+/status",
    "production_metrics": "factory/production/metrics",
    "alarms": "factory/alarms/+",
    "commands": "factory/commands/+"
  },
  "quality_of_service": 1,
  "retain_messages": true
}
EOF

echo "Industrial communication protocols configured! 🔗"

What this does: 📖 Sets up comprehensive support for industrial communication protocols with proper configuration.
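
Before the collector starts, it's worth validating that the configuration files parse and contain the expected top-level keys. A small sketch (file names match the configs created above):

#!/usr/bin/env python3
"""Sanity-check the protocol configuration files created above."""

import json
import sys
from pathlib import Path

CONFIG_DIR = Path.home() / "IndustrialIoT" / "configs"
REQUIRED = {
    "modbus_config.json": "modbus_devices",
    "opcua_config.json": "opcua_servers",
    "mqtt_config.json": "mqtt_broker",
}

ok = True
for filename, top_key in REQUIRED.items():
    path = CONFIG_DIR / filename
    try:
        config = json.loads(path.read_text())
    except (OSError, json.JSONDecodeError) as exc:
        print(f"✗ {filename}: {exc}")
        ok = False
        continue
    if top_key in config:
        print(f"✓ {filename}: '{top_key}' present")
    else:
        print(f"✗ {filename}: missing top-level key '{top_key}'")
        ok = False

sys.exit(0 if ok else 1)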

📋 Step 2: Create Data Collection and Processing Engine

Build Multi-Protocol Data Collector

Let’s create a robust data collection system! 📊

What we’re doing: Developing a multi-protocol data collector that can simultaneously gather data from Modbus, OPC-UA, and MQTT sources.

# Create Python data collector
cat > ~/IndustrialIoT/platforms/data_collector.py << 'EOF'
#!/usr/bin/env python3
"""
Industrial IoT Data Collector
Multi-protocol data collection engine for industrial systems
"""

import json
import time
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from concurrent.futures import ThreadPoolExecutor
import threading

# Protocol libraries (pymodbus 2.x / paho-mqtt 1.x APIs, as pinned during install)
from pymodbus.client.sync import ModbusTcpClient
from opcua import Client as OPCUAClient
import paho.mqtt.client as mqtt

# Database libraries
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import redis
import psycopg2
from psycopg2.extras import RealDictCursor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/industrial_iot.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class IndustrialDataCollector:
    def __init__(self, config_path: str = "configs"):
        self.config_path = config_path
        self.load_configurations()
        self.init_databases()
        self.running = False
        self.data_buffer = []
        self.buffer_lock = threading.Lock()
        
    def load_configurations(self):
        """Load all configuration files"""
        try:
            with open(f"{self.config_path}/modbus_config.json", 'r') as f:
                self.modbus_config = json.load(f)
                
            with open(f"{self.config_path}/opcua_config.json", 'r') as f:
                self.opcua_config = json.load(f)
                
            with open(f"{self.config_path}/mqtt_config.json", 'r') as f:
                self.mqtt_config = json.load(f)
                
            logger.info("Configuration files loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load configurations: {e}")
            raise
    
    def init_databases(self):
        """Initialize database connections"""
        try:
            # InfluxDB for time-series data
            self.influx_client = InfluxDBClient(
                url="http://localhost:8086",
                token="your-influxdb-token",
                org="industrial-iot"
            )
            self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)
            
            # Redis for caching and real-time data
            self.redis_client = redis.Redis(
                host='localhost',
                port=6379,
                decode_responses=True
            )
            
            # PostgreSQL for configuration and metadata
            self.pg_conn = psycopg2.connect(
                host="localhost",
                database="industrial_iot",
                user="iot_user",
                password="iot_password"
            )
            
            logger.info("Database connections initialized")
        except Exception as e:
            logger.error(f"Failed to initialize databases: {e}")
            raise
    
    def collect_modbus_data(self, device_config: Dict) -> List[Dict]:
        """Collect data from Modbus devices"""
        collected_data = []
        
        try:
            client = ModbusTcpClient(
                device_config['address'],
                port=device_config['port']
            )
            
            if client.connect():
                logger.info(f"Connected to Modbus device: {device_config['name']}")
                
                for reg_name, reg_config in device_config['registers'].items():
                    try:
                        address = reg_config['address']
                        unit_id = device_config['unit_id']
                        
                        # Register numbers like 30001/40001 follow the 1-based
                        # Modbus convention; pymodbus expects zero-based offsets
                        if reg_config['type'] == 'input_register':
                            offset = address - 30001 if address >= 30001 else address
                            result = client.read_input_registers(offset, 1, unit=unit_id)
                        elif reg_config['type'] == 'holding_register':
                            offset = address - 40001 if address >= 40001 else address
                            result = client.read_holding_registers(offset, 1, unit=unit_id)
                        else:
                            continue
                        
                        if not result.isError():
                            raw_value = result.registers[0]
                            scaled_value = raw_value * reg_config.get('scale', 1)
                            
                            data_point = {
                                'device': device_config['name'],
                                'parameter': reg_name,
                                'value': scaled_value,
                                'unit': reg_config.get('unit', ''),
                                'timestamp': datetime.now(timezone.utc).isoformat(),
                                'protocol': 'modbus',
                                'quality': 'good'
                            }
                            
                            collected_data.append(data_point)
                            
                        else:
                            logger.error(f"Error reading {reg_name}: {result}")
                            
                    except Exception as e:
                        logger.error(f"Error reading register {reg_name}: {e}")
                
                client.close()
                
            else:
                logger.error(f"Failed to connect to Modbus device: {device_config['name']}")
                
        except Exception as e:
            logger.error(f"Modbus collection error for {device_config['name']}: {e}")
        
        return collected_data
    
    def collect_opcua_data(self, server_config: Dict) -> List[Dict]:
        """Collect data from OPC-UA servers"""
        collected_data = []
        
        try:
            client = OPCUAClient(server_config['endpoint'])
            client.connect()
            logger.info(f"Connected to OPC-UA server: {server_config['name']}")
            
            for node_config in server_config['nodes']:
                try:
                    node = client.get_node(node_config['node_id'])
                    value = node.get_value()
                    
                    data_point = {
                        'device': server_config['name'],
                        'parameter': node_config['name'],
                        'value': value,
                        'unit': node_config.get('unit', ''),
                        'timestamp': datetime.now(timezone.utc).isoformat(),
                        'protocol': 'opcua',
                        'quality': 'good'
                    }
                    
                    collected_data.append(data_point)
                    
                except Exception as e:
                    logger.error(f"Error reading OPC-UA node {node_config['name']}: {e}")
            
            client.disconnect()
            
        except Exception as e:
            logger.error(f"OPC-UA collection error for {server_config['name']}: {e}")
        
        return collected_data
    
    def setup_mqtt_client(self):
        """Set up MQTT client for data collection"""
        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                logger.info("Connected to MQTT broker")
                # Subscribe to all configured topics
                for topic_name, topic_pattern in self.mqtt_config['topics'].items():
                    client.subscribe(topic_pattern, qos=self.mqtt_config['quality_of_service'])
                    logger.info(f"Subscribed to topic: {topic_pattern}")
            else:
                logger.error(f"Failed to connect to MQTT broker: {rc}")
        
        def on_message(client, userdata, msg):
            try:
                payload = json.loads(msg.payload.decode())
                
                data_point = {
                    'device': msg.topic.split('/')[2] if len(msg.topic.split('/')) > 2 else 'unknown',
                    'parameter': payload.get('parameter', 'mqtt_data'),
                    'value': payload.get('value'),
                    'unit': payload.get('unit', ''),
                    'timestamp': payload.get('timestamp', datetime.now(timezone.utc).isoformat()),
                    'protocol': 'mqtt',
                    'quality': payload.get('quality', 'good'),
                    'topic': msg.topic
                }
                
                with self.buffer_lock:
                    self.data_buffer.append(data_point)
                
                logger.debug(f"Received MQTT data: {data_point}")
                
            except Exception as e:
                logger.error(f"Error processing MQTT message: {e}")
        
        self.mqtt_client = mqtt.Client(self.mqtt_config['mqtt_broker']['client_id'])
        self.mqtt_client.on_connect = on_connect
        self.mqtt_client.on_message = on_message
        
        if self.mqtt_config['mqtt_broker'].get('username'):
            self.mqtt_client.username_pw_set(
                self.mqtt_config['mqtt_broker']['username'],
                self.mqtt_config['mqtt_broker']['password']
            )
    
    def store_data(self, data_points: List[Dict]):
        """Store collected data in databases"""
        if not data_points:
            return
        
        try:
            # Store in InfluxDB (time-series database)
            influx_points = []
            for point_data in data_points:
                point = Point("sensor_data") \
                    .tag("device", point_data['device']) \
                    .tag("parameter", point_data['parameter']) \
                    .tag("protocol", point_data['protocol']) \
                    .field("value", float(point_data['value'])) \
                    .field("quality", point_data['quality']) \
                    .time(point_data['timestamp'])
                
                influx_points.append(point)
            
            self.write_api.write(bucket="industrial_data", record=influx_points)
            
            # Store latest values in Redis for real-time access
            for point_data in data_points:
                key = f"{point_data['device']}:{point_data['parameter']}"
                value = {
                    'value': point_data['value'],
                    'unit': point_data['unit'],
                    'timestamp': point_data['timestamp'],
                    'quality': point_data['quality']
                }
                self.redis_client.hset(key, mapping=value)
                self.redis_client.expire(key, 3600)  # Expire after 1 hour
            
            logger.info(f"Stored {len(data_points)} data points")
            
        except Exception as e:
            logger.error(f"Error storing data: {e}")
    
    def start_collection(self):
        """Start the data collection process"""
        self.running = True
        logger.info("Starting Industrial IoT data collection")
        
        # Set up MQTT client
        self.setup_mqtt_client()
        self.mqtt_client.connect(
            self.mqtt_config['mqtt_broker']['host'],
            self.mqtt_config['mqtt_broker']['port'],
            self.mqtt_config['mqtt_broker']['keepalive']
        )
        self.mqtt_client.loop_start()
        
        # Create thread pool for concurrent data collection
        with ThreadPoolExecutor(max_workers=10) as executor:
            while self.running:
                try:
                    all_data = []
                    
                    # Collect Modbus data
                    modbus_futures = []
                    for device in self.modbus_config['modbus_devices']:
                        future = executor.submit(self.collect_modbus_data, device)
                        modbus_futures.append(future)
                    
                    # Collect OPC-UA data
                    opcua_futures = []
                    for server in self.opcua_config['opcua_servers']:
                        future = executor.submit(self.collect_opcua_data, server)
                        opcua_futures.append(future)
                    
                    # Gather results
                    for future in modbus_futures + opcua_futures:
                        try:
                            data = future.result(timeout=30)
                            all_data.extend(data)
                        except Exception as e:
                            logger.error(f"Error in data collection future: {e}")
                    
                    # Process MQTT data from buffer
                    with self.buffer_lock:
                        all_data.extend(self.data_buffer)
                        self.data_buffer.clear()
                    
                    # Store all collected data
                    if all_data:
                        self.store_data(all_data)
                    
                    # Wait before next collection cycle
                    time.sleep(5)
                    
                except KeyboardInterrupt:
                    logger.info("Stopping data collection...")
                    self.running = False
                except Exception as e:
                    logger.error(f"Error in collection loop: {e}")
                    time.sleep(10)  # Wait before retrying
        
        # Cleanup
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()
        logger.info("Data collection stopped")
    
    def stop_collection(self):
        """Stop the data collection process"""
        self.running = False

if __name__ == "__main__":
    collector = IndustrialDataCollector("../configs")
    collector.start_collection()
EOF

chmod +x ~/IndustrialIoT/platforms/data_collector.py

echo "Multi-protocol data collector created! 📊"

What this does: 📖 Creates a comprehensive data collection engine that can simultaneously gather data from multiple industrial protocols.
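
If you don't have PLCs or sensors on hand yet, you can exercise the collector's MQTT path with a simulated sensor. This sketch publishes made-up temperature readings to a topic matching the collector's factory/sensors/+/data subscription (the device name and value range are arbitrary):

#!/usr/bin/env python3
"""Simulated sensor for testing the collector without real hardware."""

import json
import random
import time
from datetime import datetime, timezone

import paho.mqtt.client as mqtt

client = mqtt.Client("sensor-simulator")
client.connect("localhost", 1883, 60)
client.loop_start()

try:
    while True:
        payload = {
            "parameter": "temperature",
            "value": round(random.uniform(20.0, 30.0), 1),  # plausible shop-floor range
            "unit": "°C",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "quality": "good",
        }
        # Topic matches the collector's 'factory/sensors/+/data' subscription
        client.publish("factory/sensors/Sim_Sensor_1/data", json.dumps(payload), qos=1)
        time.sleep(5)
except KeyboardInterrupt:
    client.loop_stop()
    client.disconnect()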

Create Data Processing and Analytics Engine

Let’s build real-time data processing capabilities! ⚡

What we’re doing: Developing analytics engine for real-time processing, anomaly detection, and predictive maintenance calculations.

# Create analytics engine
cat > ~/IndustrialIoT/platforms/analytics_engine.py << 'EOF'
#!/usr/bin/env python3
"""
Industrial IoT Analytics Engine
Real-time analytics, anomaly detection, and predictive maintenance
"""

import json
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import logging
from dataclasses import dataclass
from enum import Enum

# Machine learning libraries
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from scipy import stats
import joblib

# Database libraries
from influxdb_client import InfluxDBClient
import redis

# Configure logging
logger = logging.getLogger(__name__)

class AlertSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class Alert:
    device: str
    parameter: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float
    timestamp: datetime
    alert_id: str

class AnalyticsEngine:
    def __init__(self):
        self.init_databases()
        self.load_models()
        self.thresholds = self.load_thresholds()
        self.running = False
        
    def init_databases(self):
        """Initialize database connections"""
        try:
            self.influx_client = InfluxDBClient(
                url="http://localhost:8086",
                token="your-influxdb-token",
                org="industrial-iot"
            )
            
            self.redis_client = redis.Redis(
                host='localhost',
                port=6379,
                decode_responses=True
            )
            
            logger.info("Analytics engine database connections initialized")
        except Exception as e:
            logger.error(f"Failed to initialize databases: {e}")
            raise
    
    def load_models(self):
        """Load machine learning models for predictive analytics"""
        try:
            # Initialize anomaly detection models
            self.anomaly_models = {}
            self.scalers = {}
            
            # For each device/parameter combination, we'll have separate models
            # This is a simplified example - in production, you'd load pre-trained models
            self.default_anomaly_model = IsolationForest(
                contamination=0.1,
                random_state=42
            )
            
            logger.info("Analytics models loaded")
        except Exception as e:
            logger.error(f"Failed to load models: {e}")
    
    def load_thresholds(self) -> Dict:
        """Load alert thresholds configuration"""
        try:
            with open('../configs/alert_thresholds.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # Default thresholds if config file doesn't exist
            return {
                "temperature": {
                    "min": -10,
                    "max": 100,
                    "critical_max": 120
                },
                "humidity": {
                    "min": 0,
                    "max": 95,
                    "critical_max": 100
                },
                "motor_speed": {
                    "min": 0,
                    "max": 3000,
                    "critical_max": 3500
                },
                "vibration": {
                    "max": 10,
                    "critical_max": 15
                }
            }
    
    def get_recent_data(self, device: str, parameter: str, hours: int = 24) -> pd.DataFrame:
        """Retrieve recent data for analysis"""
        try:
            query = f'''
            from(bucket: "industrial_data")
                |> range(start: -{hours}h)
                |> filter(fn: (r) => r["device"] == "{device}")
                |> filter(fn: (r) => r["parameter"] == "{parameter}")
                |> filter(fn: (r) => r["_field"] == "value")
            '''
            
            result = self.influx_client.query_api().query_data_frame(query)
            
            if not result.empty:
                result['_time'] = pd.to_datetime(result['_time'])
                return result[['_time', '_value']].rename(columns={'_time': 'timestamp', '_value': 'value'})
            else:
                return pd.DataFrame(columns=['timestamp', 'value'])
                
        except Exception as e:
            logger.error(f"Error retrieving data for {device}:{parameter}: {e}")
            return pd.DataFrame(columns=['timestamp', 'value'])
    
    def detect_anomalies(self, device: str, parameter: str, current_value: float) -> Tuple[bool, float]:
        """Detect anomalies using statistical and ML methods"""
        try:
            # Get historical data
            historical_data = self.get_recent_data(device, parameter, hours=168)  # 1 week
            
            if len(historical_data) < 50:  # Need minimum data for analysis
                return False, 0.0
            
            values = historical_data['value'].values
            
            # Statistical anomaly detection (Z-score)
            mean_val = np.mean(values)
            std_val = np.std(values)
            z_score = abs((current_value - mean_val) / std_val) if std_val > 0 else 0
            
            # Machine learning anomaly detection
            model_key = f"{device}:{parameter}"
            
            if model_key not in self.anomaly_models:
                # Train new model if we have enough data
                if len(values) >= 100:
                    scaler = StandardScaler()
                    scaled_data = scaler.fit_transform(values.reshape(-1, 1))
                    
                    model = IsolationForest(contamination=0.1, random_state=42)
                    model.fit(scaled_data)
                    
                    self.anomaly_models[model_key] = model
                    self.scalers[model_key] = scaler
                else:
                    # Use statistical method only
                    return z_score > 3, z_score
            
            # Use trained model
            scaler = self.scalers[model_key]
            model = self.anomaly_models[model_key]
            
            scaled_current = scaler.transform([[current_value]])
            anomaly_score = model.decision_function(scaled_current)[0]
            is_anomaly = model.predict(scaled_current)[0] == -1
            
            # Combine statistical and ML results
            combined_score = max(z_score / 3, abs(anomaly_score))
            is_anomalous = is_anomaly or z_score > 3
            
            return is_anomalous, combined_score
            
        except Exception as e:
            logger.error(f"Error in anomaly detection for {device}:{parameter}: {e}")
            return False, 0.0
    
    def check_thresholds(self, device: str, parameter: str, value: float) -> List[Alert]:
        """Check if values exceed defined thresholds"""
        alerts = []
        
        try:
            # Get threshold configuration for this parameter
            param_thresholds = self.thresholds.get(parameter.lower())
            if not param_thresholds:
                return alerts
            
            alert_id = f"{device}_{parameter}_{int(time.time())}"
            current_time = datetime.now()
            
            # Check critical maximum
            if 'critical_max' in param_thresholds and value > param_thresholds['critical_max']:
                alert = Alert(
                    device=device,
                    parameter=parameter,
                    severity=AlertSeverity.CRITICAL,
                    message=f"Critical maximum threshold exceeded: {value} > {param_thresholds['critical_max']}",
                    value=value,
                    threshold=param_thresholds['critical_max'],
                    timestamp=current_time,
                    alert_id=alert_id + "_crit_max"
                )
                alerts.append(alert)
            
            # Check maximum
            elif 'max' in param_thresholds and value > param_thresholds['max']:
                alert = Alert(
                    device=device,
                    parameter=parameter,
                    severity=AlertSeverity.HIGH,
                    message=f"Maximum threshold exceeded: {value} > {param_thresholds['max']}",
                    value=value,
                    threshold=param_thresholds['max'],
                    timestamp=current_time,
                    alert_id=alert_id + "_max"
                )
                alerts.append(alert)
            
            # Check minimum
            if 'min' in param_thresholds and value < param_thresholds['min']:
                alert = Alert(
                    device=device,
                    parameter=parameter,
                    severity=AlertSeverity.HIGH,
                    message=f"Minimum threshold exceeded: {value} < {param_thresholds['min']}",
                    value=value,
                    threshold=param_thresholds['min'],
                    timestamp=current_time,
                    alert_id=alert_id + "_min"
                )
                alerts.append(alert)
                
        except Exception as e:
            logger.error(f"Error checking thresholds for {device}:{parameter}: {e}")
        
        return alerts
    
    def calculate_oee(self, device: str, hours: int = 24) -> Dict:
        """Calculate Overall Equipment Effectiveness (OEE)"""
        try:
            # Get production data
            production_data = self.get_recent_data(device, "production_rate", hours)
            status_data = self.get_recent_data(device, "machine_status", hours)
            quality_data = self.get_recent_data(device, "quality_score", hours)
            
            if production_data.empty or status_data.empty:
                return {"oee": 0, "availability": 0, "performance": 0, "quality": 100}
            
            # Calculate Availability (uptime / planned production time)
            total_time = hours * 60  # minutes
            downtime = len(status_data[status_data['value'] == 0]) * 5  # 5-minute intervals
            availability = ((total_time - downtime) / total_time) * 100
            
            # Calculate Performance (actual production rate / ideal rate)
            ideal_rate = 100  # units per hour (configurable)
            actual_rate = production_data['value'].mean() if not production_data.empty else 0
            performance = min((actual_rate / ideal_rate) * 100, 100) if ideal_rate > 0 else 0
            
            # Calculate Quality (good parts / total parts)
            quality = quality_data['value'].mean() if not quality_data.empty else 100
            
            # Calculate OEE
            oee = (availability * performance * quality) / 10000  # Convert to percentage
            
            return {
                "oee": round(oee, 2),
                "availability": round(availability, 2),
                "performance": round(performance, 2),
                "quality": round(quality, 2),
                "timestamp": datetime.now().isoformat()
            }
            
        except Exception as e:
            logger.error(f"Error calculating OEE for {device}: {e}")
            return {"oee": 0, "availability": 0, "performance": 0, "quality": 100}
    
    def predict_maintenance(self, device: str) -> Dict:
        """Predict maintenance needs based on sensor data trends"""
        try:
            # Get multiple sensor readings for the device
            parameters = ['temperature', 'vibration', 'motor_speed', 'pressure']
            sensor_data = {}
            
            for param in parameters:
                data = self.get_recent_data(device, param, hours=168)  # 1 week
                if not data.empty:
                    sensor_data[param] = data
            
            if not sensor_data:
                return {"maintenance_score": 0, "days_until_maintenance": 365, "recommendations": []}
            
            # Calculate maintenance score based on trends and anomalies
            maintenance_score = 0
            recommendations = []
            
            for param, data in sensor_data.items():
                if len(data) < 10:
                    continue
                
                values = data['value'].values
                
                # Check for increasing trend (potential wear)
                if param in ['temperature', 'vibration']:
                    slope, _, r_value, _, _ = stats.linregress(range(len(values)), values)
                    if slope > 0 and r_value > 0.5:  # Significant increasing trend
                        maintenance_score += 20
                        recommendations.append(f"Increasing {param} trend detected - check {device}")
                
                # Check for decreasing efficiency
                if param in ['motor_speed', 'pressure']:
                    slope, _, r_value, _, _ = stats.linregress(range(len(values)), values)
                    if slope < 0 and r_value > 0.5:  # Significant decreasing trend
                        maintenance_score += 15
                        recommendations.append(f"Decreasing {param} efficiency - service {device}")
                
                # Check recent anomalies
                recent_values = values[-10:]  # Last 10 readings
                for value in recent_values:
                    is_anomaly, score = self.detect_anomalies(device, param, value)
                    if is_anomaly:
                        maintenance_score += 10
            
            # Convert score to days until maintenance
            max_score = 100
            maintenance_score = min(maintenance_score, max_score)
            days_until_maintenance = max(1, int(30 * (1 - maintenance_score / max_score)))
            
            return {
                "maintenance_score": maintenance_score,
                "days_until_maintenance": days_until_maintenance,
                "recommendations": recommendations,
                "timestamp": datetime.now().isoformat()
            }
            
        except Exception as e:
            logger.error(f"Error predicting maintenance for {device}: {e}")
            return {"maintenance_score": 0, "days_until_maintenance": 365, "recommendations": []}
    
    def process_real_time_data(self):
        """Process real-time data from Redis"""
        try:
            # Get all current sensor data from Redis
            keys = self.redis_client.keys("*:*")
            
            for key in keys:
                try:
                    # Skip non-sensor keys (analytics results, alert lists)
                    if key.startswith("analytics:") or key.startswith("alerts"):
                        continue
                    device, parameter = key.split(':', 1)
                    data = self.redis_client.hgetall(key)
                    
                    if not data or 'value' not in data:
                        continue
                    
                    current_value = float(data['value'])
                    
                    # Check thresholds
                    threshold_alerts = self.check_thresholds(device, parameter, current_value)
                    
                    # Check for anomalies
                    is_anomaly, anomaly_score = self.detect_anomalies(device, parameter, current_value)
                    
                    if is_anomaly:
                        anomaly_alert = Alert(
                            device=device,
                            parameter=parameter,
                            severity=AlertSeverity.MEDIUM,
                            message=f"Anomaly detected: {parameter} = {current_value} (score: {anomaly_score:.2f})",
                            value=current_value,
                            threshold=anomaly_score,
                            timestamp=datetime.now(),
                            alert_id=f"{device}_{parameter}_anomaly_{int(time.time())}"
                        )
                        threshold_alerts.append(anomaly_alert)
                    
                    # Publish alerts
                    for alert in threshold_alerts:
                        self.publish_alert(alert)
                    
                    # Calculate and store analytics
                    if parameter in ['production_rate', 'machine_status']:
                        oee = self.calculate_oee(device)
                        self.store_analytics(device, "oee", oee)
                    
                    # Predictive maintenance (run less frequently); the loop
                    # ticks every ~10 s, so test a time window rather than an
                    # exact second, which would almost never match
                    if int(time.time()) % 300 < 10:  # Roughly every 5 minutes
                        maintenance = self.predict_maintenance(device)
                        self.store_analytics(device, "maintenance_prediction", maintenance)
                        
                except Exception as e:
                    logger.error(f"Error processing data for key {key}: {e}")
                    
        except Exception as e:
            logger.error(f"Error in real-time data processing: {e}")
    
    def publish_alert(self, alert: Alert):
        """Publish alert to notification systems"""
        try:
            alert_data = {
                "device": alert.device,
                "parameter": alert.parameter,
                "severity": alert.severity.value,
                "message": alert.message,
                "value": alert.value,
                "threshold": alert.threshold,
                "timestamp": alert.timestamp.isoformat(),
                "alert_id": alert.alert_id
            }
            
            # Store in Redis for real-time access
            self.redis_client.lpush("alerts", json.dumps(alert_data))
            self.redis_client.ltrim("alerts", 0, 999)  # Keep last 1000 alerts
            
            # Log the alert
            logger.warning(f"ALERT [{alert.severity.value.upper()}] {alert.device}: {alert.message}")
            
        except Exception as e:
            logger.error(f"Error publishing alert: {e}")
    
    def store_analytics(self, device: str, metric_type: str, data: Dict):
        """Store analytics results"""
        try:
            # Store in Redis for real-time access (serialize lists/dicts,
            # which Redis hash fields can't hold directly)
            safe_data = {k: json.dumps(v) if isinstance(v, (list, dict)) else v
                         for k, v in data.items()}
            key = f"analytics:{device}:{metric_type}"
            self.redis_client.hset(key, mapping=safe_data)
            self.redis_client.expire(key, 3600)  # Expire after 1 hour
            
            logger.debug(f"Stored analytics for {device}: {metric_type}")
            
        except Exception as e:
            logger.error(f"Error storing analytics: {e}")
    
    def start_analytics(self):
        """Start the analytics engine"""
        self.running = True
        logger.info("Starting Industrial IoT analytics engine")
        
        while self.running:
            try:
                self.process_real_time_data()
                time.sleep(10)  # Process every 10 seconds
                
            except KeyboardInterrupt:
                logger.info("Stopping analytics engine...")
                self.running = False
            except Exception as e:
                logger.error(f"Error in analytics loop: {e}")
                time.sleep(30)  # Wait before retrying
        
        logger.info("Analytics engine stopped")
    
    def stop_analytics(self):
        """Stop the analytics engine"""
        self.running = False

if __name__ == "__main__":
    engine = AnalyticsEngine()
    engine.start_analytics()
EOF

# Create alert thresholds configuration
cat > ~/IndustrialIoT/configs/alert_thresholds.json << 'EOF'
{
  "temperature": {
    "min": -5,
    "max": 85,
    "critical_max": 100,
    "unit": "°C"
  },
  "humidity": {
    "min": 10,
    "max": 90,
    "critical_max": 95,
    "unit": "%"
  },
  "motor_speed": {
    "min": 100,
    "max": 2800,
    "critical_max": 3200,
    "unit": "RPM"
  },
  "pressure": {
    "min": 1,
    "max": 10,
    "critical_max": 12,
    "unit": "bar"
  },
  "vibration": {
    "max": 8,
    "critical_max": 12,
    "unit": "mm/s"
  },
  "production_rate": {
    "min": 10,
    "max": 150,
    "unit": "units/hour"
  },
  "quality_score": {
    "min": 80,
    "critical_min": 70,
    "unit": "%"
  }
}
EOF

chmod +x ~/IndustrialIoT/platforms/analytics_engine.py

echo "Analytics engine and data processing created! ⚡"

What this does: 📖 Creates a comprehensive analytics engine with anomaly detection, predictive maintenance, and real-time processing capabilities.
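
To make the OEE formula in calculate_oee() concrete, here's the arithmetic with sample numbers. Each factor is a percentage, so the product is divided by 100² to return a percentage:

# Worked example of the OEE formula used in calculate_oee()
availability, performance, quality = 90.0, 95.0, 98.0
oee = (availability * performance * quality) / 10000
print(f"OEE = {oee:.2f}%")  # OEE = 83.79%

And here is a standalone demo of the combined anomaly check that detect_anomalies() performs (z-score plus IsolationForest), run on synthetic sensor data rather than InfluxDB history:

#!/usr/bin/env python3
"""Demo of the z-score + IsolationForest anomaly check on synthetic data."""

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)
history = rng.normal(loc=25.0, scale=0.5, size=500)  # a week of stable readings

scaler = StandardScaler()
scaled = scaler.fit_transform(history.reshape(-1, 1))
model = IsolationForest(contamination=0.1, random_state=42).fit(scaled)

for reading in (25.2, 31.0):  # one normal value, one spike
    z = abs((reading - history.mean()) / history.std())
    ml_flag = model.predict(scaler.transform([[reading]]))[0] == -1
    print(f"{reading}: z-score={z:.1f}, anomalous={ml_flag or z > 3}")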

📋 Step 3: Build Web Dashboard and API

Create Industrial IoT Web Dashboard

Let’s build a comprehensive web interface! 🌐

What we’re doing: Creating a real-time web dashboard for monitoring industrial systems, displaying metrics, alerts, and analytics.

# Create Node.js web dashboard
cd ~/IndustrialIoT/platforms

# Install web framework dependencies
# (express, socket.io, and cors were already installed in Step 1;
#  the server below uses ioredis for Redis access)
npm install ejs multer helmet morgan
npm install ioredis influx
npm install mqtt ws

# Create dashboard server
cat > dashboard_server.js << 'EOF'
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const cors = require('cors');
const path = require('path');
const Redis = require('ioredis');
const mqtt = require('mqtt');

const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
    cors: {
        origin: "*",
        methods: ["GET", "POST"]
    }
});

// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(path.join(__dirname, 'public')));
app.set('view engine', 'ejs');
app.set('views', path.join(__dirname, 'views'));

// Initialize Redis connection
const redis = new Redis({
    host: 'localhost',
    port: 6379,
    maxRetriesPerRequest: 3
});

// Initialize MQTT client
const mqttClient = mqtt.connect('mqtt://localhost:1883');

// Store connected clients
const connectedClients = new Set();

// Real-time data cache
let realtimeData = {};
let currentAlerts = [];
let analyticsData = {};

// Socket.IO connection handling
io.on('connection', (socket) => {
    console.log('Client connected:', socket.id);
    connectedClients.add(socket);
    
    // Send initial data to new client
    socket.emit('initial-data', {
        realtimeData,
        alerts: currentAlerts,
        analytics: analyticsData
    });
    
    socket.on('disconnect', () => {
        console.log('Client disconnected:', socket.id);
        connectedClients.delete(socket);
    });
    
    // Handle client requests
    socket.on('request-device-data', async (deviceName) => {
        try {
            const deviceData = await getDeviceData(deviceName);
            socket.emit('device-data', { device: deviceName, data: deviceData });
        } catch (error) {
            console.error('Error fetching device data:', error);
            socket.emit('error', { message: 'Failed to fetch device data' });
        }
    });
});

// Routes
app.get('/', (req, res) => {
    res.render('dashboard', {
        title: 'Industrial IoT Dashboard',
        devices: Object.keys(realtimeData)
    });
});

app.get('/api/devices', async (req, res) => {
    try {
        const devices = await getAllDevices();
        res.json(devices);
    } catch (error) {
        console.error('Error fetching devices:', error);
        res.status(500).json({ error: 'Failed to fetch devices' });
    }
});

app.get('/api/devices/:deviceName', async (req, res) => {
    try {
        const deviceData = await getDeviceData(req.params.deviceName);
        res.json(deviceData);
    } catch (error) {
        console.error('Error fetching device data:', error);
        res.status(500).json({ error: 'Failed to fetch device data' });
    }
});

app.get('/api/alerts', async (req, res) => {
    try {
        const alerts = await getAlerts();
        res.json(alerts);
    } catch (error) {
        console.error('Error fetching alerts:', error);
        res.status(500).json({ error: 'Failed to fetch alerts' });
    }
});

app.get('/api/analytics/:deviceName', async (req, res) => {
    try {
        const analytics = await getDeviceAnalytics(req.params.deviceName);
        res.json(analytics);
    } catch (error) {
        console.error('Error fetching analytics:', error);
        res.status(500).json({ error: 'Failed to fetch analytics' });
    }
});

// Data fetching functions
async function getAllDevices() {
    try {
        const keys = await redis.keys('*:*');
        const devices = {};
        
        for (const key of keys) {
            if (!key.includes('analytics:') && !key.includes('alerts')) {
                const [device, parameter] = key.split(':', 2);
                if (!devices[device]) {
                    devices[device] = {};
                }
                const data = await redis.hgetall(key);
                devices[device][parameter] = data;
            }
        }
        
        return devices;
    } catch (error) {
        console.error('Error getting all devices:', error);
        return {};
    }
}

async function getDeviceData(deviceName) {
    try {
        const keys = await redis.keys(`${deviceName}:*`);
        const deviceData = {};
        
        for (const key of keys) {
            const parameter = key.split(':', 2)[1];
            const data = await redis.hgetall(key);
            deviceData[parameter] = data;
        }
        
        return deviceData;
    } catch (error) {
        console.error(`Error getting data for device ${deviceName}:`, error);
        return {};
    }
}

async function getAlerts() {
    try {
        const alertsData = await redis.lrange('alerts', 0, 99); // Get last 100 alerts
        return alertsData.map(alert => JSON.parse(alert));
    } catch (error) {
        console.error('Error getting alerts:', error);
        return [];
    }
}

async function getDeviceAnalytics(deviceName) {
    try {
        const analyticsKeys = await redis.keys(`analytics:${deviceName}:*`);
        const analytics = {};
        
        for (const key of analyticsKeys) {
            const metricType = key.split(':')[2];
            const data = await redis.hgetall(key);
            analytics[metricType] = data;
        }
        
        return analytics;
    } catch (error) {
        console.error(`Error getting analytics for device ${deviceName}:`, error);
        return {};
    }
}

// Real-time data monitoring
async function monitorRealtimeData() {
    try {
        // Update realtime data cache
        realtimeData = await getAllDevices();
        
        // Update alerts cache
        currentAlerts = await getAlerts();
        
        // Update analytics cache
        const devices = Object.keys(realtimeData);
        for (const device of devices) {
            analyticsData[device] = await getDeviceAnalytics(device);
        }
        
        // Broadcast to all connected clients
        io.emit('realtime-update', {
            data: realtimeData,
            alerts: currentAlerts.slice(0, 10), // Send only recent alerts
            analytics: analyticsData
        });
        
    } catch (error) {
        console.error('Error monitoring realtime data:', error);
    }
}

// Start monitoring
setInterval(monitorRealtimeData, 5000); // Update every 5 seconds

// MQTT message handling
mqttClient.on('connect', () => {
    console.log('Connected to MQTT broker');
    mqttClient.subscribe('factory/+/+/data');
    mqttClient.subscribe('factory/alerts/+');
});

mqttClient.on('message', (topic, message) => {
    try {
        const data = JSON.parse(message.toString());
        
        if (topic.includes('alerts')) {
            // Handle alert messages
            currentAlerts.unshift(data);
            if (currentAlerts.length > 1000) {
                currentAlerts = currentAlerts.slice(0, 1000);
            }
            
            io.emit('new-alert', data);
        } else {
            // Handle sensor data messages (topic: factory/<group>/<device>/data)
            const topicParts = topic.split('/');
            if (topicParts.length >= 4) {
                const device = topicParts[2];
                const parameter = data.parameter || topicParts[1];
                
                if (!realtimeData[device]) {
                    realtimeData[device] = {};
                }
                
                realtimeData[device][parameter] = data;
                
                io.emit('sensor-update', {
                    device,
                    parameter,
                    data
                });
            }
        }
    } catch (error) {
        console.error('Error processing MQTT message:', error);
    }
});

const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
    console.log(`🌐 Industrial IoT Dashboard running on port ${PORT}`);
    console.log(`📊 Dashboard URL: http://localhost:${PORT}`);
});

module.exports = app;
EOF

# Create dashboard views directory
mkdir -p views public/{css,js,images}

# Create main dashboard template
cat > views/dashboard.ejs << 'EOF'
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title><%= title %></title>
    <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
    <link href="/css/dashboard.css" rel="stylesheet">
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <script src="/socket.io/socket.io.js"></script>
</head>
<body class="bg-dark text-light">
    <nav class="navbar navbar-expand-lg navbar-dark bg-primary">
        <div class="container-fluid">
            <a class="navbar-brand" href="#"><i class="fas fa-industry"></i> <%= title %></a>
            <div class="navbar-nav ms-auto">
                <span class="navbar-text">
                    <i class="fas fa-circle text-success me-2"></i>Connected
                </span>
            </div>
        </div>
    </nav>

    <div class="container-fluid mt-4">
        <!-- System Overview -->
        <div class="row mb-4">
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-body">
                        <h5 class="card-title"><i class="fas fa-microchip"></i> Active Devices</h5>
                        <h2 class="text-primary" id="active-devices">0</h2>
                    </div>
                </div>
            </div>
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-body">
                        <h5 class="card-title"><i class="fas fa-exclamation-triangle"></i> Active Alerts</h5>
                        <h2 class="text-warning" id="active-alerts">0</h2>
                    </div>
                </div>
            </div>
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-body">
                        <h5 class="card-title"><i class="fas fa-chart-line"></i> Avg OEE</h5>
                        <h2 class="text-success" id="average-oee">0%</h2>
                    </div>
                </div>
            </div>
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-body">
                        <h5 class="card-title"><i class="fas fa-cog"></i> System Status</h5>
                        <h2 class="text-success" id="system-status">Online</h2>
                    </div>
                </div>
            </div>
        </div>

        <!-- Main Content -->
        <div class="row">
            <!-- Device List -->
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-header">
                        <h5><i class="fas fa-list"></i> Devices</h5>
                    </div>
                    <div class="card-body">
                        <div id="device-list" class="list-group list-group-flush">
                            <!-- Devices will be populated by JavaScript -->
                        </div>
                    </div>
                </div>
            </div>

            <!-- Real-time Charts -->
            <div class="col-md-6">
                <div class="card bg-secondary">
                    <div class="card-header">
                        <h5><i class="fas fa-chart-area"></i> Real-time Monitoring</h5>
                    </div>
                    <div class="card-body">
                        <canvas id="realtime-chart" height="300"></canvas>
                    </div>
                </div>
            </div>

            <!-- Alerts Panel -->
            <div class="col-md-3">
                <div class="card bg-secondary">
                    <div class="card-header">
                        <h5><i class="fas fa-bell"></i> Recent Alerts</h5>
                    </div>
                    <div class="card-body">
                        <div id="alerts-list" style="max-height: 400px; overflow-y: auto;">
                            <!-- Alerts will be populated by JavaScript -->
                        </div>
                    </div>
                </div>
            </div>
        </div>

        <!-- Device Details -->
        <div class="row mt-4">
            <div class="col-12">
                <div class="card bg-secondary">
                    <div class="card-header">
                        <h5><i class="fas fa-info-circle"></i> Device Details</h5>
                    </div>
                    <div class="card-body">
                        <div id="device-details">
                            <p class="text-muted">Select a device to view details</p>
                        </div>
                    </div>
                </div>
            </div>
        </div>
    </div>

    <script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.bundle.min.js"></script>
    <script src="/js/dashboard.js"></script>
</body>
</html>
EOF

# Create dashboard CSS
cat > public/css/dashboard.css << 'EOF'
.card {
    border: none;
    border-radius: 10px;
    box-shadow: 0 2px 10px rgba(0,0,0,0.3);
}

.card-body h2 {
    font-weight: bold;
    margin-bottom: 0;
}

.device-item {
    cursor: pointer;
    transition: background-color 0.3s;
}

.device-item:hover {
    background-color: rgba(108, 117, 125, 0.3);
}

.device-item.active {
    background-color: rgba(13, 110, 253, 0.3);
}

.alert-item {
    border-left: 4px solid;
    padding: 8px 12px;
    margin-bottom: 8px;
    border-radius: 4px;
}

.alert-critical {
    border-left-color: #dc3545;
    background-color: rgba(220, 53, 69, 0.1);
}

.alert-high {
    border-left-color: #fd7e14;
    background-color: rgba(253, 126, 20, 0.1);
}

.alert-medium {
    border-left-color: #ffc107;
    background-color: rgba(255, 193, 7, 0.1);
}

.alert-low {
    border-left-color: #20c997;
    background-color: rgba(32, 201, 151, 0.1);
}

.parameter-card {
    background-color: rgba(108, 117, 125, 0.2);
    border-radius: 8px;
    padding: 15px;
    margin-bottom: 10px;
}

.parameter-value {
    font-size: 1.5em;
    font-weight: bold;
}

.status-indicator {
    width: 10px;
    height: 10px;
    border-radius: 50%;
    display: inline-block;
    margin-right: 8px;
}

.status-online {
    background-color: #28a745;
}

.status-offline {
    background-color: #dc3545;
}

.status-warning {
    background-color: #ffc107;
}
EOF

echo "Industrial IoT web dashboard created! 🌐"

What this does: 📖 Creates a comprehensive web dashboard with real-time monitoring, alerts, and device management capabilities. Note that the template also loads /js/dashboard.js, a client-side script (not shown above) that you still need to write to render the Socket.IO updates into the page.
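
Once the server is running (node dashboard_server.js), you can sanity-check the REST API with a short standard-library Python script; this sketch assumes the default port 3000:

#!/usr/bin/env python3
"""Quick check of the dashboard REST API endpoints."""

import json
from urllib.request import urlopen

BASE = "http://localhost:3000"

for endpoint in ("/api/devices", "/api/alerts"):
    with urlopen(BASE + endpoint, timeout=5) as response:
        data = json.loads(response.read())
        print(f"{endpoint}: {type(data).__name__} with {len(data)} entries")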

🎉 Conclusion

You now have a complete Industrial IoT platform on Alpine Linux: a multi-protocol data collector for Modbus, OPC-UA, and MQTT; an analytics engine with threshold alerts, anomaly detection, OEE calculation, and predictive maintenance; and a real-time web dashboard. From here, consider hardening the MQTT broker with TLS and authentication, connecting your real PLCs and sensors, and tuning InfluxDB retention policies for long-term storage. Happy manufacturing! 🏭
<function_calls> [{“content”: “Find 3 new unused Alpine Linux article topics”, “status”: “completed”, “priority”: “high”, “id”: “1”}, {“content”: “Search online for best cover images for each article”, “status”: “completed”, “priority”: “high”, “id”: “2”}, {“content”: “Download images using the provided script”, “status”: “completed”, “priority”: “high”, “id”: “3”}, {“content”: “Create first new article with comprehensive content”, “status”: “completed”, “priority”: “high”, “id”: “4”}, {“content”: “Create second new article with comprehensive content”, “status”: “completed”, “priority”: “high”, “id”: “5”}, {“content”: “Create third new article with comprehensive content”, “status”: “completed”, “priority”: “high”, “id”: “6”}, {“content”: “Update articles-creation.json with new article slugs”, “status”: “in_progress”, “priority”: “medium”, “id”: “7”}]