The Problem
On Monday you tested the 3 prompts in ChatGPT. You saw how sensor data extraction → anomaly detection → alert generation works. But here's the thing: you can't have operators manually checking dashboards and running prompts for 500 sensors. One engineer spending 4 hours a day monitoring sensor data by hand is $120/day in labor costs, roughly $36,000 a year for a single engineer, and that's before you multiply across a factory floor. Add the 30-minute lag between an anomaly appearing and an alert going out, and you're eating equipment failures you could have prevented.
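The arithmetic is worth making explicit so you can plug in your own numbers. A minimal sketch; the $30/hour rate and 300 monitored days per year are assumptions chosen to reproduce the figures above:

    # Back-of-the-envelope cost of manual monitoring; swap in your own figures
    hours_per_day = 4       # engineer time spent watching dashboards
    hourly_rate = 30        # $/hour, assumed (implies the $120/day above)
    monitored_days = 300    # days/year, assumed (reproduces $36,000/year)
    daily_cost = hours_per_day * hourly_rate
    annual_cost = daily_cost * monitored_days
    print(f"${daily_cost}/day -> ${annual_cost:,}/year per engineer")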
See It Work
Watch the 3 prompts chain together automatically. This is what you'll build.
The Code
Four levels: start simple, add reliability, scale to production, then go enterprise. Pick where you are.
When to Level Up
Simple API Calls
- Basic extraction → analysis → alert chain (see the sketch after this list)
- Manual MQTT subscription
- Local logging
- No historical context
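A minimal sketch of what this level looks like in code, assuming the OpenAI Python SDK; the model name, prompts, and reading format are placeholders, not the course's exact prompts:

    # Level 1: chain three LLM calls back to back, no reliability layer yet
    from openai import OpenAI

    client = OpenAI()  # reads OPENAI_API_KEY from the environment

    def ask(prompt: str) -> str:
        resp = client.chat.completions.create(
            model="gpt-4o-mini",  # placeholder model
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.choices[0].message.content

    raw = '{"sensor": "pump-7", "temp_c": 91.4, "vibration_mm_s": 7.2}'
    extracted = ask(f"Extract the sensor readings as flat JSON:\n{raw}")
    analysis = ask(f"Flag any anomalous readings and say why:\n{extracted}")
    alert = ask(f"Write a one-line operator alert for:\n{analysis}")
    print(alert)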
Add Reliability Layer
- Retry with exponential backoff (sketched after this list)
- Redis rate limiting
- InfluxDB time-series storage
- MQTT auto-reconnect
- Historical data queries
- Structured logging
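The retry piece, as a sketch: exponential backoff with jitter wrapped around any flaky call (the LLM API, the broker publish). The parameter values are illustrative defaults, not recommendations:

    # Reliability layer: retry with exponential backoff and jitter
    import random
    import time

    def with_retry(call, attempts=5, base_delay=1.0, max_delay=30.0):
        """Run call(); on failure, wait 1s, 2s, 4s... (plus jitter) and retry."""
        for attempt in range(attempts):
            try:
                return call()
            except Exception:
                if attempt == attempts - 1:
                    raise  # out of retries, let the caller handle it
                delay = min(max_delay, base_delay * 2 ** attempt)
                time.sleep(delay + random.uniform(0, delay / 2))  # jitter spreads out retries

    # usage: result = with_retry(lambda: ask(prompt))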
Multi-Agent System
- LangGraph orchestration (see the sketch after this list)
- ML + LLM hybrid analysis
- Distributed processing
- Real-time dashboards
- Advanced alerting (PagerDuty, Slack)
- Predictive maintenance scoring
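A sketch of what the LangGraph wiring could look like, assuming the langgraph package's StateGraph API; the node functions are stubs and the 0.7 escalation threshold is an assumption:

    # ML screens every reading; the LLM only sees likely anomalies
    from typing import TypedDict
    from langgraph.graph import StateGraph, END

    class SensorState(TypedDict):
        reading: dict
        anomaly_score: float
        alert: str

    def ml_screen(state: SensorState) -> dict:
        return {"anomaly_score": 0.9}  # stub: a real node runs the ML model here

    def llm_analyze(state: SensorState) -> dict:
        return {"alert": "pump-7: vibration trending up"}  # stub: real node calls the LLM

    graph = StateGraph(SensorState)
    graph.add_node("screen", ml_screen)
    graph.add_node("analyze", llm_analyze)
    graph.set_entry_point("screen")
    graph.add_conditional_edges("screen", lambda s: "analyze" if s["anomaly_score"] > 0.7 else END)
    graph.add_edge("analyze", END)
    app = graph.compile()
    print(app.invoke({"reading": {"sensor": "pump-7"}, "anomaly_score": 0.0, "alert": ""}))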
Enterprise Platform
- Kubernetes orchestration
- Auto-scaling based on load
- Multi-region deployment
- Custom ML model training
- Advanced analytics dashboard
- API for third-party integrations
- Compliance reporting
- Cost optimization
Manufacturing Tech Gotchas
Real challenges from production deployments. Learn from others' mistakes.
MQTT Message Ordering
Use MQTT QoS 2 for critical sensors. Add sequence numbers and timestamps to messages. Implement a buffer window for reordering.
    # MQTT QoS 2 with sequence tracking
    import paho.mqtt.client as mqtt  # QoS 2 is exactly-once delivery, not ordered delivery
    from collections import deque
    import time

    class OrderedMessageBuffer:
        def __init__(self, window_size=10, timeout=5):
            self.buffer = deque(maxlen=window_size)  # (seq, arrival_time, payload)
            self.timeout = timeout  # seconds to wait on a gap before skipping it
            self.next_seq = 0       # next sequence number safe to release

        def add(self, seq, payload):
            """Buffer a message; return any payloads now deliverable in order."""
            self.buffer.append((seq, time.time(), payload))
            ready = []
            for s, _, p in sorted(self.buffer, key=lambda m: m[0]):
                if s == self.next_seq:  # contiguous with what we've already released
                    ready.append(p)
                    self.next_seq += 1
            return ready  # released entries age out of the deque via maxlen
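Wiring the buffer into a paho-mqtt subscriber might look like this; the broker host, topic, and payload shape are placeholders:

    import json

    buffer = OrderedMessageBuffer()

    def on_message(client, userdata, msg):
        data = json.loads(msg.payload)  # expects {"seq": ..., "ts": ..., "value": ...}
        for reading in buffer.add(data["seq"], data):
            print("in-order reading:", reading)  # hand off to the extraction step here

    client = mqtt.Client()  # paho-mqtt 1.x style; v2 also wants a CallbackAPIVersion argument
    client.on_message = on_message
    client.connect("broker.local", 1883)
    client.subscribe("factory/sensors/#", qos=2)  # QoS 2 on the critical topics
    client.loop_forever()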
OPC UA Data Type Mapping
Normalize OPC UA data to flat JSON before sending it to the LLM. Handle null values explicitly. Use type hints.
    # OPC UA to LLM-friendly JSON
    from opcua import Client  # python-opcua: values arrive as rich Variant types
    import json
    from typing import Any

    def normalize_opc_value(value: Any) -> Any:
        """Convert an OPC UA value to an LLM-friendly format."""
        if value is None:
            return None  # keep nulls explicit rather than dropping the key
        if isinstance(value, (bool, int, float, str)):
            return value  # plain scalars pass through untouched
        if hasattr(value, "isoformat"):
            return value.isoformat()  # datetimes become ISO-8601 strings
        return str(value)  # fallback: stringify exotic variant/extension types
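Reading one node and shipping it as flat JSON could look like this with python-opcua; the endpoint URL and node id are placeholders:

    client = Client("opc.tcp://plc.local:4840")  # placeholder endpoint
    client.connect()
    try:
        node = client.get_node("ns=2;s=Pump7.Temperature")  # placeholder node id
        payload = {"sensor": "pump-7", "temp_c": normalize_opc_value(node.get_value())}
        print(json.dumps(payload))  # flat JSON, safe to drop into a prompt
    finally:
        client.disconnect()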
Time-Series Data Volume
Aggregate historical data before sending it to the LLM. Use statistical summaries (min, max, mean, std). Only send anomalies in detail.
    # Time-series data aggregation
    import pandas as pd
    from datetime import datetime, timedelta

    def aggregate_historical_data(
        sensor_id: str,
        hours: int = 24,
        *,
        readings: pd.DataFrame,  # assumed schema: "timestamp" and "value" columns
    ) -> dict:
        """Summarize recent history; send the LLM statistics, not raw points."""
        window = readings[readings["timestamp"] > datetime.now() - timedelta(hours=hours)]
        stats = window["value"].agg(["min", "max", "mean", "std"]).round(3).to_dict()
        return {"sensor_id": sensor_id, "hours": hours, **stats}
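A quick smoke test with synthetic data, one reading per minute for a day:

    import numpy as np

    ts = pd.date_range(end=datetime.now(), periods=24 * 60, freq="min")
    df = pd.DataFrame({"timestamp": ts, "value": np.random.normal(70, 2, len(ts))})
    print(aggregate_historical_data("pump-7", hours=24, readings=df))
    # a handful of summary numbers instead of 1,440 raw data points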
Alert Fatigue from False Positives
Implement alert scoring and suppression. Use the LLM to validate ML anomalies against domain knowledge. Batch low-priority alerts.
    # Alert scoring and suppression
    from dataclasses import dataclass
    from typing import List

    @dataclass
    class Alert:
        sensor_id: str
        severity: str    # "low" | "medium" | "critical"
        score: float     # anomaly confidence from the detector, 0-1
        message: str = ""

    def should_send(alert: Alert, recent: List[Alert], threshold: float = 0.7) -> bool:
        """Suppress low-confidence alerts and repeats from the same sensor."""
        if alert.score < threshold:
            return False  # queue for a batched low-priority digest instead
        return not any(a.sensor_id == alert.sensor_id for a in recent)
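Usage, with recent standing in for whatever window of recently fired alerts you keep:

    recent: list = []
    incoming = Alert("pump-7", "critical", score=0.92, message="vibration spike")
    if should_send(incoming, recent):
        recent.append(incoming)
        print("page the on-call:", incoming.message)  # everything else goes to a digest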
Cost Management at Scale
Implement tiered processing: ML for screening, an LLM only for anomalies. Cache common patterns. Use cheaper models for validation.
    # Cost-optimized processing pipeline
    from enum import Enum
    import hashlib  # for hashing reading patterns into cache keys (sketch below)

    class ProcessingTier(Enum):
        ML_ONLY = 1      # $0.001/call
        CHEAP_LLM = 2    # $0.005/call (GPT-3.5)
        PREMIUM_LLM = 3  # $0.02/call (GPT-4)
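A sketch of how the tiers and the pattern cache might fit together; the thresholds and the analyze callable are assumptions, not fixed parts of the pipeline:

    import hashlib
    import json

    def choose_tier(anomaly_score: float) -> ProcessingTier:
        """Screen everything with ML; escalate only likely anomalies to an LLM."""
        if anomaly_score < 0.5:
            return ProcessingTier.ML_ONLY
        if anomaly_score < 0.8:
            return ProcessingTier.CHEAP_LLM  # the cheaper model validates borderline cases
        return ProcessingTier.PREMIUM_LLM

    _cache: dict = {}

    def cached_analysis(reading: dict, analyze) -> str:
        """Skip the model call entirely when an identical pattern was already classified."""
        key = hashlib.sha256(json.dumps(reading, sort_keys=True).encode()).hexdigest()
        if key not in _cache:
            _cache[key] = analyze(reading)
        return _cache[key]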
© 2026 Randeep Bhatia. All Rights Reserved.
No part of this content may be reproduced, distributed, or transmitted in any form without prior written permission.