The Problem
On Monday you tested the 3 prompts in ChatGPT. You saw how analyzing sensor patterns → detecting anomalies → predicting failures works. But here's the reality: by the time your maintenance team reviews yesterday's sensor logs, the bearing's already seized up. You need real-time automated monitoring that catches problems 48 hours before they cause downtime.
See It Work
Watch the 3 prompts chain together automatically. This is what you'll build to monitor your production line.
The Code
Three levels: start simple with single machine monitoring, add ML models for better predictions, then scale to full factory automation. Pick where you are.
Level 1: Simple Threshold Monitoring
Good for: 10-50 machines | Setup time: 2 hours
# Simple Threshold Monitoring (10-50 machines)
import json
from datetime import datetime, timedelta

import anthropic
import paho.mqtt.client as mqtt


class SimpleMaintenanceMonitor:
    def __init__(self, anthropic_api_key: str, mqtt_broker: str = "localhost"):
        self.client = anthropic.Anthropic(api_key=anthropic_api_key)
        # Connect once so send_alert() can publish (point this at your broker)
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.connect(mqtt_broker, 1883)

    def analyze_sensor_data(self, sensor_data: dict) -> dict:
        """Chain the 3 prompts: analyze → detect → predict"""
        # Step 1: Analyze sensor patterns
        analysis_prompt = f"""Analyze this industrial sensor data and identify deviations from normal ranges.
Format as JSON with: overall_health, readings (each with value, unit, normal_range, deviation_percent, status).

Sensor data:
{json.dumps(sensor_data, indent=2)}

Output valid JSON only."""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": analysis_prompt}]
        )
        analysis = json.loads(response.content[0].text)

        # Step 2: Detect anomalies and root cause
        anomaly_prompt = f"""Based on this sensor analysis, identify anomalies and determine root cause.
Return JSON with: anomalies_detected (type, confidence, indicators, severity),
root_cause_analysis (primary_issue, contributing_factors, failure_mechanism),
requires_immediate_action (boolean).

Analysis:
{json.dumps(analysis, indent=2)}

Output valid JSON only."""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": anomaly_prompt}]
        )
        anomalies = json.loads(response.content[0].text)

        # Step 3: Predict failure timeline if critical
        if anomalies.get('requires_immediate_action'):
            prediction_prompt = f"""Predict failure timeline and recommend actions based on these anomalies.
Return JSON with: failure_prediction (predicted_failure_time, time_until_failure, confidence),
recommended_actions (action, priority, timeline, impact), cost_analysis.

Anomalies:
{json.dumps(anomalies, indent=2)}

Current time: {datetime.now().isoformat()}

Output valid JSON only."""

            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                messages=[{"role": "user", "content": prediction_prompt}]
            )
            prediction = json.loads(response.content[0].text)
        else:
            prediction = {"status": "normal", "no_immediate_action_required": True}

        return {
            "machine_id": sensor_data.get('machine_id'),
            "timestamp": datetime.now().isoformat(),
            "analysis": analysis,
            "anomalies": anomalies,
            "prediction": prediction
        }

    def send_alert(self, result: dict, severity: str):
        """Send alert via MQTT to maintenance system"""
        alert = {
            "machine_id": result['machine_id'],
            "severity": severity,
            "message": result['prediction'].get('failure_prediction', {}).get('failure_mode', 'Anomaly detected'),
            "time_until_failure": result['prediction'].get('failure_prediction', {}).get('time_until_failure'),
            "actions": result['prediction'].get('recommended_actions', [])
        }
        self.mqtt_client.publish(f"factory/alerts/{severity}", json.dumps(alert))
        print(f"Alert sent for {result['machine_id']}: {severity}")


# Usage
monitor = SimpleMaintenanceMonitor(anthropic_api_key="your-key")

# Sample sensor data from MQTT broker
sensor_data = {
    "machine_id": "CNC-047",
    "timestamp": "2025-07-01T14:23:15Z",
    "vibration": {"value": 8.2, "unit": "mm/s", "normal_range": [3, 5]},
    "temperature": {"value": 78, "unit": "celsius", "normal_range": [45, 65]},
    "amperage": {"value": 42, "unit": "amps", "normal_range": [28, 35]},
    "oil_pressure": {"value": 45, "unit": "psi", "normal_range": [55, 65]},
    "runtime_hours": 847
}

result = monitor.analyze_sensor_data(sensor_data)

if result['anomalies'].get('requires_immediate_action'):
    monitor.send_alert(result, 'critical')

print(f"Analysis complete. Status: {result['analysis'].get('overall_health')}")
Level 2: ML-Enhanced Prediction with Historical Analysis
Good for: 50-500 machines | Setup time: 1 day
// ML-Enhanced Prediction (50-500 machines)
import Anthropic from '@anthropic-ai/sdk';
import { InfluxDB, Point } from '@influxdata/influxdb-client';
import * as tf from '@tensorflow/tfjs-node';

interface SensorReading {
  timestamp: string;
  machine_id: string;
  vibration: number;
  temperature: number;
  amperage: number;
  oil_pressure: number;
  rpm: number;
}

interface PredictionResult {
  machine_id: string;
  failure_probability: number;
  time_until_failure_hours: number;
  anomalies: any[];
  recommended_actions: any[];
}

class MLMaintenancePredictor {
  private anthropic: Anthropic;
  private influx: InfluxDB;
  private model: tf.LayersModel | null = null;

  constructor(anthropicKey: string, influxUrl: string, influxToken: string) {
    this.anthropic = new Anthropic({ apiKey: anthropicKey });
    this.influx = new InfluxDB({ url: influxUrl, token: influxToken });
  }

  async loadMLModel() {
    // Load pre-trained anomaly detection model
    this.model = await tf.loadLayersModel('file://./models/bearing-failure-model/model.json');
  }

  async getHistoricalData(machineId: string, hours: number = 72): Promise<SensorReading[]> {
    const queryApi = this.influx.getQueryApi('manufacturing');
    const query = `
      from(bucket: "sensors")
        |> range(start: -${hours}h)
        |> filter(fn: (r) => r["machine_id"] == "${machineId}")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    `;

    const data: SensorReading[] = [];
    return new Promise((resolve, reject) => {
      queryApi.queryRows(query, {
        next(row: any, tableMeta: any) {
          const reading = tableMeta.toObject(row) as SensorReading;
          data.push(reading);
        },
        error(error: Error) {
          reject(error);
        },
        complete() {
          resolve(data);
        },
      });
    });
  }

  async predictWithML(currentReading: SensorReading, historicalData: SensorReading[]): Promise<number> {
    if (!this.model) throw new Error('Model not loaded');

    // Prepare features: current + rolling statistics from history
    const features = [
      currentReading.vibration,
      currentReading.temperature,
      currentReading.amperage,
      currentReading.oil_pressure,
      this.calculateTrend(historicalData, 'vibration'),
      this.calculateTrend(historicalData, 'temperature'),
      this.calculateStdDev(historicalData, 'vibration'),
      this.calculateStdDev(historicalData, 'temperature'),
    ];

    const tensor = tf.tensor2d([features]);
    const prediction = this.model.predict(tensor) as tf.Tensor;
    const failureProbability = (await prediction.data())[0];
    return failureProbability;
  }

  calculateTrend(data: SensorReading[], metric: keyof SensorReading): number {
    if (data.length < 2) return 0;
    const recent = data.slice(-10).map(d => d[metric] as number);
    const older = data.slice(-20, -10).map(d => d[metric] as number);
    const recentAvg = recent.reduce((a, b) => a + b, 0) / recent.length;
    const olderAvg = older.reduce((a, b) => a + b, 0) / older.length;
    return ((recentAvg - olderAvg) / olderAvg) * 100;
  }

  calculateStdDev(data: SensorReading[], metric: keyof SensorReading): number {
    const values = data.map(d => d[metric] as number);
    const mean = values.reduce((a, b) => a + b, 0) / values.length;
    const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;
    return Math.sqrt(variance);
  }

  async analyzeMachine(currentReading: SensorReading): Promise<PredictionResult> {
    // Get historical context
    const historicalData = await this.getHistoricalData(currentReading.machine_id, 72);

    // ML prediction
    const mlFailureProbability = await this.predictWithML(currentReading, historicalData);

    // LLM analysis for context and recommendations
    const analysisPrompt = `Analyze this machine sensor data with ML prediction and historical context.

Current Reading: ${JSON.stringify(currentReading, null, 2)}
ML Failure Probability: ${(mlFailureProbability * 100).toFixed(1)}%

Historical Trends (last 72 hours):
- Vibration trend: ${this.calculateTrend(historicalData, 'vibration').toFixed(1)}%
- Temperature trend: ${this.calculateTrend(historicalData, 'temperature').toFixed(1)}%
- Vibration volatility: ${this.calculateStdDev(historicalData, 'vibration').toFixed(2)} mm/s

Provide:
1. Anomalies detected with severity
2. Root cause analysis
3. Time until failure estimate (hours)
4. Recommended actions with priority

Format as JSON with: anomalies[], root_cause, time_until_failure_hours, recommended_actions[]`;

    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 3072,
      messages: [{ role: 'user', content: analysisPrompt }],
    });

    const analysis = JSON.parse(response.content[0].text);

    return {
      machine_id: currentReading.machine_id,
      failure_probability: mlFailureProbability,
      time_until_failure_hours: analysis.time_until_failure_hours,
      anomalies: analysis.anomalies,
      recommended_actions: analysis.recommended_actions,
    };
  }

  async monitorContinuously() {
    // Subscribe to real-time sensor stream
    const mqtt = require('mqtt');
    const client = mqtt.connect('mqtt://factory-broker:1883');

    client.on('message', async (topic: string, message: Buffer) => {
      const reading: SensorReading = JSON.parse(message.toString());
      try {
        const prediction = await this.analyzeMachine(reading);

        if (prediction.failure_probability > 0.7) {
          await this.sendCriticalAlert(prediction);
        } else if (prediction.failure_probability > 0.4) {
          await this.sendWarningAlert(prediction);
        }

        // Store prediction in InfluxDB
        await this.storePrediction(prediction);
      } catch (error) {
        console.error(`Error analyzing ${reading.machine_id}:`, error);
      }
    });

    client.subscribe('factory/sensors/+');
  }

  async sendCriticalAlert(prediction: PredictionResult) {
    // Send to PagerDuty
    await fetch('https://events.pagerduty.com/v2/enqueue', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        routing_key: process.env.PAGERDUTY_KEY,
        event_action: 'trigger',
        payload: {
          summary: `CRITICAL: ${prediction.machine_id} failure predicted in ${prediction.time_until_failure_hours}h`,
          severity: 'critical',
          source: prediction.machine_id,
          custom_details: prediction,
        },
      }),
    });
  }

  async sendWarningAlert(prediction: PredictionResult) {
    // Send to Slack
    await fetch(process.env.SLACK_WEBHOOK_URL!, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        text: `⚠️ Warning: ${prediction.machine_id} showing degradation`,
        blocks: [
          {
            type: 'section',
            text: {
              type: 'mrkdwn',
              text: `*Machine:* ${prediction.machine_id}\n*Failure Probability:* ${(prediction.failure_probability * 100).toFixed(1)}%\n*Estimated Time:* ${prediction.time_until_failure_hours} hours`,
            },
          },
        ],
      }),
    });
  }

  async storePrediction(prediction: PredictionResult) {
    const writeApi = this.influx.getWriteApi('manufacturing', 'predictions');
    const point = new Point('maintenance_prediction')
      .tag('machine_id', prediction.machine_id)
      .floatField('failure_probability', prediction.failure_probability)
      .floatField('time_until_failure', prediction.time_until_failure_hours)
      .timestamp(new Date());
    writeApi.writePoint(point);
    await writeApi.close();
  }
}

// Usage
const predictor = new MLMaintenancePredictor(
  process.env.ANTHROPIC_API_KEY!,
  'http://influxdb:8086',
  process.env.INFLUX_TOKEN!
);

await predictor.loadMLModel();
await predictor.monitorContinuously();
console.log('Continuous monitoring active...');
Level 3: Production Multi-Agent System with LangGraph
Good for: 500+ machines, multiple production lines | Setup time: 3-5 days
# Production Multi-Agent System (500+ machines)
import asyncio
import json
import os
from datetime import datetime, timedelta
from typing import TypedDict, List, Optional

import aiohttp
import anthropic
import numpy as np
from langgraph.graph import Graph, END


class MaintenanceState(TypedDict):
    machine_id: str
    sensor_data: dict
    historical_data: List[dict]
    ml_prediction: Optional[dict]
    anomaly_analysis: Optional[dict]
    root_cause: Optional[dict]
    failure_prediction: Optional[dict]
    maintenance_plan: Optional[dict]
    alerts_sent: List[str]
    cmms_ticket: Optional[str]


class ProductionMaintenanceSystem:
    def __init__(self):
        # Async client so the agent nodes can await API calls
        self.client = anthropic.AsyncAnthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
        self.graph = self.build_graph()

    def build_graph(self) -> Graph:
        graph = Graph()

        # Agent nodes
        graph.add_node("data_collector", self.collect_data_node)
        graph.add_node("ml_predictor", self.ml_prediction_node)
        graph.add_node("anomaly_detector", self.anomaly_detection_node)
        graph.add_node("root_cause_analyzer", self.root_cause_node)
        graph.add_node("failure_predictor", self.failure_prediction_node)
        graph.add_node("action_planner", self.action_planning_node)
        graph.add_node("alert_dispatcher", self.alert_dispatch_node)
        graph.add_node("cmms_integrator", self.cmms_integration_node)

        # Define flow
        graph.set_entry_point("data_collector")
        graph.add_edge("data_collector", "ml_predictor")
        graph.add_edge("ml_predictor", "anomaly_detector")

        # Conditional routing based on severity
        graph.add_conditional_edges(
            "anomaly_detector",
            self.route_by_severity,
            {
                "critical": "root_cause_analyzer",
                "warning": "failure_predictor",
                "normal": END
            }
        )

        graph.add_edge("root_cause_analyzer", "failure_predictor")
        graph.add_edge("failure_predictor", "action_planner")
        graph.add_edge("action_planner", "alert_dispatcher")
        graph.add_edge("alert_dispatcher", "cmms_integrator")
        graph.add_edge("cmms_integrator", END)

        return graph.compile()

    async def collect_data_node(self, state: MaintenanceState) -> MaintenanceState:
        """Collect real-time and historical sensor data"""
        # Fetch from InfluxDB
        async with aiohttp.ClientSession() as session:
            # Get last 72 hours of data
            query = f"""
            from(bucket: "sensors")
              |> range(start: -72h)
              |> filter(fn: (r) => r["machine_id"] == "{state['machine_id']}")
            """
            async with session.post(
                'http://influxdb:8086/api/v2/query',
                headers={'Authorization': f'Token {os.getenv("INFLUX_TOKEN")}'},
                json={'query': query}
            ) as resp:
                historical = await resp.json()
                state['historical_data'] = historical

        return state

    async def ml_prediction_node(self, state: MaintenanceState) -> MaintenanceState:
        """Run ML model for failure probability"""
        # Load TensorFlow model and predict
        import tensorflow as tf
        model = tf.keras.models.load_model('./models/bearing-failure-lstm')

        # Prepare time series features
        features = self.prepare_features(state['historical_data'])
        prediction = model.predict(features)

        state['ml_prediction'] = {
            'failure_probability': float(prediction[0][0]),
            'confidence': float(prediction[0][1]),
            'model_version': 'bearing-failure-lstm-v2.3'
        }
        return state

    async def anomaly_detection_node(self, state: MaintenanceState) -> MaintenanceState:
        """Detect anomalies using Claude with ML context"""
        prompt = f"""Analyze sensor data for anomalies with ML prediction context.

Current Reading: {json.dumps(state['sensor_data'], indent=2)}
ML Failure Probability: {state['ml_prediction']['failure_probability']:.2%}

Historical Statistics (72h): {self.calculate_statistics(state['historical_data'])}

Identify:
1. Anomalies (type, severity, confidence)
2. Deviation patterns
3. Overall health status (normal/warning/critical)

Format as JSON with: anomalies[], severity, health_status"""

        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )
        state['anomaly_analysis'] = json.loads(response.content[0].text)
        return state

    async def root_cause_node(self, state: MaintenanceState) -> MaintenanceState:
        """Determine root cause of critical anomalies"""
        prompt = f"""Perform root cause analysis for critical anomalies.

Anomalies Detected: {json.dumps(state['anomaly_analysis'], indent=2)}

Machine Context:
- Type: CNC Milling Machine
- Age: 8 years
- Last Maintenance: {self.get_last_maintenance(state['machine_id'])}
- Runtime: {state['sensor_data'].get('runtime_hours')} hours

Determine:
1. Primary root cause
2. Contributing factors
3. Failure mechanism
4. Similar historical failures

Format as JSON with: primary_cause, contributing_factors[], failure_mechanism, historical_precedents[]"""

        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=3072,
            messages=[{"role": "user", "content": prompt}]
        )
        state['root_cause'] = json.loads(response.content[0].text)
        return state

    async def failure_prediction_node(self, state: MaintenanceState) -> MaintenanceState:
        """Predict failure timeline with high accuracy"""
        prompt = f"""Predict precise failure timeline based on all available data.

ML Prediction: {state['ml_prediction']['failure_probability']:.2%} failure probability
Anomalies: {json.dumps(state['anomaly_analysis'], indent=2)}
Root Cause: {json.dumps(state['root_cause'], indent=2)}

Historical Failure Data:
- Similar failures typically occur {self.get_historical_failure_time(state['root_cause'])} hours after this stage
- Degradation rate: {self.calculate_degradation_rate(state['historical_data'])}

Predict:
1. Time until failure (hours, with confidence interval)
2. Failure mode and severity
3. Safe operating window
4. Risk if no action taken

Format as JSON with: time_until_failure_hours, confidence_interval, failure_mode, safe_operating_window, risk_assessment"""

        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )
        state['failure_prediction'] = json.loads(response.content[0].text)
        return state

    async def action_planning_node(self, state: MaintenanceState) -> MaintenanceState:
        """Generate optimal maintenance plan"""
        prompt = f"""Create optimal maintenance action plan.

Failure Prediction: {json.dumps(state['failure_prediction'], indent=2)}
Root Cause: {json.dumps(state['root_cause'], indent=2)}

Production Schedule:
- Current shift: {self.get_current_shift()}
- Next maintenance window: {self.get_next_maintenance_window()}
- Production priority: High (order #4721 due in 36 hours)

Spare Parts Inventory: {self.check_spare_parts_availability(state['root_cause'])}

Create plan with:
1. Immediate actions (reduce risk)
2. Optimal maintenance timing (minimize production impact)
3. Required parts and labor
4. Cost-benefit analysis (planned vs emergency)
5. Contingency plan if failure occurs before maintenance

Format as JSON with: immediate_actions[], optimal_timing, required_resources, cost_analysis, contingency_plan"""

        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )
        state['maintenance_plan'] = json.loads(response.content[0].text)
        return state

    async def alert_dispatch_node(self, state: MaintenanceState) -> MaintenanceState:
        """Send alerts to appropriate channels based on severity"""
        severity = state['anomaly_analysis']['severity']
        alerts_sent = []

        # Critical: PagerDuty + SMS + Slack
        if severity == 'critical':
            await self.send_pagerduty_alert(state)
            alerts_sent.append('pagerduty')
            await self.send_sms_alert(state)
            alerts_sent.append('sms')

        # All severities: Slack
        await self.send_slack_alert(state)
        alerts_sent.append('slack')

        # Log to monitoring dashboard
        await self.log_to_grafana(state)
        alerts_sent.append('grafana')

        state['alerts_sent'] = alerts_sent
        return state

    async def cmms_integration_node(self, state: MaintenanceState) -> MaintenanceState:
        """Create work order in CMMS (SAP/Maximo)"""
        # Create work order via SAP API
        work_order = {
            'equipment_id': state['machine_id'],
            'description': f"Predictive maintenance: {state['root_cause']['primary_cause']}",
            'priority': 'High' if state['anomaly_analysis']['severity'] == 'critical' else 'Medium',
            'planned_start': state['maintenance_plan']['optimal_timing'],
            'estimated_duration': state['maintenance_plan']['required_resources']['labor_hours'],
            'required_parts': state['maintenance_plan']['required_resources']['parts'],
            'failure_prediction': state['failure_prediction'],
            'cost_justification': state['maintenance_plan']['cost_analysis']
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                'https://sap-api.factory.com/workorders',
                headers={'Authorization': f'Bearer {os.getenv("SAP_TOKEN")}'},
                json=work_order
            ) as resp:
                result = await resp.json()
                state['cmms_ticket'] = result['work_order_id']

        return state

    def route_by_severity(self, state: MaintenanceState) -> str:
        """Route to appropriate next node based on severity"""
        severity = state['anomaly_analysis']['severity']
        if severity == 'critical':
            return 'critical'
        elif severity == 'warning':
            return 'warning'
        else:
            return 'normal'

    # Helper methods
    def prepare_features(self, historical_data: List[dict]) -> np.ndarray:
        # Convert to time series features for LSTM
        pass

    def calculate_statistics(self, historical_data: List[dict]) -> dict:
        # Calculate rolling means, std devs, trends
        pass

    def get_last_maintenance(self, machine_id: str) -> str:
        # Query CMMS for last maintenance date
        pass

    def get_current_shift(self) -> str:
        # Look up the active shift from the production calendar
        pass

    def get_next_maintenance_window(self) -> str:
        # Query the production schedule for the next planned downtime
        pass

    def check_spare_parts_availability(self, root_cause: dict) -> dict:
        # Query inventory system for parts implied by the root cause
        pass

    def get_historical_failure_time(self, root_cause: dict) -> float:
        # Query historical failure database
        pass

    def calculate_degradation_rate(self, historical_data: List[dict]) -> float:
        # Calculate rate of degradation from trend
        pass

    async def send_pagerduty_alert(self, state: MaintenanceState):
        # Send to PagerDuty
        pass

    async def send_sms_alert(self, state: MaintenanceState):
        # Send via Twilio
        pass

    async def send_slack_alert(self, state: MaintenanceState):
        # Send to Slack channel
        pass

    async def log_to_grafana(self, state: MaintenanceState):
        # Push metrics to Prometheus/Grafana
        pass


# Usage - Process machines continuously
system = ProductionMaintenanceSystem()

async def monitor_all_machines():
    machines = await get_active_machines()  # From factory database

    while True:
        tasks = []
        for machine in machines:
            sensor_data = await get_latest_sensor_reading(machine['id'])
            initial_state = {
                'machine_id': machine['id'],
                'sensor_data': sensor_data,
                'historical_data': [],
                'ml_prediction': None,
                'anomaly_analysis': None,
                'root_cause': None,
                'failure_prediction': None,
                'maintenance_plan': None,
                'alerts_sent': [],
                'cmms_ticket': None
            }
            task = system.graph.ainvoke(initial_state)
            tasks.append(task)

        # Process all machines in parallel
        results = await asyncio.gather(*tasks)
        print(f"Processed {len(results)} machines. {sum(1 for r in results if r['cmms_ticket'])} work orders created.")

        # Wait 5 minutes before next scan
        await asyncio.sleep(300)

# Run continuous monitoring
asyncio.run(monitor_all_machines())
When to Level Up
Start: Simple Threshold Monitoring
10-50 machines
- Rule-based alerts when sensors exceed thresholds
- LLM analysis for context and recommendations
- Manual review of predictions before action
- MQTT or webhook integration for sensor data
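Here's a minimal sketch of that rule-based pre-filter sitting in front of the Level 1 monitor. The broker host, topic layout, and the idea of reusing the normal_range fields as thresholds are assumptions to adapt to your setup; the point is that only out-of-range readings are worth an API call.

# Sketch: rule-based pre-filter feeding the Level 1 monitor via MQTT.
# Assumptions: paho-mqtt broker at "factory-broker", topics "factory/sensors/<machine_id>",
# readings shaped like the Level 1 sample data (value + normal_range per sensor).
import json
import paho.mqtt.client as mqtt

def out_of_range(reading: dict) -> bool:
    """True if any sensor value falls outside its declared normal_range."""
    for field in ("vibration", "temperature", "amperage", "oil_pressure"):
        sensor = reading.get(field)
        if not isinstance(sensor, dict):
            continue
        low, high = sensor["normal_range"]
        if not (low <= sensor["value"] <= high):
            return True
    return False

monitor = SimpleMaintenanceMonitor(anthropic_api_key="your-key")  # class from Level 1

def on_message(client, userdata, msg):
    reading = json.loads(msg.payload)
    if out_of_range(reading):
        # Only out-of-range readings trigger the analyze → detect → predict chain
        result = monitor.analyze_sensor_data(reading)
        if result["anomalies"].get("requires_immediate_action"):
            monitor.send_alert(result, "critical")

client = mqtt.Client()
client.on_message = on_message
client.connect("factory-broker", 1883)
client.subscribe("factory/sensors/+")
client.loop_forever()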
Scale: Add ML Predictions
50-500 machines
- TensorFlow/PyTorch models for failure probability
- Historical trend analysis (72-hour rolling window)
- Automated alerts to PagerDuty/Slack based on ML confidence
- Time-series database (InfluxDB) for sensor history
- Integration with CMMS for work order creation (see the sketch below)
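At this tier a plain REST call to your CMMS is usually enough (Level 3 below shows a fuller SAP integration). A rough sketch — the endpoint URL, token variable, and payload fields are placeholders for whatever your CMMS actually expects:

# Sketch: turning a high-confidence prediction into a CMMS work order.
# The endpoint, CMMS_TOKEN env var, and payload/response fields are assumptions.
import os
import requests

def create_work_order(prediction: dict) -> str:
    payload = {
        "equipment_id": prediction["machine_id"],
        "priority": "High" if prediction["failure_probability"] > 0.7 else "Medium",
        "description": f"Predictive maintenance: failure expected in "
                       f"{prediction['time_until_failure_hours']}h",
        "recommended_actions": prediction.get("recommended_actions", []),
    }
    resp = requests.post(
        "https://cmms.example.com/api/workorders",   # hypothetical endpoint
        headers={"Authorization": f"Bearer {os.environ['CMMS_TOKEN']}"},
        json=payload,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["work_order_id"]

# `prediction` is the result of the Level 2 analyzeMachine() call;
# only open a ticket once the model is reasonably confident.
if prediction["failure_probability"] > 0.4:
    ticket = create_work_order(prediction)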
Production: Multi-Agent Orchestration
500-2,000 machines
- LangGraph workflow with specialized agents (data collector, ML predictor, root cause analyzer, action planner)
- Conditional routing based on severity (critical → immediate action, warning → scheduled maintenance)
- Real-time monitoring dashboard (Grafana) with predictive metrics
- Automated work order creation in SAP/Maximo CMMS
- Cost-benefit optimization (planned vs emergency maintenance)
- Human-in-the-loop for high-impact decisions
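A rough sketch of what that human gate could look like as an extra node between action_planner and cmms_integrator in the Level 3 graph. The Slack webhook and the $10K threshold are assumptions, and you'd add a conditional edge so plans stuck in pending review stop short of the CMMS step.

# Sketch: human approval gate for high-impact maintenance plans.
# Assumptions: cost threshold, Slack webhook for the approval request, and the
# field names inside cost_analysis (they're whatever your planning prompt asks for).
import json
import os
import aiohttp

APPROVAL_COST_THRESHOLD = 10_000  # assumed: plans above this need sign-off

async def approval_gate_node(state: dict) -> dict:
    # state carries machine_id and maintenance_plan from the Level 3 graph
    plan = state["maintenance_plan"]
    estimated_cost = plan.get("cost_analysis", {}).get("planned_cost", 0)

    if estimated_cost < APPROVAL_COST_THRESHOLD:
        state["approval_status"] = "auto_approved"
        return state

    # Ask a human before committing the work order
    async with aiohttp.ClientSession() as session:
        await session.post(
            os.environ["SLACK_WEBHOOK_URL"],
            json={
                "text": (
                    f"Approve maintenance plan for {state['machine_id']}? "
                    f"Estimated cost ${estimated_cost:,}\n"
                    f"{json.dumps(plan, indent=2)[:1500]}"
                )
            },
        )
    state["approval_status"] = "pending_human_review"  # route to END until approved
    return state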
Enterprise: Factory-Wide Intelligence
2,000+ machines across multiple facilities
- Distributed processing with Kubernetes (scale to 10,000+ machines)
- Multi-model ensemble (LSTM + Transformer + Physics-based models)
- Cross-machine correlation analysis (detect cascading failures)
- Predictive supply chain integration (auto-order parts 3 days before failure)
- Digital twin simulation (test maintenance strategies before execution)
- Federated learning across facilities (improve models without sharing raw data)
- Edge computing on factory floor (sub-second latency for critical alerts)
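A rough sketch of that edge-side guard: hard safety limits trip a local topic instantly, with no cloud round trip, while every reading still flows to the central pipeline. Broker hosts, topics, and trip points here are illustrative.

# Sketch: edge gateway guard. Local trip for hard limit violations, pass-through
# to the central broker for ML/LLM analysis. Limits and topic names are assumed.
import json
import time
import paho.mqtt.client as mqtt

HARD_LIMITS = {"vibration": 12.0, "temperature": 95.0}  # assumed trip points

local = mqtt.Client()
local.connect("localhost", 1883)            # broker on the edge gateway
upstream = mqtt.Client()
upstream.connect("factory-broker", 1883)    # central broker
upstream.loop_start()

def on_message(client, userdata, msg):
    reading = json.loads(msg.payload)
    # Sub-second local reaction for hard limit violations
    for field, limit in HARD_LIMITS.items():
        value = reading.get(field, {}).get("value", 0)
        if value > limit:
            local.publish("edge/trip", json.dumps({
                "machine_id": reading["machine_id"],
                "field": field,
                "value": value,
                "ts": time.time(),
            }))
    # Forward everything upstream for the full analysis pipeline
    upstream.publish(f"factory/sensors/{reading['machine_id']}", msg.payload)

local.on_message = on_message
local.subscribe("sensors/raw/+")
local.loop_forever()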
Manufacturing-Specific Gotchas
The code examples work for basic monitoring. But manufacturing has unique challenges that'll bite you if you don't handle them.
Sensor Calibration Drift
Industrial sensors drift over time. A vibration sensor that reads 5mm/s today might read 5.3mm/s next month for the same actual vibration. Your ML model will think the machine is degrading when it's just sensor drift. You need to track calibration dates and adjust baselines.
from datetime import datetime, timedelta


class SensorCalibrationManager:
    def __init__(self):
        self.calibration_db = {}  # sensor_id -> {date, baseline_offset}

    def adjust_reading(self, sensor_id: str, raw_value: float) -> float:
        """Adjust sensor reading for calibration drift"""
        calibration = self.calibration_db.get(sensor_id)
        if not calibration:
            return raw_value

        # Calculate drift based on time since last calibration
        days_since_cal = (datetime.now() - calibration['date']).days

        # Assume 0.5% drift per month for vibration sensors
        drift_factor = 1 + (0.005 * days_since_cal / 30)

        # Adjust reading
        adjusted = raw_value / drift_factor

        # Flag if calibration is overdue (>6 months)
        if days_since_cal > 180:
            self.flag_calibration_needed(sensor_id)

        return adjusted

    def flag_calibration_needed(self, sensor_id: str):
        """Notify maintenance that this sensor is overdue for recalibration"""
        print(f"Calibration overdue for sensor {sensor_id}")

    def update_calibration(self, sensor_id: str, baseline_offset: float):
        """Record new calibration after maintenance"""
        self.calibration_db[sensor_id] = {
            'date': datetime.now(),
            'baseline_offset': baseline_offset
        }


# Usage in monitoring pipeline
cal_manager = SensorCalibrationManager()

raw_vibration = 8.2  # mm/s from sensor
adjusted_vibration = cal_manager.adjust_reading('VIB-047-01', raw_vibration)

# Use adjusted value for analysis
sensor_data['vibration'] = adjusted_vibration
Production Schedule Conflicts
You can't just shut down a machine because your AI says it might fail in 48 hours. That machine might be running a $2M order due tomorrow. You need to integrate with production scheduling systems (ERP) to find optimal maintenance windows that minimize revenue impact.
interface ProductionOrder {
  order_id: string;
  machine_id: string;
  value: number;
  due_date: Date;
  completion_percent: number;
}

class MaintenanceScheduleOptimizer {
  async findOptimalMaintenanceWindow(
    machineId: string,
    urgency: 'critical' | 'high' | 'medium',
    estimatedDuration: number
  ): Promise<Date> {
    // Get active orders on this machine
    const orders = await this.getActiveOrders(machineId);

    // Calculate revenue at risk for each potential window
    const windows = this.generatePotentialWindows(urgency);

    const scored = windows.map(window => {
      const impactedOrders = orders.filter(order =>
        this.isOrderImpacted(order, window, estimatedDuration)
      );

      const revenueAtRisk = impactedOrders.reduce((sum, order) =>
        sum + (order.value * (1 - order.completion_percent)), 0
      );

      const lateDeliveryPenalty = impactedOrders
        .filter(order => this.wouldBeLate(order, window, estimatedDuration))
        .reduce((sum, order) => sum + order.value * 0.1, 0); // 10% penalty

      return {
        window,
        revenueAtRisk,
        lateDeliveryPenalty,
        totalCost: revenueAtRisk + lateDeliveryPenalty
      };
    });

    // Sort by lowest cost
    scored.sort((a, b) => a.totalCost - b.totalCost);

    // If critical, may need to accept some revenue loss
    if (urgency === 'critical' && scored[0].totalCost > 50000) {
      await this.notifyProductionManager({
        machineId,
        optimalWindow: scored[0].window,
        estimatedCost: scored[0].totalCost,
        reason: 'Critical failure prevention'
      });
    }

    return scored[0].window;
  }

  private generatePotentialWindows(urgency: string): Date[] {
    const now = new Date();
    const windows: Date[] = [];

    // Critical: next 24 hours
    if (urgency === 'critical') {
      for (let i = 0; i < 24; i += 2) {
        windows.push(new Date(now.getTime() + i * 3600000));
      }
    } else {
      // High/Medium: next 7 days, prefer nights/weekends
      for (let day = 0; day < 7; day++) {
        const date = new Date(now.getTime() + day * 86400000);

        // Add night shift (11pm)
        date.setHours(23, 0, 0, 0);
        windows.push(new Date(date));

        // Add weekend days
        if (date.getDay() === 0 || date.getDay() === 6) {
          date.setHours(8, 0, 0, 0);
          windows.push(new Date(date));
        }
      }
    }

    return windows;
  }
}

// Usage
const optimizer = new MaintenanceScheduleOptimizer();

const optimalTime = await optimizer.findOptimalMaintenanceWindow(
  'CNC-047',
  'high',
  4  // 4 hour maintenance window
);

console.log(`Schedule maintenance for: ${optimalTime}`);
Environmental Factors (Temperature, Humidity, Dust)
Factory floor conditions affect sensor readings. A hot summer day makes temperature sensors read higher, but that doesn't mean the bearing is failing. You need to normalize for ambient conditions. Same with humidity affecting electrical readings and dust affecting optical sensors.
import requests
from datetime import datetime


class EnvironmentalNormalizer:
    def __init__(self, weather_api_key: str, factory_location: dict):
        self.api_key = weather_api_key
        self.location = factory_location
        self.baseline_temp = 20      # Celsius
        self.baseline_humidity = 50  # %

    def get_current_conditions(self) -> dict:
        """Get current weather from OpenWeather API"""
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={
                'lat': self.location['lat'],
                'lon': self.location['lon'],
                'appid': self.api_key,
                'units': 'metric'
            }
        )
        data = response.json()
        return {
            'ambient_temp': data['main']['temp'],
            'humidity': data['main']['humidity'],
            'timestamp': datetime.now()
        }

    def normalize_temperature_reading(self, machine_temp: float, ambient_temp: float) -> float:
        """Adjust machine temperature for ambient conditions"""
        # Machine temp typically runs 40-50°C above ambient
        expected_delta = 45

        # Calculate what temp would be at baseline ambient
        ambient_delta = ambient_temp - self.baseline_temp
        normalized = machine_temp - ambient_delta
        return normalized

    def normalize_electrical_reading(self, amperage: float, humidity: float) -> float:
        """Adjust electrical readings for humidity"""
        # High humidity increases resistance, lowers current draw
        # Normalize to baseline 50% humidity
        humidity_factor = 1 + ((humidity - self.baseline_humidity) * 0.002)
        normalized = amperage * humidity_factor
        return normalized

    def apply_normalizations(self, sensor_data: dict) -> dict:
        """Apply all environmental normalizations"""
        conditions = self.get_current_conditions()
        normalized = sensor_data.copy()

        # Normalize temperature
        if 'temperature' in sensor_data:
            normalized['temperature'] = self.normalize_temperature_reading(
                sensor_data['temperature'],
                conditions['ambient_temp']
            )

        # Normalize electrical readings
        if 'amperage' in sensor_data:
            normalized['amperage'] = self.normalize_electrical_reading(
                sensor_data['amperage'],
                conditions['humidity']
            )

        # Store ambient conditions for reference
        normalized['ambient_conditions'] = conditions
        return normalized


# Usage
normalizer = EnvironmentalNormalizer(
    weather_api_key='your-key',
    factory_location={'lat': 42.3601, 'lon': -71.0589}  # Boston
)

# Before analysis, normalize readings
raw_sensor_data = {
    'machine_id': 'CNC-047',
    'temperature': 78,  # Celsius
    'amperage': 42,
    'vibration': 8.2
}

normalized_data = normalizer.apply_normalizations(raw_sensor_data)

# Use normalized data for failure prediction
result = analyze_machine(normalized_data)
Spare Parts Inventory Integration
Your AI predicts a bearing failure in 48 hours. Great! But do you have the replacement bearing in stock? If not, can you get it in time? You need to integrate with inventory management systems and automatically trigger parts orders when predictions are made. Otherwise your predictions are useless.
interface SparePart {
  part_number: string;
  description: string;
  quantity_on_hand: number;
  lead_time_days: number;
  cost: number;
  supplier: string;
}

class SparePartsManager {
  private inventoryApi: string;
  private purchasingApi: string;

  async checkAvailability(requiredParts: string[]): Promise<{
    available: string[];
    needsOrder: string[];
    canMeetTimeline: boolean;
  }> {
    const inventory = await this.getInventoryLevels(requiredParts);

    const available: string[] = [];
    const needsOrder: string[] = [];

    for (const part of requiredParts) {
      const stock = inventory.find(i => i.part_number === part);
      if (!stock || stock.quantity_on_hand === 0) {
        needsOrder.push(part);
      } else {
        available.push(part);
      }
    }

    // Check if we can get needed parts in time
    const canMeetTimeline = await this.canDeliverInTime(needsOrder, 48);

    return { available, needsOrder, canMeetTimeline };
  }

  async autoOrderParts(
    parts: string[],
    urgency: 'critical' | 'high' | 'medium',
    requiredBy: Date
  ): Promise<{
    orders: any[];
    totalCost: number;
    estimatedDelivery: Date;
  }> {
    const orders = [];
    let totalCost = 0;

    for (const partNumber of parts) {
      const partInfo = await this.getPartInfo(partNumber);

      // Check multiple suppliers for fastest delivery
      const suppliers = await this.getSuppliers(partNumber);
      const sorted = suppliers.sort((a, b) => {
        // For critical, prioritize speed over cost
        if (urgency === 'critical') {
          return a.lead_time_days - b.lead_time_days;
        }
        // For others, balance cost and speed
        return (a.cost * a.lead_time_days) - (b.cost * b.lead_time_days);
      });

      const bestSupplier = sorted[0];

      // Place order
      const order = await fetch(this.purchasingApi, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          part_number: partNumber,
          quantity: 1,
          supplier: bestSupplier.supplier,
          urgency: urgency,
          required_by: requiredBy.toISOString(),
          reason: 'Predictive maintenance - equipment failure prevention',
          expedite: urgency === 'critical'
        })
      });

      const orderResult = await order.json();
      orders.push(orderResult);
      totalCost += bestSupplier.cost;
    }

    // Find latest delivery date
    const deliveryDates = orders.map(o => new Date(o.estimated_delivery));
    const estimatedDelivery = new Date(Math.max(...deliveryDates.map(d => d.getTime())));

    return { orders, totalCost, estimatedDelivery };
  }

  async getInventoryLevels(partNumbers: string[]): Promise<SparePart[]> {
    const response = await fetch(
      `${this.inventoryApi}/parts?ids=${partNumbers.join(',')}`
    );
    return response.json();
  }

  async canDeliverInTime(parts: string[], hoursNeeded: number): Promise<boolean> {
    if (parts.length === 0) return true;

    const inventory = await this.getInventoryLevels(parts);
    const maxLeadTime = Math.max(...inventory.map(p => p.lead_time_days));
    return (maxLeadTime * 24) <= hoursNeeded;
  }
}

// Usage in maintenance workflow
const partsManager = new SparePartsManager();

// When failure is predicted
const prediction = {
  machine_id: 'CNC-047',
  time_until_failure_hours: 48,
  required_parts: ['BEARING-6205-2RS', 'SEAL-47-62-8'],
  urgency: 'critical'
};

// Check if parts are available
const availability = await partsManager.checkAvailability(prediction.required_parts);

if (!availability.canMeetTimeline) {
  console.log('WARNING: Parts cannot be delivered in time!');
  console.log('Needed:', availability.needsOrder);

  // Escalate to production manager
  await notifyProductionManager({
    issue: 'parts_unavailable',
    machine: prediction.machine_id,
    timeline: prediction.time_until_failure_hours
  });
} else if (availability.needsOrder.length > 0) {
  // Auto-order needed parts
  const requiredBy = new Date(Date.now() + prediction.time_until_failure_hours * 3600000);
  const orderResult = await partsManager.autoOrderParts(
    availability.needsOrder,
    prediction.urgency,
    requiredBy
  );

  console.log(`Parts ordered. Total cost: $${orderResult.totalCost}`);
  console.log(`Delivery: ${orderResult.estimatedDelivery}`);
}
Multi-Machine Correlation (Cascading Failures)
Machines don't fail in isolation. If Machine A fails, it might cause downstream Machine B to fail 6 hours later. Your system needs to detect these correlations. When CNC-047 shows bearing issues, check if the next machine in the production line is handling extra load. Use graph analysis to map dependencies.
import json
import os
from typing import Dict, List

import anthropic
import networkx as nx


class CascadingFailureDetector:
    def __init__(self):
        # Async client so analyze_cascade_risk can await the API call
        self.client = anthropic.AsyncAnthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
        self.production_graph = self.build_production_graph()

    def build_production_graph(self) -> nx.DiGraph:
        """Build directed graph of production line dependencies"""
        G = nx.DiGraph()

        # Add machines as nodes
        machines = [
            {'id': 'CNC-047', 'type': 'milling', 'capacity': 100},
            {'id': 'CNC-048', 'type': 'milling', 'capacity': 100},
            {'id': 'LATHE-12', 'type': 'lathe', 'capacity': 80},
            {'id': 'GRIND-05', 'type': 'grinder', 'capacity': 120},
        ]
        for machine in machines:
            G.add_node(machine['id'], **machine)

        # Add edges for material flow
        G.add_edge('CNC-047', 'LATHE-12', flow_rate=50)
        G.add_edge('CNC-048', 'LATHE-12', flow_rate=50)
        G.add_edge('LATHE-12', 'GRIND-05', flow_rate=80)

        return G

    async def analyze_cascade_risk(
        self,
        failing_machine: str,
        time_until_failure: float
    ) -> Dict:
        """Analyze risk of cascading failures"""
        # Find downstream machines
        downstream = list(nx.descendants(self.production_graph, failing_machine))

        # Calculate load redistribution
        failing_capacity = self.production_graph.nodes[failing_machine]['capacity']

        impact_analysis = []
        for machine in downstream:
            # Calculate extra load this machine will handle
            current_load = self.get_current_load(machine)
            extra_load = self.calculate_extra_load(failing_machine, machine)
            new_load = current_load + extra_load

            capacity = self.production_graph.nodes[machine]['capacity']
            utilization = (new_load / capacity) * 100

            # Get current health status
            health = await self.get_machine_health(machine)

            impact_analysis.append({
                'machine_id': machine,
                'current_load': current_load,
                'extra_load': extra_load,
                'new_utilization': utilization,
                'current_health': health,
                'cascade_risk': 'high' if utilization > 90 and health['status'] != 'good' else 'medium'
            })

        # Use LLM to analyze complex cascade scenarios
        cascade_prompt = f"""Analyze cascading failure risk in production line.

Failing Machine: {failing_machine}
Time Until Failure: {time_until_failure} hours

Downstream Impact: {json.dumps(impact_analysis, indent=2)}

Production Line Graph:
{self.graph_to_text(self.production_graph)}

Analyze:
1. Which machines are at highest risk of cascading failure?
2. What's the timeline for each potential cascade?
3. What preventive actions should be taken?
4. What's the total production impact if cascades occur?

Format as JSON with: at_risk_machines[], timeline, preventive_actions[], production_impact"""

        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=3072,
            messages=[{"role": "user", "content": cascade_prompt}]
        )

        cascade_analysis = json.loads(response.content[0].text)
        cascade_analysis['downstream_machines'] = impact_analysis
        return cascade_analysis

    def calculate_extra_load(self, failing_machine: str, downstream_machine: str) -> float:
        """Calculate extra load on downstream machine when upstream fails"""
        # If there are parallel machines, load redistributes
        upstream_machines = list(self.production_graph.predecessors(downstream_machine))

        if failing_machine not in upstream_machines:
            return 0

        # Get flow from failing machine
        failing_flow = self.production_graph.edges[failing_machine, downstream_machine]['flow_rate']

        # Redistribute to other upstream machines
        active_upstream = [m for m in upstream_machines if m != failing_machine]
        if len(active_upstream) == 0:
            return 0  # Downstream will stop

        # Distribute load evenly (in reality, would check capacity)
        extra_per_machine = failing_flow / len(active_upstream)
        return extra_per_machine

    def get_current_load(self, machine_id: str) -> float:
        # Query real-time production data
        pass

    async def get_machine_health(self, machine_id: str) -> dict:
        # Get latest health status from monitoring system
        pass

    def graph_to_text(self, graph: nx.DiGraph) -> str:
        # Convert graph to text representation for LLM
        lines = []
        for edge in graph.edges(data=True):
            lines.append(f"{edge[0]} -> {edge[1]} (flow: {edge[2]['flow_rate']} units/hr)")
        return '\n'.join(lines)


# Usage
detector = CascadingFailureDetector()

# When a failure is predicted
prediction = {
    'machine_id': 'CNC-047',
    'time_until_failure_hours': 48
}

# Analyze cascade risk
cascade_risk = await detector.analyze_cascade_risk(
    prediction['machine_id'],
    prediction['time_until_failure_hours']
)

if cascade_risk['at_risk_machines']:
    print(f"WARNING: {len(cascade_risk['at_risk_machines'])} machines at risk of cascade failure")
    print(f"Preventive actions: {cascade_risk['preventive_actions']}")

    # Alert production manager
    await send_cascade_alert(cascade_risk)
Cost Calculator
Manual Process (Reactive Maintenance)
Limitations:
- Only catches 15% of failures before breakdown
- 4-6 hour delay between issue and detection
- No predictive capability
- Reactive approach causes production disruptions
Automated Predictive Maintenance
Benefits:
- ✓ Catches 85% of failures 48+ hours early
- ✓ Real-time monitoring with sub-minute detection
- ✓ Predictive timeline allows optimal scheduling
- ✓ Reduced emergency repairs and overtime
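A back-of-envelope way to put numbers on that gap, using the catch rates above. Every dollar figure and event count here is an assumed input — plug in your own downtime, scrap, and repair costs.

# Sketch: rough annual cost comparison from the catch rates above.
# All inputs are assumptions for illustration only.
FAILURES_PER_YEAR = 24                 # assumed
DOWNTIME_COST_PER_FAILURE = 40_000     # assumed: unplanned stop, scrap, overtime
PLANNED_MAINTENANCE_COST = 6_000       # assumed: scheduled repair in a low-impact window

def annual_cost(catch_rate: float) -> float:
    caught = FAILURES_PER_YEAR * catch_rate
    missed = FAILURES_PER_YEAR - caught
    return caught * PLANNED_MAINTENANCE_COST + missed * DOWNTIME_COST_PER_FAILURE

reactive = annual_cost(0.15)      # manual process: ~15% caught early
predictive = annual_cost(0.85)    # automated: ~85% caught early
print(f"Reactive:   ${reactive:,.0f}/yr")
print(f"Predictive: ${predictive:,.0f}/yr")
print(f"Difference: ${reactive - predictive:,.0f}/yr before software and sensor costs")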