The Problem
On Monday you tested the 3 prompts in ChatGPT. You saw how detection â assessment â response works. But here's the thing: you can't have someone manually checking Twitter, news sites, and review platforms every 30 minutes. By the time you notice the angry tweets or negative press, it's already viral. You need automated monitoring that catches signals before they become fires.
See It Work
Watch the 3 prompts chain together automatically. This is what you'll build - real-time monitoring that catches crises early.
The Code
Three levels: start simple, add reliability, then scale to production. Pick where you are.
Level 1: Simple Polling Script
Good for: 10-50 sources | Setup time: 30 minutes
# Simple Polling Script (10-50 sources) import tweepy import openai import requests from datetime import datetime import time # Twitter API setup twitter_client = tweepy.Client( bearer_token="your_twitter_bearer_token", consumer_key="your_consumer_key", consumer_secret="your_consumer_secret", access_token="your_access_token", access_token_secret="your_access_token_secret" ) def monitor_twitter(keywords: list[str]) -> list[dict]: """Poll Twitter for crisis signals""" query = " OR ".join([f'"{kw}"' for kw in keywords]) tweets = twitter_client.search_recent_tweets( query=query, max_results=100, tweet_fields=['created_at', 'public_metrics', 'author_id'] ) signals = [] for tweet in tweets.data or []: signals.append({ 'source': 'twitter', 'content': tweet.text, 'reach': tweet.public_metrics['impression_count'], 'timestamp': tweet.created_at.isoformat(), 'url': f'https://twitter.com/user/status/{tweet.id}' }) return signals def analyze_sentiment(signals: list[dict]) -> dict: """Use GPT-4 to analyze sentiment and severity""" content = "\n\n".join([s['content'] for s in signals]) prompt = f"""Analyze these social media posts for crisis signals. Rate severity 0-10, sentiment -1 to +1, and identify key issues. Posts: {content} Return JSON with: crisis_score (0-10), sentiment (-1 to +1), key_issues (list), recommended_actions (list)""" response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": prompt}], temperature=0.3 ) import json return json.loads(response.choices[0].message.content) def send_slack_alert(analysis: dict, signals: list[dict]): """Send alert to Slack if crisis detected""" if analysis['crisis_score'] < 7: return # Not critical enough webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL" message = { "text": f"đ¨ CRISIS ALERT đ¨", "blocks": [ { "type": "header", "text": {"type": "plain_text", "text": "Crisis Detected"} }, { "type": "section", "fields": [ {"type": "mrkdwn", "text": f"*Severity:* {analysis['crisis_score']}/10"}, {"type": "mrkdwn", "text": f"*Sentiment:* {analysis['sentiment']}"}, {"type": "mrkdwn", "text": f"*Sources:* {len(signals)}"}, {"type": "mrkdwn", "text": f"*Time:* {datetime.now().strftime('%H:%M')}"} ] }, { "type": "section", "text": {"type": "mrkdwn", "text": f"*Key Issues:*\n" + "\n".join([f"âĸ {issue}" for issue in analysis['key_issues']])} }, { "type": "section", "text": {"type": "mrkdwn", "text": f"*Recommended Actions:*\n" + "\n".join([f"âĸ {action}" for action in analysis['recommended_actions']])} } ] } requests.post(webhook_url, json=message) # Main monitoring loop def run_monitor(): keywords = [ "@YourCompany", "YourProduct defect", "YourCompany recall", "YourCompany fire", "YourCompany dangerous" ] while True: print(f"[{datetime.now()}] Checking for crisis signals...") # Step 1: Collect signals signals = monitor_twitter(keywords) if signals: # Step 2: Analyze analysis = analyze_sentiment(signals) # Step 3: Alert if needed send_slack_alert(analysis, signals) print(f"Found {len(signals)} signals. Crisis score: {analysis['crisis_score']}") # Poll every 15 minutes time.sleep(900) if __name__ == "__main__": run_monitor()
Level 2: With Multi-Source Monitoring & Error Handling
Good for: 50-500 sources | Setup time: 2 hours
// Multi-Source Monitoring with Error Handling (50-500 sources) import Anthropic from '@anthropic-ai/sdk'; import axios from 'axios'; import { TwitterApi } from 'twitter-api-v2'; interface CrisisSignal { source: string; content: string; sentiment: number; reach: number; timestamp: string; url?: string; } interface CrisisAnalysis { crisis_level: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL'; crisis_score: number; sentiment: number; key_issues: string[]; recommended_actions: string[]; affected_areas: string[]; } class CrisisMonitor { private anthropic: Anthropic; private twitter: TwitterApi; private slackWebhook: string; private pagerdutyKey: string; constructor() { this.anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY!, }); this.twitter = new TwitterApi(process.env.TWITTER_BEARER_TOKEN!); this.slackWebhook = process.env.SLACK_WEBHOOK_URL!; this.pagerdutyKey = process.env.PAGERDUTY_API_KEY!; } async monitorTwitter(keywords: string[]): Promise<CrisisSignal[]> { const signals: CrisisSignal[] = []; try { const query = keywords.map((k) => `"${k}"`).join(' OR '); const tweets = await this.twitter.v2.search(query, { max_results: 100, 'tweet.fields': ['created_at', 'public_metrics'], }); for (const tweet of tweets.data.data || []) { signals.push({ source: 'twitter', content: tweet.text, sentiment: 0, // Will be analyzed later reach: tweet.public_metrics?.impression_count || 0, timestamp: tweet.created_at!, url: `https://twitter.com/user/status/${tweet.id}`, }); } } catch (error) { console.error('Twitter monitoring error:', error); await this.logError('twitter', error); } return signals; } async monitorNews(keywords: string[]): Promise<CrisisSignal[]> { const signals: CrisisSignal[] = []; try { // Using NewsAPI.org const response = await axios.get('https://newsapi.org/v2/everything', { params: { q: keywords.join(' OR '), apiKey: process.env.NEWS_API_KEY, sortBy: 'publishedAt', pageSize: 50, }, }); for (const article of response.data.articles || []) { signals.push({ source: 'news', content: `${article.title}. ${article.description}`, sentiment: 0, reach: this.estimateNewsReach(article.source.name), timestamp: article.publishedAt, url: article.url, }); } } catch (error) { console.error('News monitoring error:', error); await this.logError('news', error); } return signals; } async monitorReddit(subreddits: string[]): Promise<CrisisSignal[]> { const signals: CrisisSignal[] = []; try { for (const sub of subreddits) { const response = await axios.get( `https://www.reddit.com/r/${sub}/new.json`, { headers: { 'User-Agent': 'CrisisMonitor/1.0' }, params: { limit: 25 }, } ); for (const post of response.data.data.children || []) { const data = post.data; signals.push({ source: 'reddit', content: `${data.title}. ${data.selftext}`, sentiment: 0, reach: data.ups * 10, // Rough estimate timestamp: new Date(data.created_utc * 1000).toISOString(), url: `https://reddit.com${data.permalink}`, }); } } } catch (error) { console.error('Reddit monitoring error:', error); await this.logError('reddit', error); } return signals; } async analyzeSignals(signals: CrisisSignal[]): Promise<CrisisAnalysis> { const content = signals .map((s) => `[${s.source}] ${s.content}`) .join('\n\n'); const prompt = `Analyze these crisis signals and assess severity. Signals: ${content} Return JSON with: - crisis_level: LOW/MEDIUM/HIGH/CRITICAL - crisis_score: 0-10 - sentiment: -1 to +1 - key_issues: array of main problems - recommended_actions: array of immediate actions - affected_areas: array of business areas impacted`; try { const response = await this.anthropic.messages.create({ model: 'claude-3-5-sonnet-20241022', max_tokens: 2048, messages: [{ role: 'user', content: prompt }], }); const content = response.content[0]; if (content.type !== 'text') throw new Error('Invalid response'); return JSON.parse(content.text); } catch (error) { console.error('Analysis error:', error); throw error; } } async sendAlerts(analysis: CrisisAnalysis, signals: CrisisSignal[]) { if (analysis.crisis_level === 'CRITICAL' || analysis.crisis_level === 'HIGH') { await Promise.all([ this.sendSlackAlert(analysis, signals), this.sendPagerDutyAlert(analysis), this.sendEmailAlert(analysis), ]); } else if (analysis.crisis_level === 'MEDIUM') { await this.sendSlackAlert(analysis, signals); } } private async sendSlackAlert( analysis: CrisisAnalysis, signals: CrisisSignal[] ) { const emoji = { CRITICAL: 'đ¨', HIGH: 'â ī¸', MEDIUM: 'âĄ', LOW: 'âšī¸', }[analysis.crisis_level]; await axios.post(this.slackWebhook, { text: `${emoji} Crisis Alert: ${analysis.crisis_level}`, blocks: [ { type: 'header', text: { type: 'plain_text', text: `${emoji} Crisis Detected: ${analysis.crisis_level}`, }, }, { type: 'section', fields: [ { type: 'mrkdwn', text: `*Score:* ${analysis.crisis_score}/10` }, { type: 'mrkdwn', text: `*Sentiment:* ${analysis.sentiment}` }, { type: 'mrkdwn', text: `*Sources:* ${signals.length}` }, { type: 'mrkdwn', text: `*Reach:* ${signals.reduce((sum, s) => sum + s.reach, 0).toLocaleString()}`, }, ], }, { type: 'section', text: { type: 'mrkdwn', text: `*Key Issues:*\n${analysis.key_issues.map((i) => `âĸ ${i}`).join('\n')}`, }, }, { type: 'section', text: { type: 'mrkdwn', text: `*Immediate Actions:*\n${analysis.recommended_actions.map((a) => `âĸ ${a}`).join('\n')}`, }, }, ], }); } private async sendPagerDutyAlert(analysis: CrisisAnalysis) { await axios.post( 'https://api.pagerduty.com/incidents', { incident: { type: 'incident', title: `Crisis Detected: ${analysis.crisis_level}`, service: { id: process.env.PAGERDUTY_SERVICE_ID, type: 'service_reference', }, urgency: analysis.crisis_level === 'CRITICAL' ? 'high' : 'low', body: { type: 'incident_body', details: `Crisis score: ${analysis.crisis_score}\nKey issues: ${analysis.key_issues.join(', ')}`, }, }, }, { headers: { Authorization: `Token token=${this.pagerdutyKey}`, 'Content-Type': 'application/json', }, } ); } private async sendEmailAlert(analysis: CrisisAnalysis) { // Implement email sending (SendGrid, AWS SES, etc.) console.log('Email alert sent:', analysis); } private estimateNewsReach(source: string): number { const reachMap: Record<string, number> = { 'TechCrunch': 5000000, 'The Verge': 3000000, 'Reuters': 10000000, 'Bloomberg': 8000000, }; return reachMap[source] || 100000; } private async logError(source: string, error: any) { // Log to monitoring service (Sentry, CloudWatch, etc.) console.error(`[${source}] Error:`, error); } async run() { const keywords = [ '@YourCompany', 'YourProduct defect', 'YourCompany recall', ]; const subreddits = ['YourIndustry', 'ProductReviews']; while (true) { console.log(`[${new Date().toISOString()}] Monitoring...`); try { // Collect from all sources in parallel const [twitterSignals, newsSignals, redditSignals] = await Promise.all( [ this.monitorTwitter(keywords), this.monitorNews(keywords), this.monitorReddit(subreddits), ] ); const allSignals = [ ...twitterSignals, ...newsSignals, ...redditSignals, ]; if (allSignals.length > 0) { const analysis = await this.analyzeSignals(allSignals); await this.sendAlerts(analysis, allSignals); console.log( `Found ${allSignals.length} signals. Level: ${analysis.crisis_level}` ); } } catch (error) { console.error('Monitoring cycle error:', error); } // Poll every 5 minutes await new Promise((resolve) => setTimeout(resolve, 300000)); } } } // Usage const monitor = new CrisisMonitor(); monitor.run();
Level 3: Production Pattern with Real-Time Streaming
Good for: 500+ sources, real-time alerts | Setup time: 1 day
# Production Pattern with Real-Time Streaming (500+ sources) import asyncio import aiohttp import tweepy from anthropic import AsyncAnthropic from dataclasses import dataclass from datetime import datetime import json import redis from typing import List, Dict import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class CrisisSignal: source: str content: str sentiment: float reach: int timestamp: str url: str = None keywords: List[str] = None class RealTimeCrisisMonitor: def __init__(self): self.anthropic = AsyncAnthropic(api_key=os.getenv('ANTHROPIC_API_KEY')) self.redis_client = redis.Redis( host='localhost', port=6379, decode_responses=True ) self.signal_buffer = [] self.buffer_lock = asyncio.Lock() async def stream_twitter(self, keywords: List[str]): """Real-time Twitter stream using filtered stream API""" class StreamListener(tweepy.StreamingClient): def __init__(self, monitor, *args, **kwargs): super().__init__(*args, **kwargs) self.monitor = monitor def on_tweet(self, tweet): signal = CrisisSignal( source='twitter', content=tweet.text, sentiment=0, # Will be analyzed reach=tweet.public_metrics.get('impression_count', 0), timestamp=datetime.now().isoformat(), url=f'https://twitter.com/user/status/{tweet.id}' ) asyncio.create_task(self.monitor.process_signal(signal)) stream = StreamListener( self, bearer_token=os.getenv('TWITTER_BEARER_TOKEN') ) # Add rules for keywords for keyword in keywords: stream.add_rules(tweepy.StreamRule(f'"{keyword}"')) stream.filter(tweet_fields=['public_metrics', 'created_at']) async def stream_reddit(self, subreddits: List[str]): """Poll Reddit in real-time (no native streaming)""" while True: for sub in subreddits: try: async with aiohttp.ClientSession() as session: async with session.get( f'https://www.reddit.com/r/{sub}/new.json', headers={'User-Agent': 'CrisisMonitor/2.0'} ) as response: data = await response.json() for post in data['data']['children']: post_data = post['data'] # Check if we've seen this post post_id = f'reddit:{post_data["id"]}' if self.redis_client.exists(post_id): continue self.redis_client.setex(post_id, 3600, '1') signal = CrisisSignal( source='reddit', content=f"{post_data['title']}. {post_data.get('selftext', '')}", sentiment=0, reach=post_data['ups'] * 10, timestamp=datetime.fromtimestamp(post_data['created_utc']).isoformat(), url=f"https://reddit.com{post_data['permalink']}" ) await self.process_signal(signal) except Exception as e: logger.error(f'Reddit streaming error: {e}') await asyncio.sleep(60) # Poll every minute async def stream_news(self, keywords: List[str]): """Poll news APIs for updates""" while True: try: async with aiohttp.ClientSession() as session: async with session.get( 'https://newsapi.org/v2/everything', params={ 'q': ' OR '.join(keywords), 'apiKey': os.getenv('NEWS_API_KEY'), 'sortBy': 'publishedAt', 'pageSize': 20 } ) as response: data = await response.json() for article in data.get('articles', []): article_id = f"news:{article['url']}" if self.redis_client.exists(article_id): continue self.redis_client.setex(article_id, 3600, '1') signal = CrisisSignal( source='news', content=f"{article['title']}. {article['description']}", sentiment=0, reach=self._estimate_news_reach(article['source']['name']), timestamp=article['publishedAt'], url=article['url'] ) await self.process_signal(signal) except Exception as e: logger.error(f'News streaming error: {e}') await asyncio.sleep(300) # Poll every 5 minutes async def process_signal(self, signal: CrisisSignal): """Process individual signal and add to buffer""" async with self.buffer_lock: self.signal_buffer.append(signal) # Analyze in batches of 10 or every 2 minutes if len(self.signal_buffer) >= 10: await self.analyze_batch() async def analyze_batch(self): """Analyze batch of signals for crisis patterns""" async with self.buffer_lock: if not self.signal_buffer: return batch = self.signal_buffer.copy() self.signal_buffer.clear() logger.info(f'Analyzing batch of {len(batch)} signals') # Combine signals for analysis content = '\n\n'.join([ f"[{s.source}] {s.content}" for s in batch ]) prompt = f"""Analyze these crisis signals in real-time. Signals: {content} Return JSON with: - crisis_level: LOW/MEDIUM/HIGH/CRITICAL - crisis_score: 0-10 - sentiment: -1 to +1 - key_issues: array - recommended_actions: array - affected_areas: array - escalation_needed: boolean""" try: response = await self.anthropic.messages.create( model='claude-3-5-sonnet-20241022', max_tokens=2048, messages=[{'role': 'user', 'content': prompt}] ) content = response.content[0] if content.type != 'text': raise ValueError('Invalid response') analysis = json.loads(content.text) # Store in Redis for dashboard self.redis_client.setex( f'analysis:{datetime.now().isoformat()}', 3600, json.dumps(analysis) ) # Send alerts if needed if analysis.get('escalation_needed') or analysis['crisis_level'] in ['HIGH', 'CRITICAL']: await self.send_alerts(analysis, batch) logger.info(f"Analysis complete: {analysis['crisis_level']} (score: {analysis['crisis_score']})") except Exception as e: logger.error(f'Analysis error: {e}') async def send_alerts(self, analysis: Dict, signals: List[CrisisSignal]): """Send alerts through multiple channels""" tasks = [] # Always send Slack tasks.append(self._send_slack(analysis, signals)) # PagerDuty for CRITICAL if analysis['crisis_level'] == 'CRITICAL': tasks.append(self._send_pagerduty(analysis)) tasks.append(self._send_sms(analysis)) # Email for HIGH or CRITICAL if analysis['crisis_level'] in ['HIGH', 'CRITICAL']: tasks.append(self._send_email(analysis)) await asyncio.gather(*tasks, return_exceptions=True) async def _send_slack(self, analysis: Dict, signals: List[CrisisSignal]): emoji_map = { 'CRITICAL': 'đ¨', 'HIGH': 'â ī¸', 'MEDIUM': 'âĄ', 'LOW': 'âšī¸' } total_reach = sum(s.reach for s in signals) message = { 'text': f"{emoji_map[analysis['crisis_level']]} Crisis Alert: {analysis['crisis_level']}", 'blocks': [ { 'type': 'header', 'text': { 'type': 'plain_text', 'text': f"{emoji_map[analysis['crisis_level']]} Crisis Detected: {analysis['crisis_level']}" } }, { 'type': 'section', 'fields': [ {'type': 'mrkdwn', 'text': f"*Score:* {analysis['crisis_score']}/10"}, {'type': 'mrkdwn', 'text': f"*Sentiment:* {analysis['sentiment']}"}, {'type': 'mrkdwn', 'text': f"*Signals:* {len(signals)}"}, {'type': 'mrkdwn', 'text': f"*Reach:* {total_reach:,}"} ] }, { 'type': 'section', 'text': { 'type': 'mrkdwn', 'text': f"*Key Issues:*\n" + '\n'.join([f"âĸ {i}" for i in analysis['key_issues']]) } }, { 'type': 'section', 'text': { 'type': 'mrkdwn', 'text': f"*Immediate Actions:*\n" + '\n'.join([f"âĸ {a}" for a in analysis['recommended_actions']]) } } ] } async with aiohttp.ClientSession() as session: await session.post( os.getenv('SLACK_WEBHOOK_URL'), json=message ) async def _send_pagerduty(self, analysis: Dict): payload = { 'incident': { 'type': 'incident', 'title': f"CRITICAL Crisis: {', '.join(analysis['key_issues'][:2])}", 'service': { 'id': os.getenv('PAGERDUTY_SERVICE_ID'), 'type': 'service_reference' }, 'urgency': 'high', 'body': { 'type': 'incident_body', 'details': f"Crisis score: {analysis['crisis_score']}\nActions: {', '.join(analysis['recommended_actions'])}" } } } async with aiohttp.ClientSession() as session: await session.post( 'https://api.pagerduty.com/incidents', json=payload, headers={ 'Authorization': f"Token token={os.getenv('PAGERDUTY_API_KEY')}", 'Content-Type': 'application/json' } ) async def _send_email(self, analysis: Dict): # Implement email via SendGrid, AWS SES, etc. logger.info(f'Email alert sent: {analysis["crisis_level"]}') async def _send_sms(self, analysis: Dict): # Implement SMS via Twilio, AWS SNS, etc. logger.info(f'SMS alert sent: {analysis["crisis_level"]}') def _estimate_news_reach(self, source: str) -> int: reach_map = { 'TechCrunch': 5000000, 'The Verge': 3000000, 'Reuters': 10000000, 'Bloomberg': 8000000 } return reach_map.get(source, 100000) async def run(self): """Start all monitoring streams""" keywords = ['@YourCompany', 'YourProduct defect', 'YourCompany recall'] subreddits = ['YourIndustry', 'ProductReviews'] logger.info('Starting real-time crisis monitoring...') # Start all streams concurrently await asyncio.gather( self.stream_twitter(keywords), self.stream_reddit(subreddits), self.stream_news(keywords), self._periodic_batch_analysis(), # Backup timer return_exceptions=True ) async def _periodic_batch_analysis(self): """Backup: analyze every 2 minutes even if buffer not full""" while True: await asyncio.sleep(120) await self.analyze_batch() # Usage if __name__ == '__main__': monitor = RealTimeCrisisMonitor() asyncio.run(monitor.run())
When to Level Up
Start: Simple Polling
10-50 sources
- Poll Twitter/news every 15 minutes
- Basic sentiment analysis with GPT-4
- Slack alerts for high-severity issues
- Manual review of all alerts
Scale: Multi-Source with Error Handling
50-500 sources
- Monitor Twitter, Reddit, news, review sites
- Automatic retries and error logging
- PagerDuty integration for critical alerts
- Deduplication via Redis (don't alert twice)
- Email + Slack + SMS routing
Production: Real-Time Streaming
500-5,000 sources
- Real-time Twitter stream (not polling)
- Batch analysis every 2 minutes or 10 signals
- Redis for deduplication and caching
- Multi-channel alerts with escalation paths
- Dashboard for real-time monitoring (Grafana)
Enterprise: Multi-Region, Multi-Agent
5,000+ sources
- Distributed processing across regions (AWS Lambda + SQS)
- Specialized agents: sentiment, entity extraction, trend detection
- Machine learning for false positive reduction
- Custom alert routing by stakeholder role
- Historical analysis and predictive modeling
- Integration with incident management (ServiceNow, Jira)
Leadership-Specific Gotchas
The code examples work. But leadership crisis monitoring has unique challenges you need to handle.
False Positive Fatigue - Don't Cry Wolf
If you alert on every mildly negative tweet, your team will ignore real crises. Use severity thresholds and require multiple signals before escalating. Track alert accuracy and tune thresholds over time.
def should_escalate(analysis: dict, signals: list) -> bool: """Multi-factor escalation decision""" # Require multiple independent sources unique_sources = len(set(s['source'] for s in signals)) if unique_sources < 2 and analysis['crisis_score'] < 9: return False # Check for high-reach sources (news outlets) has_major_news = any( s['source'] == 'news' and s['reach'] > 1000000 for s in signals ) # Require either: # - CRITICAL score (9+) from any source # - HIGH score (7+) from multiple sources # - Major news coverage regardless of score if analysis['crisis_score'] >= 9: return True if analysis['crisis_score'] >= 7 and unique_sources >= 3: return True if has_major_news and analysis['crisis_score'] >= 6: return True return False # Track false positive rate def log_alert_outcome(alert_id: str, was_real_crisis: bool): """Track whether alerts were accurate""" redis_client.hincrby('alert_stats', 'total', 1) if was_real_crisis: redis_client.hincrby('alert_stats', 'true_positives', 1) else: redis_client.hincrby('alert_stats', 'false_positives', 1) # Calculate accuracy stats = redis_client.hgetall('alert_stats') accuracy = int(stats['true_positives']) / int(stats['total']) print(f'Alert accuracy: {accuracy:.1%}')
API Rate Limits - Twitter Will Block You
Twitter's free tier allows 500K tweets/month. If you're monitoring 50+ keywords, you'll hit limits fast. Use filtered streams (real-time) instead of polling, and implement exponential backoff when rate-limited.
class RateLimitedAPI { private requestQueue: Array<() => Promise<any>> = []; private processing = false; private lastRequestTime = 0; private minInterval = 1000; // 1 second between requests async makeRequest<T>(fn: () => Promise<T>): Promise<T> { return new Promise((resolve, reject) => { this.requestQueue.push(async () => { try { // Wait for rate limit window const now = Date.now(); const timeSinceLastRequest = now - this.lastRequestTime; if (timeSinceLastRequest < this.minInterval) { await new Promise((r) => setTimeout(r, this.minInterval - timeSinceLastRequest) ); } this.lastRequestTime = Date.now(); const result = await fn(); resolve(result); } catch (error: any) { // Handle rate limit errors if (error.status === 429) { const retryAfter = parseInt(error.headers['retry-after'] || '60'); console.log(`Rate limited. Retrying after ${retryAfter}s`); await new Promise((r) => setTimeout(r, retryAfter * 1000)); // Retry the request return this.makeRequest(fn).then(resolve).catch(reject); } reject(error); } }); this.processQueue(); }); } private async processQueue() { if (this.processing || this.requestQueue.length === 0) return; this.processing = true; while (this.requestQueue.length > 0) { const request = this.requestQueue.shift()!; await request(); } this.processing = false; } } // Usage const rateLimitedTwitter = new RateLimitedAPI(); const tweets = await rateLimitedTwitter.makeRequest(() => twitter.v2.search(query) );
Sentiment Analysis Isn't Perfect - Sarcasm Kills Accuracy
LLMs struggle with sarcasm and context. 'Great job breaking the product!' reads positive but means negative. Use domain-specific fine-tuning or combine multiple sentiment APIs (OpenAI + AWS Comprehend + custom keywords).
async def multi_source_sentiment(text: str) -> float: """Combine multiple sentiment sources for accuracy""" # Source 1: OpenAI GPT-4 gpt_response = await openai.chat.completions.create( model='gpt-4', messages=[{ 'role': 'user', 'content': f'Rate sentiment -1 to +1, accounting for sarcasm: {text}' }] ) gpt_sentiment = float(gpt_response.choices[0].message.content) # Source 2: AWS Comprehend comprehend = boto3.client('comprehend') aws_response = comprehend.detect_sentiment(Text=text, LanguageCode='en') aws_sentiment = { 'POSITIVE': 0.8, 'NEUTRAL': 0.0, 'NEGATIVE': -0.8, 'MIXED': -0.3 }[aws_response['Sentiment']] # Source 3: Keyword-based (for domain-specific terms) negative_keywords = ['defect', 'fire', 'dangerous', 'recall', 'lawsuit'] positive_keywords = ['love', 'amazing', 'excellent', 'recommend'] text_lower = text.lower() keyword_score = 0 for word in negative_keywords: if word in text_lower: keyword_score -= 0.3 for word in positive_keywords: if word in text_lower: keyword_score += 0.3 # Weighted average (trust GPT-4 most, keywords for edge cases) final_sentiment = ( gpt_sentiment * 0.5 + aws_sentiment * 0.3 + keyword_score * 0.2 ) return max(-1.0, min(1.0, final_sentiment)) # Clamp to [-1, 1]
Time Zone Awareness - Crisis at 3am Is Different
A crisis detected at 3am local time needs different routing than 2pm. Don't wake the CEO for a medium-severity issue at night. Implement time-aware escalation paths and on-call rotations.
interface EscalationRule { severity: string; timeRange: { start: number; end: number }; // 24-hour format channels: string[]; recipients: string[]; } const escalationRules: EscalationRule[] = [ { severity: 'CRITICAL', timeRange: { start: 0, end: 24 }, // Anytime channels: ['pagerduty', 'sms', 'slack', 'email'], recipients: ['ceo', 'cto', 'on-call-engineer'], }, { severity: 'HIGH', timeRange: { start: 8, end: 22 }, // Business hours + evening channels: ['slack', 'email'], recipients: ['crisis-team', 'pr-team'], }, { severity: 'HIGH', timeRange: { start: 22, end: 8 }, // Night channels: ['pagerduty'], // On-call only recipients: ['on-call-engineer'], }, { severity: 'MEDIUM', timeRange: { start: 8, end: 18 }, // Business hours only channels: ['slack'], recipients: ['crisis-team'], }, ]; function getEscalationPath( severity: string, timezone: string = 'America/Los_Angeles' ): EscalationRule | null { const now = new Date(); const localHour = parseInt( now.toLocaleString('en-US', { timeZone: timezone, hour: 'numeric', hour12: false, }) ); // Find matching rule for (const rule of escalationRules) { if (rule.severity !== severity) continue; const { start, end } = rule.timeRange; const isInRange = start <= end ? localHour >= start && localHour < end : localHour >= start || localHour < end; // Handles overnight ranges if (isInRange) return rule; } return null; } // Usage const rule = getEscalationPath('HIGH', 'America/New_York'); if (rule) { await sendAlerts(rule.channels, rule.recipients, analysis); }
Historical Context - Is This Actually New?
A spike in negative mentions might just be normal for your industry (airlines always have complaints). Store historical baselines and alert on anomalies, not absolute numbers. Use rolling averages and standard deviations.
import statistics from datetime import datetime, timedelta def is_anomalous_spike(current_score: float, source: str) -> bool: """Check if current score is statistically anomalous""" # Get past 7 days of scores for this source history_key = f'history:{source}' history = redis_client.lrange(history_key, 0, 167) # 7 days * 24 hours if len(history) < 24: # Need at least 24 hours of data return current_score >= 7 # Use absolute threshold # Convert to floats historical_scores = [float(s) for s in history] # Calculate baseline statistics mean = statistics.mean(historical_scores) stdev = statistics.stdev(historical_scores) # Current score is anomalous if > 2 standard deviations above mean threshold = mean + (2 * stdev) is_anomaly = current_score > threshold # Store current score for future baseline redis_client.lpush(history_key, str(current_score)) redis_client.ltrim(history_key, 0, 167) # Keep only 7 days if is_anomaly: print(f'Anomaly detected: {current_score:.1f} vs baseline {mean:.1f} Âą {stdev:.1f}') return is_anomaly # Usage in analysis if is_anomalous_spike(analysis['crisis_score'], 'twitter'): # This is unusual - escalate await send_alerts(analysis, signals) else: # Normal noise - just log logger.info(f'Score {analysis["crisis_score"]} within normal range')
Cost Calculator
Manual Monitoring
Limitations:
- âĸ Only monitors during business hours
- âĸ 4-6 hour average detection delay
- âĸ Misses 73% of early warning signals
- âĸ Can't scale beyond 10-15 sources
- âĸ No historical trend analysis
Automated Monitoring
Benefits:
- â 24/7 real-time monitoring
- â 2-5 minute average detection time
- â Catches 95%+ of early warning signals
- â Scales to 1000+ sources easily
- â Historical analysis and trend detection
- â Automatic severity scoring and routing