🏭 Industrial IoT Platforms on Alpine Linux: Smart Manufacturing Solutions
Let’s master Industrial IoT platform development on Alpine Linux! 🚀 This comprehensive tutorial shows you how to build, deploy, and manage Industrial IoT systems for smart manufacturing, sensor networks, and automated production environments. Perfect for manufacturing engineers, IoT developers, and industrial automation teams! 😊
🤔 What are Industrial IoT Platforms?
Industrial IoT (IIoT) platforms are comprehensive software systems that connect, monitor, and manage industrial equipment, sensors, and processes through internet-connected devices, enabling real-time data collection, analysis, and automated decision-making in manufacturing environments!
Industrial IoT platforms are like:
- 🧠 Central nervous system connecting every machine and sensor in a smart factory
- 🌐 Digital ecosystem that transforms traditional manufacturing into intelligent operations
- 🔧 Command center orchestrating seamless communication between all industrial components
🎯 What You Need
Before we start, you need:
- ✅ Alpine Linux system with adequate resources (4GB+ RAM recommended for industrial workloads)
- ✅ Understanding of industrial protocols (MQTT, OPC-UA, Modbus) and networking concepts
- ✅ Knowledge of sensor technologies, PLCs, and industrial automation systems
- ✅ Familiarity with database management and time-series data handling
📋 Step 1: Install Industrial IoT Foundation
Install Core IoT Development Stack
Let’s set up the essential Industrial IoT infrastructure! 😊
What we’re doing: Installing MQTT brokers, time-series databases, and industrial protocol libraries for comprehensive IoT platform development.
# Update package list
apk update
# Install core development tools
apk add build-base cmake make gcc g++
apk add python3 python3-dev py3-pip
apk add nodejs npm
apk add git curl wget
# Install MQTT broker (Eclipse Mosquitto)
apk add mosquitto mosquitto-clients mosquitto-dev
# Install database systems
apk add postgresql postgresql-dev postgresql-client
apk add redis redis-dev
apk add sqlite sqlite-dev
# Install time-series database (InfluxDB)
# Note: InfluxDB may need to be installed from binary
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.4-linux-amd64.tar.gz
tar xvfz influxdb2-2.7.4-linux-amd64.tar.gz
sudo mv influxdb2-2.7.4-linux-amd64/influxd /usr/local/bin/
sudo mv influxdb2-2.7.4-linux-amd64/influx /usr/local/bin/
# Install networking and communication libraries
apk add libmodbus-dev
apk add openssl-dev libssl-dev
apk add libzmq zeromq-dev
# Install data processing libraries
apk add librdkafka-dev
apk add protobuf-dev
# Install monitoring and visualization tools
apk add grafana
apk add prometheus node-exporter
# Create IoT development workspace
mkdir -p ~/IndustrialIoT/{platforms,protocols,sensors,data,configs,scripts}
mkdir -p ~/IndustrialIoT/data/{raw,processed,analytics,reports}
# Verify installations
mosquitto_sub --help | head -1
influx version
grafana-server --version
echo "Industrial IoT foundation installed! 🏭"
What this does: 📖 Installs complete Industrial IoT development stack with messaging, databases, and monitoring tools.
Example output:
mosquitto_sub version 2.0.15 running on libmosquitto 2.0.15.
influx 2.7.4 (git: 5ead1e4) build_date: 2023-12-15T16:46:02Z
Version 10.2.3 (commit: d66ad958da0, branch: HEAD)
Industrial IoT foundation installed! 🏭
What this means: Industrial IoT development environment is ready! ✅
Configure Industrial Communication Protocols
Let’s set up industrial protocol support! 🔗
What we’re doing: Installing and configuring support for Modbus, OPC-UA, and other industrial communication protocols.
# Install Python industrial protocol libraries
pip3 install pymodbus opcua-client
pip3 install paho-mqtt influxdb-client
pip3 install modbus-tk pyserial
# Install Node.js industrial libraries
cd ~/IndustrialIoT/platforms
npm init -y
npm install node-opcua mqtt modbus-serial
npm install influx pg redis ws
npm install express socket.io cors
# Create Modbus configuration
cat > ~/IndustrialIoT/configs/modbus_config.json << 'EOF'
{
"modbus_devices": [
{
"name": "Temperature_Sensor_1",
"address": "192.168.1.100",
"port": 502,
"unit_id": 1,
"registers": {
"temperature": {
"address": 30001,
"type": "input_register",
"data_type": "float",
"scale": 0.1,
"unit": "°C"
},
"humidity": {
"address": 30002,
"type": "input_register",
"data_type": "float",
"scale": 0.1,
"unit": "%"
}
},
"poll_interval": 5000
},
{
"name": "Motor_Controller_1",
"address": "192.168.1.101",
"port": 502,
"unit_id": 2,
"registers": {
"motor_speed": {
"address": 40001,
"type": "holding_register",
"data_type": "uint16",
"scale": 1,
"unit": "RPM"
},
"motor_status": {
"address": 40002,
"type": "holding_register",
"data_type": "bool",
"scale": 1,
"unit": "status"
}
},
"poll_interval": 2000
}
]
}
EOF
# Create OPC-UA configuration
cat > ~/IndustrialIoT/configs/opcua_config.json << 'EOF'
{
"opcua_servers": [
{
"name": "Production_Line_1",
"endpoint": "opc.tcp://192.168.1.200:4840",
"security_policy": "None",
"security_mode": "None",
"nodes": [
{
"node_id": "ns=2;i=1001",
"name": "ProductionRate",
"data_type": "Double",
"unit": "units/hour"
},
{
"node_id": "ns=2;i=1002",
"name": "QualityScore",
"data_type": "Double",
"unit": "percentage"
},
{
"node_id": "ns=2;i=1003",
"name": "MachineState",
"data_type": "Int32",
"unit": "state_code"
}
],
"subscription_interval": 1000
}
]
}
EOF
# Create MQTT configuration
cat > ~/IndustrialIoT/configs/mqtt_config.json << 'EOF'
{
"mqtt_broker": {
"host": "localhost",
"port": 1883,
"keepalive": 60,
"client_id": "industrial_iot_platform",
"username": "iot_user",
"password": "iot_password"
},
"topics": {
"sensor_data": "factory/sensors/+/data",
"machine_status": "factory/machines/+/status",
"production_metrics": "factory/production/metrics",
"alarms": "factory/alarms/+",
"commands": "factory/commands/+"
},
"quality_of_service": 1,
"retain_messages": true
}
EOF
echo "Industrial communication protocols configured! 🔗"
What this does: 📖 Sets up comprehensive support for industrial communication protocols with proper configuration.
📋 Step 2: Create Data Collection and Processing Engine
Build Multi-Protocol Data Collector
Let’s create a robust data collection system! 📊
What we’re doing: Developing a multi-protocol data collector that can simultaneously gather data from Modbus, OPC-UA, and MQTT sources.
# Create Python data collector
cat > ~/IndustrialIoT/platforms/data_collector.py << 'EOF'
#!/usr/bin/env python3
"""
Industrial IoT Data Collector
Multi-protocol data collection engine for industrial systems
"""
import json
import time
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from concurrent.futures import ThreadPoolExecutor
import threading
# Protocol libraries
from pymodbus.client.sync import ModbusTcpClient
from opcua import Client as OPCUAClient
import paho.mqtt.client as mqtt
# Database libraries
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import redis
import psycopg2
from psycopg2.extras import RealDictCursor
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/industrial_iot.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class IndustrialDataCollector:
def __init__(self, config_path: str = "configs"):
self.config_path = config_path
self.load_configurations()
self.init_databases()
self.running = False
self.data_buffer = []
self.buffer_lock = threading.Lock()
def load_configurations(self):
"""Load all configuration files"""
try:
with open(f"{self.config_path}/modbus_config.json", 'r') as f:
self.modbus_config = json.load(f)
with open(f"{self.config_path}/opcua_config.json", 'r') as f:
self.opcua_config = json.load(f)
with open(f"{self.config_path}/mqtt_config.json", 'r') as f:
self.mqtt_config = json.load(f)
logger.info("Configuration files loaded successfully")
except Exception as e:
logger.error(f"Failed to load configurations: {e}")
raise
def init_databases(self):
"""Initialize database connections"""
try:
# InfluxDB for time-series data
self.influx_client = InfluxDBClient(
url="http://localhost:8086",
token="your-influxdb-token",
org="industrial-iot"
)
self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)
# Redis for caching and real-time data
self.redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
# PostgreSQL for configuration and metadata
self.pg_conn = psycopg2.connect(
host="localhost",
database="industrial_iot",
user="iot_user",
password="iot_password"
)
logger.info("Database connections initialized")
except Exception as e:
logger.error(f"Failed to initialize databases: {e}")
raise
def collect_modbus_data(self, device_config: Dict) -> List[Dict]:
"""Collect data from Modbus devices"""
collected_data = []
try:
client = ModbusTcpClient(
device_config['address'],
port=device_config['port']
)
if client.connect():
logger.info(f"Connected to Modbus device: {device_config['name']}")
for reg_name, reg_config in device_config['registers'].items():
try:
address = reg_config['address']
unit_id = device_config['unit_id']
if reg_config['type'] == 'input_register':
result = client.read_input_registers(address, 1, unit=unit_id)
elif reg_config['type'] == 'holding_register':
result = client.read_holding_registers(address, 1, unit=unit_id)
else:
continue
if not result.isError():
raw_value = result.registers[0]
scaled_value = raw_value * reg_config.get('scale', 1)
data_point = {
'device': device_config['name'],
'parameter': reg_name,
'value': scaled_value,
'unit': reg_config.get('unit', ''),
'timestamp': datetime.now(timezone.utc).isoformat(),
'protocol': 'modbus',
'quality': 'good'
}
collected_data.append(data_point)
else:
logger.error(f"Error reading {reg_name}: {result}")
except Exception as e:
logger.error(f"Error reading register {reg_name}: {e}")
client.close()
else:
logger.error(f"Failed to connect to Modbus device: {device_config['name']}")
except Exception as e:
logger.error(f"Modbus collection error for {device_config['name']}: {e}")
return collected_data
def collect_opcua_data(self, server_config: Dict) -> List[Dict]:
"""Collect data from OPC-UA servers"""
collected_data = []
try:
client = OPCUAClient(server_config['endpoint'])
client.connect()
logger.info(f"Connected to OPC-UA server: {server_config['name']}")
for node_config in server_config['nodes']:
try:
node = client.get_node(node_config['node_id'])
value = node.get_value()
data_point = {
'device': server_config['name'],
'parameter': node_config['name'],
'value': value,
'unit': node_config.get('unit', ''),
'timestamp': datetime.now(timezone.utc).isoformat(),
'protocol': 'opcua',
'quality': 'good'
}
collected_data.append(data_point)
except Exception as e:
logger.error(f"Error reading OPC-UA node {node_config['name']}: {e}")
client.disconnect()
except Exception as e:
logger.error(f"OPC-UA collection error for {server_config['name']}: {e}")
return collected_data
def setup_mqtt_client(self):
"""Set up MQTT client for data collection"""
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("Connected to MQTT broker")
# Subscribe to all configured topics
for topic_name, topic_pattern in self.mqtt_config['topics'].items():
client.subscribe(topic_pattern, qos=self.mqtt_config['quality_of_service'])
logger.info(f"Subscribed to topic: {topic_pattern}")
else:
logger.error(f"Failed to connect to MQTT broker: {rc}")
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
data_point = {
'device': msg.topic.split('/')[2] if len(msg.topic.split('/')) > 2 else 'unknown',
'parameter': payload.get('parameter', 'mqtt_data'),
'value': payload.get('value'),
'unit': payload.get('unit', ''),
'timestamp': payload.get('timestamp', datetime.now(timezone.utc).isoformat()),
'protocol': 'mqtt',
'quality': payload.get('quality', 'good'),
'topic': msg.topic
}
with self.buffer_lock:
self.data_buffer.append(data_point)
logger.debug(f"Received MQTT data: {data_point}")
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
self.mqtt_client = mqtt.Client(self.mqtt_config['mqtt_broker']['client_id'])
self.mqtt_client.on_connect = on_connect
self.mqtt_client.on_message = on_message
if self.mqtt_config['mqtt_broker'].get('username'):
self.mqtt_client.username_pw_set(
self.mqtt_config['mqtt_broker']['username'],
self.mqtt_config['mqtt_broker']['password']
)
def store_data(self, data_points: List[Dict]):
"""Store collected data in databases"""
if not data_points:
return
try:
# Store in InfluxDB (time-series database)
influx_points = []
for point_data in data_points:
point = Point("sensor_data") \
.tag("device", point_data['device']) \
.tag("parameter", point_data['parameter']) \
.tag("protocol", point_data['protocol']) \
.field("value", float(point_data['value'])) \
.field("quality", point_data['quality']) \
.time(point_data['timestamp'])
influx_points.append(point)
self.write_api.write(bucket="industrial_data", record=influx_points)
# Store latest values in Redis for real-time access
for point_data in data_points:
key = f"{point_data['device']}:{point_data['parameter']}"
value = {
'value': point_data['value'],
'unit': point_data['unit'],
'timestamp': point_data['timestamp'],
'quality': point_data['quality']
}
self.redis_client.hset(key, mapping=value)
self.redis_client.expire(key, 3600) # Expire after 1 hour
logger.info(f"Stored {len(data_points)} data points")
except Exception as e:
logger.error(f"Error storing data: {e}")
def start_collection(self):
"""Start the data collection process"""
self.running = True
logger.info("Starting Industrial IoT data collection")
# Set up MQTT client
self.setup_mqtt_client()
self.mqtt_client.connect(
self.mqtt_config['mqtt_broker']['host'],
self.mqtt_config['mqtt_broker']['port'],
self.mqtt_config['mqtt_broker']['keepalive']
)
self.mqtt_client.loop_start()
# Create thread pool for concurrent data collection
with ThreadPoolExecutor(max_workers=10) as executor:
while self.running:
try:
all_data = []
# Collect Modbus data
modbus_futures = []
for device in self.modbus_config['modbus_devices']:
future = executor.submit(self.collect_modbus_data, device)
modbus_futures.append(future)
# Collect OPC-UA data
opcua_futures = []
for server in self.opcua_config['opcua_servers']:
future = executor.submit(self.collect_opcua_data, server)
opcua_futures.append(future)
# Gather results
for future in modbus_futures + opcua_futures:
try:
data = future.result(timeout=30)
all_data.extend(data)
except Exception as e:
logger.error(f"Error in data collection future: {e}")
# Process MQTT data from buffer
with self.buffer_lock:
all_data.extend(self.data_buffer)
self.data_buffer.clear()
# Store all collected data
if all_data:
self.store_data(all_data)
# Wait before next collection cycle
time.sleep(5)
except KeyboardInterrupt:
logger.info("Stopping data collection...")
self.running = False
except Exception as e:
logger.error(f"Error in collection loop: {e}")
time.sleep(10) # Wait before retrying
# Cleanup
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
logger.info("Data collection stopped")
def stop_collection(self):
"""Stop the data collection process"""
self.running = False
if __name__ == "__main__":
collector = IndustrialDataCollector("../configs")
collector.start_collection()
EOF
chmod +x ~/IndustrialIoT/platforms/data_collector.py
echo "Multi-protocol data collector created! 📊"
What this does: 📖 Creates a comprehensive data collection engine that can simultaneously gather data from multiple industrial protocols.
Create Data Processing and Analytics Engine
Let’s build real-time data processing capabilities! ⚡
What we’re doing: Developing analytics engine for real-time processing, anomaly detection, and predictive maintenance calculations.
# Create analytics engine
cat > ~/IndustrialIoT/platforms/analytics_engine.py << 'EOF'
#!/usr/bin/env python3
"""
Industrial IoT Analytics Engine
Real-time analytics, anomaly detection, and predictive maintenance
"""
import json
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import logging
from dataclasses import dataclass
from enum import Enum
# Machine learning libraries
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from scipy import stats
import joblib
# Database libraries
from influxdb_client import InfluxDBClient
import redis
# Configure logging
logger = logging.getLogger(__name__)
class AlertSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Alert:
device: str
parameter: str
severity: AlertSeverity
message: str
value: float
threshold: float
timestamp: datetime
alert_id: str
class AnalyticsEngine:
def __init__(self):
self.init_databases()
self.load_models()
self.thresholds = self.load_thresholds()
self.running = False
def init_databases(self):
"""Initialize database connections"""
try:
self.influx_client = InfluxDBClient(
url="http://localhost:8086",
token="your-influxdb-token",
org="industrial-iot"
)
self.redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
logger.info("Analytics engine database connections initialized")
except Exception as e:
logger.error(f"Failed to initialize databases: {e}")
raise
def load_models(self):
"""Load machine learning models for predictive analytics"""
try:
# Initialize anomaly detection models
self.anomaly_models = {}
self.scalers = {}
# For each device/parameter combination, we'll have separate models
# This is a simplified example - in production, you'd load pre-trained models
self.default_anomaly_model = IsolationForest(
contamination=0.1,
random_state=42
)
logger.info("Analytics models loaded")
except Exception as e:
logger.error(f"Failed to load models: {e}")
def load_thresholds(self) -> Dict:
"""Load alert thresholds configuration"""
try:
with open('../configs/alert_thresholds.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
# Default thresholds if config file doesn't exist
return {
"temperature": {
"min": -10,
"max": 100,
"critical_max": 120
},
"humidity": {
"min": 0,
"max": 95,
"critical_max": 100
},
"motor_speed": {
"min": 0,
"max": 3000,
"critical_max": 3500
},
"vibration": {
"max": 10,
"critical_max": 15
}
}
def get_recent_data(self, device: str, parameter: str, hours: int = 24) -> pd.DataFrame:
"""Retrieve recent data for analysis"""
try:
query = f'''
from(bucket: "industrial_data")
|> range(start: -{hours}h)
|> filter(fn: (r) => r["device"] == "{device}")
|> filter(fn: (r) => r["parameter"] == "{parameter}")
|> filter(fn: (r) => r["_field"] == "value")
'''
result = self.influx_client.query_api().query_data_frame(query)
if not result.empty:
result['_time'] = pd.to_datetime(result['_time'])
return result[['_time', '_value']].rename(columns={'_time': 'timestamp', '_value': 'value'})
else:
return pd.DataFrame(columns=['timestamp', 'value'])
except Exception as e:
logger.error(f"Error retrieving data for {device}:{parameter}: {e}")
return pd.DataFrame(columns=['timestamp', 'value'])
def detect_anomalies(self, device: str, parameter: str, current_value: float) -> Tuple[bool, float]:
"""Detect anomalies using statistical and ML methods"""
try:
# Get historical data
historical_data = self.get_recent_data(device, parameter, hours=168) # 1 week
if len(historical_data) < 50: # Need minimum data for analysis
return False, 0.0
values = historical_data['value'].values
# Statistical anomaly detection (Z-score)
mean_val = np.mean(values)
std_val = np.std(values)
z_score = abs((current_value - mean_val) / std_val) if std_val > 0 else 0
# Machine learning anomaly detection
model_key = f"{device}:{parameter}"
if model_key not in self.anomaly_models:
# Train new model if we have enough data
if len(values) >= 100:
scaler = StandardScaler()
scaled_data = scaler.fit_transform(values.reshape(-1, 1))
model = IsolationForest(contamination=0.1, random_state=42)
model.fit(scaled_data)
self.anomaly_models[model_key] = model
self.scalers[model_key] = scaler
else:
# Use statistical method only
return z_score > 3, z_score
# Use trained model
scaler = self.scalers[model_key]
model = self.anomaly_models[model_key]
scaled_current = scaler.transform([[current_value]])
anomaly_score = model.decision_function(scaled_current)[0]
is_anomaly = model.predict(scaled_current)[0] == -1
# Combine statistical and ML results
combined_score = max(z_score / 3, abs(anomaly_score))
is_anomalous = is_anomaly or z_score > 3
return is_anomalous, combined_score
except Exception as e:
logger.error(f"Error in anomaly detection for {device}:{parameter}: {e}")
return False, 0.0
def check_thresholds(self, device: str, parameter: str, value: float) -> List[Alert]:
"""Check if values exceed defined thresholds"""
alerts = []
try:
# Get threshold configuration for this parameter
param_thresholds = self.thresholds.get(parameter.lower())
if not param_thresholds:
return alerts
alert_id = f"{device}_{parameter}_{int(time.time())}"
current_time = datetime.now()
# Check critical maximum
if 'critical_max' in param_thresholds and value > param_thresholds['critical_max']:
alert = Alert(
device=device,
parameter=parameter,
severity=AlertSeverity.CRITICAL,
message=f"Critical maximum threshold exceeded: {value} > {param_thresholds['critical_max']}",
value=value,
threshold=param_thresholds['critical_max'],
timestamp=current_time,
alert_id=alert_id + "_crit_max"
)
alerts.append(alert)
# Check maximum
elif 'max' in param_thresholds and value > param_thresholds['max']:
alert = Alert(
device=device,
parameter=parameter,
severity=AlertSeverity.HIGH,
message=f"Maximum threshold exceeded: {value} > {param_thresholds['max']}",
value=value,
threshold=param_thresholds['max'],
timestamp=current_time,
alert_id=alert_id + "_max"
)
alerts.append(alert)
# Check minimum
if 'min' in param_thresholds and value < param_thresholds['min']:
alert = Alert(
device=device,
parameter=parameter,
severity=AlertSeverity.HIGH,
message=f"Minimum threshold exceeded: {value} < {param_thresholds['min']}",
value=value,
threshold=param_thresholds['min'],
timestamp=current_time,
alert_id=alert_id + "_min"
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking thresholds for {device}:{parameter}: {e}")
return alerts
def calculate_oee(self, device: str, hours: int = 24) -> Dict:
"""Calculate Overall Equipment Effectiveness (OEE)"""
try:
# Get production data
production_data = self.get_recent_data(device, "production_rate", hours)
status_data = self.get_recent_data(device, "machine_status", hours)
quality_data = self.get_recent_data(device, "quality_score", hours)
if production_data.empty or status_data.empty:
return {"oee": 0, "availability": 0, "performance": 0, "quality": 100}
# Calculate Availability (uptime / planned production time)
total_time = hours * 60 # minutes
downtime = len(status_data[status_data['value'] == 0]) * 5 # 5-minute intervals
availability = ((total_time - downtime) / total_time) * 100
# Calculate Performance (actual production rate / ideal rate)
ideal_rate = 100 # units per hour (configurable)
actual_rate = production_data['value'].mean() if not production_data.empty else 0
performance = min((actual_rate / ideal_rate) * 100, 100) if ideal_rate > 0 else 0
# Calculate Quality (good parts / total parts)
quality = quality_data['value'].mean() if not quality_data.empty else 100
# Calculate OEE
oee = (availability * performance * quality) / 10000 # Convert to percentage
return {
"oee": round(oee, 2),
"availability": round(availability, 2),
"performance": round(performance, 2),
"quality": round(quality, 2),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error calculating OEE for {device}: {e}")
return {"oee": 0, "availability": 0, "performance": 0, "quality": 100}
def predict_maintenance(self, device: str) -> Dict:
"""Predict maintenance needs based on sensor data trends"""
try:
# Get multiple sensor readings for the device
parameters = ['temperature', 'vibration', 'motor_speed', 'pressure']
sensor_data = {}
for param in parameters:
data = self.get_recent_data(device, param, hours=168) # 1 week
if not data.empty:
sensor_data[param] = data
if not sensor_data:
return {"maintenance_score": 0, "days_until_maintenance": 365, "recommendations": []}
# Calculate maintenance score based on trends and anomalies
maintenance_score = 0
recommendations = []
for param, data in sensor_data.items():
if len(data) < 10:
continue
values = data['value'].values
# Check for increasing trend (potential wear)
if param in ['temperature', 'vibration']:
slope, _, r_value, _, _ = stats.linregress(range(len(values)), values)
if slope > 0 and r_value > 0.5: # Significant increasing trend
maintenance_score += 20
recommendations.append(f"Increasing {param} trend detected - check {device}")
# Check for decreasing efficiency
if param in ['motor_speed', 'pressure']:
slope, _, r_value, _, _ = stats.linregress(range(len(values)), values)
if slope < 0 and r_value > 0.5: # Significant decreasing trend
maintenance_score += 15
recommendations.append(f"Decreasing {param} efficiency - service {device}")
# Check recent anomalies
recent_values = values[-10:] # Last 10 readings
for value in recent_values:
is_anomaly, score = self.detect_anomalies(device, param, value)
if is_anomaly:
maintenance_score += 10
# Convert score to days until maintenance
max_score = 100
maintenance_score = min(maintenance_score, max_score)
days_until_maintenance = max(1, int(30 * (1 - maintenance_score / max_score)))
return {
"maintenance_score": maintenance_score,
"days_until_maintenance": days_until_maintenance,
"recommendations": recommendations,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error predicting maintenance for {device}: {e}")
return {"maintenance_score": 0, "days_until_maintenance": 365, "recommendations": []}
def process_real_time_data(self):
"""Process real-time data from Redis"""
try:
# Get all current sensor data from Redis
keys = self.redis_client.keys("*:*")
for key in keys:
try:
device, parameter = key.split(':', 1)
data = self.redis_client.hgetall(key)
if not data or 'value' not in data:
continue
current_value = float(data['value'])
# Check thresholds
threshold_alerts = self.check_thresholds(device, parameter, current_value)
# Check for anomalies
is_anomaly, anomaly_score = self.detect_anomalies(device, parameter, current_value)
if is_anomaly:
anomaly_alert = Alert(
device=device,
parameter=parameter,
severity=AlertSeverity.MEDIUM,
message=f"Anomaly detected: {parameter} = {current_value} (score: {anomaly_score:.2f})",
value=current_value,
threshold=anomaly_score,
timestamp=datetime.now(),
alert_id=f"{device}_{parameter}_anomaly_{int(time.time())}"
)
threshold_alerts.append(anomaly_alert)
# Publish alerts
for alert in threshold_alerts:
self.publish_alert(alert)
# Calculate and store analytics
if parameter in ['production_rate', 'machine_status']:
oee = self.calculate_oee(device)
self.store_analytics(device, "oee", oee)
# Predictive maintenance (run less frequently)
if int(time.time()) % 300 == 0: # Every 5 minutes
maintenance = self.predict_maintenance(device)
self.store_analytics(device, "maintenance_prediction", maintenance)
except Exception as e:
logger.error(f"Error processing data for key {key}: {e}")
except Exception as e:
logger.error(f"Error in real-time data processing: {e}")
def publish_alert(self, alert: Alert):
"""Publish alert to notification systems"""
try:
alert_data = {
"device": alert.device,
"parameter": alert.parameter,
"severity": alert.severity.value,
"message": alert.message,
"value": alert.value,
"threshold": alert.threshold,
"timestamp": alert.timestamp.isoformat(),
"alert_id": alert.alert_id
}
# Store in Redis for real-time access
self.redis_client.lpush("alerts", json.dumps(alert_data))
self.redis_client.ltrim("alerts", 0, 999) # Keep last 1000 alerts
# Log the alert
logger.warning(f"ALERT [{alert.severity.value.upper()}] {alert.device}: {alert.message}")
except Exception as e:
logger.error(f"Error publishing alert: {e}")
def store_analytics(self, device: str, metric_type: str, data: Dict):
"""Store analytics results"""
try:
# Store in Redis for real-time access
key = f"analytics:{device}:{metric_type}"
self.redis_client.hset(key, mapping=data)
self.redis_client.expire(key, 3600) # Expire after 1 hour
logger.debug(f"Stored analytics for {device}: {metric_type}")
except Exception as e:
logger.error(f"Error storing analytics: {e}")
def start_analytics(self):
"""Start the analytics engine"""
self.running = True
logger.info("Starting Industrial IoT analytics engine")
while self.running:
try:
self.process_real_time_data()
time.sleep(10) # Process every 10 seconds
except KeyboardInterrupt:
logger.info("Stopping analytics engine...")
self.running = False
except Exception as e:
logger.error(f"Error in analytics loop: {e}")
time.sleep(30) # Wait before retrying
logger.info("Analytics engine stopped")
def stop_analytics(self):
"""Stop the analytics engine"""
self.running = False
if __name__ == "__main__":
engine = AnalyticsEngine()
engine.start_analytics()
EOF
# Create alert thresholds configuration
cat > ~/IndustrialIoT/configs/alert_thresholds.json << 'EOF'
{
"temperature": {
"min": -5,
"max": 85,
"critical_max": 100,
"unit": "°C"
},
"humidity": {
"min": 10,
"max": 90,
"critical_max": 95,
"unit": "%"
},
"motor_speed": {
"min": 100,
"max": 2800,
"critical_max": 3200,
"unit": "RPM"
},
"pressure": {
"min": 1,
"max": 10,
"critical_max": 12,
"unit": "bar"
},
"vibration": {
"max": 8,
"critical_max": 12,
"unit": "mm/s"
},
"production_rate": {
"min": 10,
"max": 150,
"unit": "units/hour"
},
"quality_score": {
"min": 80,
"critical_min": 70,
"unit": "%"
}
}
EOF
chmod +x ~/IndustrialIoT/platforms/analytics_engine.py
echo "Analytics engine and data processing created! ⚡"
What this does: 📖 Creates a comprehensive analytics engine with anomaly detection, predictive maintenance, and real-time processing capabilities.
📋 Step 3: Build Web Dashboard and API
Create Industrial IoT Web Dashboard
Let’s build a comprehensive web interface! 🌐
What we’re doing: Creating a real-time web dashboard for monitoring industrial systems, displaying metrics, alerts, and analytics.
# Create Node.js web dashboard
cd ~/IndustrialIoT/platforms
# Initialize Node.js project for dashboard
npm init -y
# Install web framework dependencies
npm install express socket.io cors
npm install ejs multer helmet morgan
npm install redis ioredis influx
npm install mqtt ws
# Create dashboard server
cat > dashboard_server.js << 'EOF'
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const cors = require('cors');
const path = require('path');
const Redis = require('ioredis');
const mqtt = require('mqtt');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: {
origin: "*",
methods: ["GET", "POST"]
}
});
// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(path.join(__dirname, 'public')));
app.set('view engine', 'ejs');
app.set('views', path.join(__dirname, 'views'));
// Initialize Redis connection
const redis = new Redis({
host: 'localhost',
port: 6379,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3
});
// Initialize MQTT client
const mqttClient = mqtt.connect('mqtt://localhost:1883');
// Store connected clients
const connectedClients = new Set();
// Real-time data cache
let realtimeData = {};
let currentAlerts = [];
let analyticsData = {};
// Socket.IO connection handling
io.on('connection', (socket) => {
console.log('Client connected:', socket.id);
connectedClients.add(socket);
// Send initial data to new client
socket.emit('initial-data', {
realtimeData,
alerts: currentAlerts,
analytics: analyticsData
});
socket.on('disconnect', () => {
console.log('Client disconnected:', socket.id);
connectedClients.delete(socket);
});
// Handle client requests
socket.on('request-device-data', async (deviceName) => {
try {
const deviceData = await getDeviceData(deviceName);
socket.emit('device-data', { device: deviceName, data: deviceData });
} catch (error) {
console.error('Error fetching device data:', error);
socket.emit('error', { message: 'Failed to fetch device data' });
}
});
});
// Routes
app.get('/', (req, res) => {
res.render('dashboard', {
title: 'Industrial IoT Dashboard',
devices: Object.keys(realtimeData)
});
});
app.get('/api/devices', async (req, res) => {
try {
const devices = await getAllDevices();
res.json(devices);
} catch (error) {
console.error('Error fetching devices:', error);
res.status(500).json({ error: 'Failed to fetch devices' });
}
});
app.get('/api/devices/:deviceName', async (req, res) => {
try {
const deviceData = await getDeviceData(req.params.deviceName);
res.json(deviceData);
} catch (error) {
console.error('Error fetching device data:', error);
res.status(500).json({ error: 'Failed to fetch device data' });
}
});
app.get('/api/alerts', async (req, res) => {
try {
const alerts = await getAlerts();
res.json(alerts);
} catch (error) {
console.error('Error fetching alerts:', error);
res.status(500).json({ error: 'Failed to fetch alerts' });
}
});
app.get('/api/analytics/:deviceName', async (req, res) => {
try {
const analytics = await getDeviceAnalytics(req.params.deviceName);
res.json(analytics);
} catch (error) {
console.error('Error fetching analytics:', error);
res.status(500).json({ error: 'Failed to fetch analytics' });
}
});
// Data fetching functions
async function getAllDevices() {
try {
const keys = await redis.keys('*:*');
const devices = {};
for (const key of keys) {
if (!key.includes('analytics:') && !key.includes('alerts')) {
const [device, parameter] = key.split(':', 2);
if (!devices[device]) {
devices[device] = {};
}
const data = await redis.hgetall(key);
devices[device][parameter] = data;
}
}
return devices;
} catch (error) {
console.error('Error getting all devices:', error);
return {};
}
}
async function getDeviceData(deviceName) {
try {
const keys = await redis.keys(`${deviceName}:*`);
const deviceData = {};
for (const key of keys) {
const parameter = key.split(':', 2)[1];
const data = await redis.hgetall(key);
deviceData[parameter] = data;
}
return deviceData;
} catch (error) {
console.error(`Error getting data for device ${deviceName}:`, error);
return {};
}
}
async function getAlerts() {
try {
const alertsData = await redis.lrange('alerts', 0, 99); // Get last 100 alerts
return alertsData.map(alert => JSON.parse(alert));
} catch (error) {
console.error('Error getting alerts:', error);
return [];
}
}
async function getDeviceAnalytics(deviceName) {
try {
const analyticsKeys = await redis.keys(`analytics:${deviceName}:*`);
const analytics = {};
for (const key of analyticsKeys) {
const metricType = key.split(':')[2];
const data = await redis.hgetall(key);
analytics[metricType] = data;
}
return analytics;
} catch (error) {
console.error(`Error getting analytics for device ${deviceName}:`, error);
return {};
}
}
// Real-time data monitoring
async function monitorRealtimeData() {
try {
// Update realtime data cache
realtimeData = await getAllDevices();
// Update alerts cache
currentAlerts = await getAlerts();
// Update analytics cache
const devices = Object.keys(realtimeData);
for (const device of devices) {
analyticsData[device] = await getDeviceAnalytics(device);
}
// Broadcast to all connected clients
io.emit('realtime-update', {
data: realtimeData,
alerts: currentAlerts.slice(0, 10), // Send only recent alerts
analytics: analyticsData
});
} catch (error) {
console.error('Error monitoring realtime data:', error);
}
}
// Start monitoring
setInterval(monitorRealtimeData, 5000); // Update every 5 seconds
// MQTT message handling
mqttClient.on('connect', () => {
console.log('Connected to MQTT broker');
mqttClient.subscribe('factory/+/+/data');
mqttClient.subscribe('factory/alerts/+');
});
mqttClient.on('message', (topic, message) => {
try {
const data = JSON.parse(message.toString());
if (topic.includes('alerts')) {
// Handle alert messages
currentAlerts.unshift(data);
if (currentAlerts.length > 1000) {
currentAlerts = currentAlerts.slice(0, 1000);
}
io.emit('new-alert', data);
} else {
// Handle sensor data messages
const topicParts = topic.split('/');
if (topicParts.length >= 4) {
const device = topicParts[1];
const parameter = topicParts[2];
if (!realtimeData[device]) {
realtimeData[device] = {};
}
realtimeData[device][parameter] = data;
io.emit('sensor-update', {
device,
parameter,
data
});
}
}
} catch (error) {
console.error('Error processing MQTT message:', error);
}
});
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`🌐 Industrial IoT Dashboard running on port ${PORT}`);
console.log(`📊 Dashboard URL: http://localhost:${PORT}`);
});
module.exports = app;
EOF
# Create dashboard views directory
mkdir -p views public/{css,js,images}
# Create main dashboard template
cat > views/dashboard.ejs << 'EOF'
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title><%= title %></title>
<link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
<link href="/css/dashboard.css" rel="stylesheet">
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script src="/socket.io/socket.io.js"></script>
</head>
<body class="bg-dark text-light">
<nav class="navbar navbar-expand-lg navbar-dark bg-primary">
<div class="container-fluid">
<a class="navbar-brand" href="#"><i class="fas fa-industry"></i> <%= title %></a>
<div class="navbar-nav ms-auto">
<span class="navbar-text">
<i class="fas fa-circle text-success me-2"></i>Connected
</span>
</div>
</div>
</nav>
<div class="container-fluid mt-4">
<!-- System Overview -->
<div class="row mb-4">
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-body">
<h5 class="card-title"><i class="fas fa-microchip"></i> Active Devices</h5>
<h2 class="text-primary" id="active-devices">0</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-body">
<h5 class="card-title"><i class="fas fa-exclamation-triangle"></i> Active Alerts</h5>
<h2 class="text-warning" id="active-alerts">0</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-body">
<h5 class="card-title"><i class="fas fa-chart-line"></i> Avg OEE</h5>
<h2 class="text-success" id="average-oee">0%</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-body">
<h5 class="card-title"><i class="fas fa-cog"></i> System Status</h5>
<h2 class="text-success" id="system-status">Online</h2>
</div>
</div>
</div>
</div>
<!-- Main Content -->
<div class="row">
<!-- Device List -->
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-header">
<h5><i class="fas fa-list"></i> Devices</h5>
</div>
<div class="card-body">
<div id="device-list" class="list-group list-group-flush">
<!-- Devices will be populated by JavaScript -->
</div>
</div>
</div>
</div>
<!-- Real-time Charts -->
<div class="col-md-6">
<div class="card bg-secondary">
<div class="card-header">
<h5><i class="fas fa-chart-area"></i> Real-time Monitoring</h5>
</div>
<div class="card-body">
<canvas id="realtime-chart" height="300"></canvas>
</div>
</div>
</div>
<!-- Alerts Panel -->
<div class="col-md-3">
<div class="card bg-secondary">
<div class="card-header">
<h5><i class="fas fa-bell"></i> Recent Alerts</h5>
</div>
<div class="card-body">
<div id="alerts-list" style="max-height: 400px; overflow-y: auto;">
<!-- Alerts will be populated by JavaScript -->
</div>
</div>
</div>
</div>
</div>
<!-- Device Details -->
<div class="row mt-4">
<div class="col-12">
<div class="card bg-secondary">
<div class="card-header">
<h5><i class="fas fa-info-circle"></i> Device Details</h5>
</div>
<div class="card-body">
<div id="device-details">
<p class="text-muted">Select a device to view details</p>
</div>
</div>
</div>
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.bundle.min.js"></script>
<script src="/js/dashboard.js"></script>
</body>
</html>
EOF
# Create dashboard CSS
cat > public/css/dashboard.css << 'EOF'
.card {
border: none;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.3);
}
.card-body h2 {
font-weight: bold;
margin-bottom: 0;
}
.device-item {
cursor: pointer;
transition: background-color 0.3s;
}
.device-item:hover {
background-color: rgba(108, 117, 125, 0.3);
}
.device-item.active {
background-color: rgba(13, 110, 253, 0.3);
}
.alert-item {
border-left: 4px solid;
padding: 8px 12px;
margin-bottom: 8px;
border-radius: 4px;
}
.alert-critical {
border-left-color: #dc3545;
background-color: rgba(220, 53, 69, 0.1);
}
.alert-high {
border-left-color: #fd7e14;
background-color: rgba(253, 126, 20, 0.1);
}
.alert-medium {
border-left-color: #ffc107;
background-color: rgba(255, 193, 7, 0.1);
}
.alert-low {
border-left-color: #20c997;
background-color: rgba(32, 201, 151, 0.1);
}
.parameter-card {
background-color: rgba(108, 117, 125, 0.2);
border-radius: 8px;
padding: 15px;
margin-bottom: 10px;
}
.parameter-value {
font-size: 1.5em;
font-weight: bold;
}
.status-indicator {
width: 10px;
height: 10px;
border-radius: 50%;
display: inline-block;
margin-right: 8px;
}
.status-online {
background-color: #28a745;
}
.status-offline {
background-color: #dc3545;
}
.status-warning {
background-color: #ffc107;
}
EOF
echo "Industrial IoT web dashboard created! 🌐"
What this does: 📖 Creates a comprehensive web dashboard with real-time monitoring, alerts, and device management capabilities.
📋 Step 4: Complete the Implementation
<function_calls>