The Problem
On Monday you tested the 3-prompt framework in ChatGPT. You saw how data collection → analysis → recommendations works. But here's the brutal truth: by the time you've manually scraped 10 competitor sites, compiled pricing into a spreadsheet, and written your analysis, your competitors have already changed their pricing, launched new features, or shifted their messaging. Manual competitive intelligence is like reading yesterday's newspaper to make today's decisions.
See It Work
Watch the 3 prompts chain together automatically: the pipeline scrapes real competitor data, analyzes it, and generates strategic insights.
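If you want to see the chain itself before the full code below, here is a minimal sketch of Monday's framework wired together in Python (prompts abbreviated, helper name illustrative; assumes the OpenAI SDK with an API key in the environment):

# Minimal sketch of the 3-prompt chain (illustrative; prompt text abbreviated)
import openai

def run_prompt(prompt: str) -> str:
    response = openai.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

# Prompt 1: data collection -> Prompt 2: analysis -> Prompt 3: recommendations
raw_data = run_prompt("Collect pricing and feature data for these competitors: ...")
analysis = run_prompt(f"Analyze this competitive data for trends and gaps:\n{raw_data}")
recommendations = run_prompt(f"Based on this analysis, recommend strategic actions:\n{analysis}")
print(recommendations)

Each prompt's output becomes the next prompt's input; the rest of this post is about making that chain run on real scraped data without you in the loop.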
The Code
Three levels: start with basic scraping, add intelligence, then scale to real-time monitoring. Pick where you are.
Level 1: Simple Scraping + Analysis
Good for: Weekly manual runs | Setup time: 30 minutes
# Simple Scraping + Analysis (Weekly manual runs)
import requests
from bs4 import BeautifulSoup
import openai
import json
from datetime import datetime

def scrape_competitor_page(url: str) -> str:
    """Scrape text content from competitor page"""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    response = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Remove scripts, styles, nav, footer
    for element in soup(['script', 'style', 'nav', 'footer', 'header']):
        element.decompose()

    return soup.get_text(separator=' ', strip=True)

def extract_competitive_data(competitors: list[dict]) -> dict:
    """Extract structured data from competitor pages"""
    results = []

    for comp in competitors:
        # Scrape pricing page
        pricing_text = scrape_competitor_page(comp['pricing_url'])

        # Truncate scraped text so the prompt fits the context window
        pricing_text = pricing_text[:8000]

        # Extract with GPT-4
        extraction_prompt = f"""Extract pricing and features from this competitor page.

Competitor: {comp['name']}
Page content: {pricing_text}

Extract as JSON:
{{
  "name": "competitor name",
  "pricing": {{
    "standard_rate": "X% + $Y",
    "volume_discounts": "description",
    "enterprise_custom": true/false
  }},
  "features": ["feature 1", "feature 2"],
  "target_market": "who they serve",
  "messaging": "their main value prop"
}}"""

        response = openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": extraction_prompt}],
            temperature=0.3,
            response_format={"type": "json_object"}
        )

        extracted = json.loads(response.choices[0].message.content)
        results.append(extracted)

    return {
        "timestamp": datetime.now().isoformat(),
        "competitors": results
    }

def analyze_competitive_landscape(data: dict) -> dict:
    """Analyze competitive positioning and generate insights"""
    analysis_prompt = f"""Analyze this competitive intelligence data and provide:

1. Market trends (what patterns do you see?)
2. Competitive gaps (where are vulnerabilities?)
3. Strategic recommendations (what should we do?)

Data: {json.dumps(data, indent=2)}

Output as JSON:
{{
  "market_trends": [{{"trend": "", "evidence": "", "implication": ""}}],
  "competitive_gaps": [{{"gap": "", "opportunity": "", "threat_level": ""}}],
  "immediate_actions": [{{"action": "", "rationale": "", "timeline": ""}}],
  "watch_items": [{{"item": "", "why": "", "check_frequency": ""}}]
}}"""

    response = openai.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": analysis_prompt}],
        temperature=0.7,
        response_format={"type": "json_object"}
    )

    return json.loads(response.choices[0].message.content)

# Usage
competitors = [
    {"name": "Stripe", "pricing_url": "https://stripe.com/pricing"},
    {"name": "Square", "pricing_url": "https://squareup.com/pricing"},
    {"name": "PayPal", "pricing_url": "https://paypal.com/pricing"}
]

# Step 1: Scrape and extract
data = extract_competitive_data(competitors)
print(f"Extracted data for {len(data['competitors'])} competitors")

# Step 2: Analyze
analysis = analyze_competitive_landscape(data)
print(f"Found {len(analysis['market_trends'])} trends")
print(f"Identified {len(analysis['immediate_actions'])} action items")

# Save results
with open(f"competitive_analysis_{datetime.now().strftime('%Y%m%d')}.json", 'w') as f:
    json.dump({"data": data, "analysis": analysis}, f, indent=2)
Level 2: With Change Detection & Alerts
Good for: Daily automated runs | Setup time: 2 hours
// With Change Detection & Alerts (Daily automated runs)
import Anthropic from '@anthropic-ai/sdk';
import axios from 'axios';
import * as cheerio from 'cheerio';
import { createHash } from 'crypto';

interface CompetitorData {
  name: string;
  pricing: any;
  features: string[];
  content_hash: string;
  scraped_at: string;
}

interface ChangeDetection {
  competitor: string;
  changes: Array<{
    field: string;
    old_value: any;
    new_value: any;
    significance: 'high' | 'medium' | 'low';
  }>;
}

class CompetitiveIntelligence {
  private anthropic: Anthropic;
  private previousData: Map<string, CompetitorData> = new Map();

  constructor(apiKey: string) {
    this.anthropic = new Anthropic({ apiKey });
  }

  async scrapeWithRetries(url: string, maxRetries = 3): Promise<string> {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        const response = await axios.get(url, {
          headers: {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
          },
          timeout: 15000,
        });

        const $ = cheerio.load(response.data);
        $('script, style, nav, footer, header').remove();
        return $('body').text().replace(/\s+/g, ' ').trim();
      } catch (error) {
        if (attempt === maxRetries - 1) throw error;
        await new Promise((resolve) =>
          setTimeout(resolve, Math.pow(2, attempt) * 1000)
        );
      }
    }
    throw new Error('Max retries exceeded');
  }

  async extractData(name: string, content: string): Promise<CompetitorData> {
    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [
        {
          role: 'user',
          content: `Extract pricing and features for ${name}:\n\n${content.slice(0, 10000)}\n\nOutput as JSON with: pricing (object), features (array), target_market (string)`,
        },
      ],
    });

    const extracted = JSON.parse((response.content[0] as any).text);

    // Generate content hash for change detection
    const contentHash = createHash('sha256')
      .update(JSON.stringify(extracted))
      .digest('hex');

    return {
      name,
      ...extracted,
      content_hash: contentHash,
      scraped_at: new Date().toISOString(),
    };
  }

  detectChanges(
    previous: CompetitorData,
    current: CompetitorData
  ): ChangeDetection | null {
    if (previous.content_hash === current.content_hash) {
      return null; // No changes
    }

    const changes: ChangeDetection['changes'] = [];

    // Check pricing changes
    if (
      JSON.stringify(previous.pricing) !== JSON.stringify(current.pricing)
    ) {
      changes.push({
        field: 'pricing',
        old_value: previous.pricing,
        new_value: current.pricing,
        significance: 'high',
      });
    }

    // Check feature changes
    const newFeatures = current.features.filter(
      (f) => !previous.features.includes(f)
    );
    const removedFeatures = previous.features.filter(
      (f) => !current.features.includes(f)
    );

    if (newFeatures.length > 0 || removedFeatures.length > 0) {
      changes.push({
        field: 'features',
        old_value: { removed: removedFeatures },
        new_value: { added: newFeatures },
        significance: newFeatures.length > 2 ? 'high' : 'medium',
      });
    }

    return changes.length > 0
      ? { competitor: current.name, changes }
      : null;
  }

  async sendAlert(changes: ChangeDetection[]): Promise<void> {
    // Send to Slack/email/etc
    const highPriorityChanges = changes.filter((c) =>
      c.changes.some((ch) => ch.significance === 'high')
    );

    if (highPriorityChanges.length > 0) {
      console.log('🚨 HIGH PRIORITY CHANGES DETECTED:');
      highPriorityChanges.forEach((change) => {
        console.log(`\n${change.competitor}:`);
        change.changes.forEach((ch) => {
          console.log(`  - ${ch.field}: ${JSON.stringify(ch.new_value)}`);
        });
      });

      // In production, send to Slack:
      // await axios.post(process.env.SLACK_WEBHOOK_URL, {
      //   text: `Competitor changes detected: ${highPriorityChanges.map(c => c.competitor).join(', ')}`
      // });
    }
  }

  async monitorCompetitors(
    competitors: Array<{ name: string; url: string }>
  ): Promise<void> {
    const allChanges: ChangeDetection[] = [];

    for (const comp of competitors) {
      try {
        console.log(`Scraping ${comp.name}...`);
        const content = await this.scrapeWithRetries(comp.url);
        const data = await this.extractData(comp.name, content);

        // Check for changes
        const previous = this.previousData.get(comp.name);
        if (previous) {
          const changes = this.detectChanges(previous, data);
          if (changes) {
            allChanges.push(changes);
          }
        }

        // Store current data
        this.previousData.set(comp.name, data);
      } catch (error) {
        console.error(`Error scraping ${comp.name}:`, error);
      }
    }

    // Send alerts if changes detected
    if (allChanges.length > 0) {
      await this.sendAlert(allChanges);
    }

    console.log(
      `\nMonitoring complete. ${allChanges.length} competitors changed.`
    );
  }
}

// Usage
const monitor = new CompetitiveIntelligence(process.env.ANTHROPIC_API_KEY!);

const competitors = [
  { name: 'Stripe', url: 'https://stripe.com/pricing' },
  { name: 'Square', url: 'https://squareup.com/pricing' },
  { name: 'PayPal', url: 'https://paypal.com/pricing' },
];

// Run daily via cron
monitor.monitorCompetitors(competitors);

// In production, set up cron:
// 0 9 * * * node dist/monitor.js
Level 3: Production Real-Time Monitoring with Multi-Source Intelligence
Good for: Continuous monitoring with alerts | Setup time: 1 day
# Production: Real-Time Monitoring (Continuous with alerts)
from langgraph.graph import Graph, END
from typing import TypedDict, List, Dict, Any
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import openai
from datetime import datetime, timedelta
import hashlib
import json
import os
import time
import redis
from perplexity import Perplexity

class MonitoringState(TypedDict):
    competitors: List[Dict[str, Any]]
    scraped_data: List[Dict[str, Any]]
    previous_data: Dict[str, Dict[str, Any]]
    changes: List[Dict[str, Any]]
    news_mentions: List[Dict[str, Any]]
    analysis: Dict[str, Any]
    alerts_sent: bool

class CompetitiveMonitoringSystem:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.perplexity = Perplexity(api_key=os.getenv('PERPLEXITY_API_KEY'))

    async def scrape_node(self, state: MonitoringState) -> MonitoringState:
        """Scrape competitor pages concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for comp in state['competitors']:
                tasks.append(self._scrape_competitor(session, comp))
            state['scraped_data'] = await asyncio.gather(*tasks)
        return state

    async def _scrape_competitor(self, session: aiohttp.ClientSession,
                                 competitor: Dict[str, Any]) -> Dict[str, Any]:
        """Scrape single competitor with error handling"""
        try:
            async with session.get(
                competitor['url'],
                headers={'User-Agent': 'Mozilla/5.0'},
                timeout=aiohttp.ClientTimeout(total=15)
            ) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')

                # Remove noise
                for tag in soup(['script', 'style', 'nav', 'footer']):
                    tag.decompose()

                content = soup.get_text(separator=' ', strip=True)

                # Generate content hash
                content_hash = hashlib.sha256(
                    content.encode('utf-8')
                ).hexdigest()

                return {
                    'name': competitor['name'],
                    'url': competitor['url'],
                    'content': content[:10000],  # Limit size
                    'content_hash': content_hash,
                    'scraped_at': datetime.now().isoformat()
                }
        except Exception as e:
            print(f"Error scraping {competitor['name']}: {e}")
            return {
                'name': competitor['name'],
                'error': str(e)
            }

    def extract_node(self, state: MonitoringState) -> MonitoringState:
        """Extract structured data using GPT-4"""
        for data in state['scraped_data']:
            if 'error' in data:
                continue

            extraction_prompt = f"""Extract competitive intelligence from this page.

Competitor: {data['name']}
Content: {data['content']}

Extract as JSON:
{{
  "pricing": {{"rates": {{}}, "tiers": [], "discounts": ""}},
  "features": ["feature1", "feature2"],
  "target_market": "description",
  "unique_value_props": ["prop1", "prop2"],
  "recent_updates": ["update1"]
}}"""

            response = openai.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[{"role": "user", "content": extraction_prompt}],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            extracted = json.loads(response.choices[0].message.content)
            data['extracted'] = extracted
            data['extraction_hash'] = hashlib.sha256(
                json.dumps(extracted, sort_keys=True).encode()
            ).hexdigest()

        return state

    def detect_changes_node(self, state: MonitoringState) -> MonitoringState:
        """Compare with previous data to detect changes"""
        changes = []

        for data in state['scraped_data']:
            if 'error' in data or 'extracted' not in data:
                continue

            # Get previous data from Redis
            prev_key = f"competitor:{data['name']}:latest"
            prev_data = self.redis_client.get(prev_key)

            if prev_data:
                prev_data = json.loads(prev_data)

                # Check if content changed
                if prev_data.get('extraction_hash') != data['extraction_hash']:
                    change = self._analyze_change(prev_data, data)
                    if change:
                        changes.append(change)

            # Store current data
            self.redis_client.setex(
                prev_key,
                timedelta(days=30),
                json.dumps(data)
            )

        state['changes'] = changes
        return state

    def _analyze_change(self, old: Dict, new: Dict) -> Dict[str, Any]:
        """Analyze what changed and assess significance"""
        changes_detected = []

        old_extracted = old.get('extracted', {})
        new_extracted = new.get('extracted', {})

        # Pricing changes (high priority)
        if old_extracted.get('pricing') != new_extracted.get('pricing'):
            changes_detected.append({
                'type': 'pricing',
                'old': old_extracted.get('pricing'),
                'new': new_extracted.get('pricing'),
                'significance': 'high'
            })

        # Feature changes (medium priority)
        old_features = set(old_extracted.get('features', []))
        new_features = set(new_extracted.get('features', []))

        added_features = new_features - old_features
        removed_features = old_features - new_features

        if added_features or removed_features:
            changes_detected.append({
                'type': 'features',
                'added': list(added_features),
                'removed': list(removed_features),
                'significance': 'high' if len(added_features) > 2 else 'medium'
            })

        if changes_detected:
            return {
                'competitor': new['name'],
                'timestamp': new['scraped_at'],
                'changes': changes_detected
            }

        return None

    async def news_monitoring_node(self, state: MonitoringState) -> MonitoringState:
        """Monitor news/social mentions using Perplexity"""
        mentions = []

        for comp in state['competitors']:
            query = f"Latest news and announcements from {comp['name']} in the last 7 days"

            response = await self.perplexity.search(
                query=query,
                search_recency_filter="week"
            )

            mentions.append({
                'competitor': comp['name'],
                'articles': response.get('results', [])[:5],
                'summary': response.get('answer', '')
            })

        state['news_mentions'] = mentions
        return state

    def analysis_node(self, state: MonitoringState) -> MonitoringState:
        """Generate strategic analysis of changes"""
        if not state['changes'] and not state['news_mentions']:
            state['analysis'] = {'status': 'no_changes'}
            return state

        analysis_prompt = f"""Analyze these competitive intelligence updates:

Website Changes: {json.dumps(state['changes'], indent=2)}

News Mentions: {json.dumps(state['news_mentions'], indent=2)}

Provide strategic analysis:
1. What do these changes mean for market positioning?
2. What threats/opportunities do they present?
3. What should we do in response?

Output as JSON with: market_implications, threats, opportunities, recommended_actions"""

        response = openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": analysis_prompt}],
            temperature=0.7,
            response_format={"type": "json_object"}
        )

        state['analysis'] = json.loads(response.choices[0].message.content)
        return state

    async def alert_node(self, state: MonitoringState) -> MonitoringState:
        """Send alerts for high-priority changes"""
        high_priority = [
            change for change in state['changes']
            if any(c['significance'] == 'high' for c in change['changes'])
        ]

        if high_priority:
            alert_message = self._format_alert(high_priority, state['analysis'])

            # Send to Slack
            await self._send_slack_alert(alert_message)

            # Send email to strategy team
            await self._send_email_alert(alert_message)

            state['alerts_sent'] = True
        else:
            state['alerts_sent'] = False

        return state

    def _format_alert(self, changes: List[Dict], analysis: Dict) -> str:
        """Format alert message"""
        message = "🚨 **HIGH PRIORITY COMPETITIVE CHANGES DETECTED**\n\n"

        for change in changes:
            message += f"**{change['competitor']}**\n"
            for c in change['changes']:
                if c['significance'] == 'high':
                    message += f"  • {c['type'].upper()}: {json.dumps(c.get('new', c.get('added')))}\n"
            message += "\n"

        if analysis.get('recommended_actions'):
            message += "\n**Recommended Actions:**\n"
            for action in analysis['recommended_actions']:
                message += f"  • {action}\n"

        return message

    async def _send_slack_alert(self, message: str):
        """Send alert to Slack"""
        webhook_url = os.getenv('SLACK_WEBHOOK_URL')
        if webhook_url:
            async with aiohttp.ClientSession() as session:
                await session.post(webhook_url, json={'text': message})

    async def _send_email_alert(self, message: str):
        """Send email alert"""
        # Implement email sending (SendGrid, AWS SES, etc.)
        pass

    def build_monitoring_graph(self) -> Graph:
        """Build LangGraph monitoring workflow"""
        graph = Graph()

        # Add nodes
        graph.add_node("scrape", self.scrape_node)
        graph.add_node("extract", self.extract_node)
        graph.add_node("detect_changes", self.detect_changes_node)
        graph.add_node("monitor_news", self.news_monitoring_node)
        graph.add_node("analyze", self.analysis_node)
        graph.add_node("alert", self.alert_node)

        # Add edges
        graph.set_entry_point("scrape")
        graph.add_edge("scrape", "extract")
        graph.add_edge("extract", "detect_changes")
        graph.add_edge("detect_changes", "monitor_news")
        graph.add_edge("monitor_news", "analyze")
        graph.add_edge("analyze", "alert")
        graph.add_edge("alert", END)

        return graph.compile()

# Usage
async def run_monitoring():
    system = CompetitiveMonitoringSystem()
    graph = system.build_monitoring_graph()

    initial_state: MonitoringState = {
        'competitors': [
            {'name': 'Stripe', 'url': 'https://stripe.com/pricing'},
            {'name': 'Square', 'url': 'https://squareup.com/pricing'},
            {'name': 'PayPal', 'url': 'https://paypal.com/pricing'}
        ],
        'scraped_data': [],
        'previous_data': {},
        'changes': [],
        'news_mentions': [],
        'analysis': {},
        'alerts_sent': False
    }

    result = await graph.ainvoke(initial_state)
    print(f"Monitoring complete. Alerts sent: {result['alerts_sent']}")

# Run continuously
if __name__ == '__main__':
    while True:
        asyncio.run(run_monitoring())
        # Wait 6 hours between runs
        time.sleep(6 * 60 * 60)
When to Level Up
Start: Weekly Manual Runs
5-10 competitors, weekly reports
- Basic web scraping with requests/BeautifulSoup
- GPT-4 extraction of pricing and features
- Manual review of results in JSON files
- Copy-paste insights into presentations
Scale: Daily Automated Monitoring
10-20 competitors, daily checks
- Automated cron jobs (daily at 9am; see the sketch after this list)
- Change detection with content hashing
- Slack alerts for pricing/feature changes
- Historical data storage in Redis/PostgreSQL
- Retry logic with exponential backoff
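The scheduling and storage pieces do not need to be fancy. A minimal sketch, assuming a local Redis instance and the competitor:<name>:latest key layout used in the Level 3 code (file names and paths are illustrative):

# store_snapshot.py - minimal daily snapshot storage (key names illustrative)
import json
import redis
from datetime import datetime, timedelta

r = redis.Redis(host='localhost', port=6379, db=0)

def store_snapshot(competitor: str, extracted: dict) -> None:
    """Keep the latest extraction plus a dated history entry for trend analysis."""
    payload = json.dumps({'extracted': extracted, 'scraped_at': datetime.now().isoformat()})
    r.setex(f"competitor:{competitor}:latest", timedelta(days=30), payload)
    r.set(f"competitor:{competitor}:{datetime.now():%Y%m%d}", payload)

# crontab entry to run the monitoring script every day at 9am:
# 0 9 * * * cd /opt/competitive-intel && /usr/bin/python3 monitor.py >> monitor.log 2>&1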
Production: Real-Time Intelligence
20-50 competitors, continuous monitoring
- Multi-source intelligence (web + news + social)
- LangGraph orchestration with parallel scraping
- AI-powered change significance scoring (see the sketch after this list)
- Automated strategic analysis and recommendations
- Integration with BI tools (Tableau, Looker)
- Perplexity API for news monitoring
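The Level 3 code scores significance with fixed rules (any pricing change is high, three or more new features is high). If you want the AI-powered scoring this tier describes, one option is to let the model rate each detected change. A hedged sketch, with prompt wording and thresholds as illustrative choices:

# Sketch of LLM-based significance scoring (prompt and thresholds are illustrative)
import json
import openai

def score_change_significance(change: dict) -> dict:
    """Ask the model to rate how strategically significant a detected change is."""
    prompt = f"""Rate the strategic significance of this competitor change from 1 (cosmetic)
to 10 (requires immediate response), and explain why in one sentence.

Change: {json.dumps(change, indent=2)}

Output as JSON: {{"score": 0, "reason": ""}}"""
    response = openai.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        response_format={"type": "json_object"},
    )
    scored = json.loads(response.choices[0].message.content)
    change['significance'] = 'high' if scored['score'] >= 7 else 'medium' if scored['score'] >= 4 else 'low'
    change['significance_reason'] = scored['reason']
    return change

Swap this in for the rule-based logic in _analyze_change when you find the fixed rules over- or under-alerting.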
Enterprise: Predictive Intelligence
50+ competitors, global markets
- Predictive modeling (what will competitors do next?)
- Multi-region monitoring (US, EU, APAC)
- Custom ML models for pattern recognition
- Integration with CRM/sales data (see impact on deals)
- Competitive war room dashboards
- API for internal teams to query intelligence
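That internal query API can start as a thin read-only layer over the same Redis snapshots the monitoring system already writes. A minimal sketch, assuming FastAPI and the competitor:<name>:latest keys from Level 3:

# Minimal internal intelligence API (a sketch; assumes the Redis keys written by the Level 3 system)
import json
import redis
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Competitive Intelligence API")
r = redis.Redis(host='localhost', port=6379, db=0)

@app.get("/competitors/{name}")
def latest_intelligence(name: str) -> dict:
    """Return the most recent extraction for a competitor, as stored by the monitoring graph."""
    raw = r.get(f"competitor:{name}:latest")
    if raw is None:
        raise HTTPException(status_code=404, detail=f"No data for {name}")
    return json.loads(raw)

# Run with: uvicorn intel_api:app --port 8000
# Internal teams can then query, e.g.: GET /competitors/Stripe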
Strategy-Specific Gotchas
The code examples work. But competitive intelligence has unique challenges you need to handle.
Anti-Scraping Measures (Cloudflare, Rate Limits)
Most competitor sites sit behind Cloudflare or similar protection, so plain requests calls get blocked quickly. You need headless browsers or residential proxies to avoid detection.
from playwright.async_api import async_playwright
import random

async def scrape_with_browser(url: str) -> str:
    """Use headless browser to bypass anti-scraping"""
    async with async_playwright() as p:
        # Use real browser fingerprints
        browser = await p.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage'
            ]
        )

        # Randomize user agent
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        ]

        context = await browser.new_context(
            user_agent=random.choice(user_agents),
            viewport={'width': 1920, 'height': 1080}
        )

        page = await context.new_page()

        # Add random delays to mimic human behavior
        await page.goto(url, wait_until='networkidle')
        await page.wait_for_timeout(random.randint(1000, 3000))

        content = await page.content()
        await browser.close()

        return content

# Or use residential proxy service
import requests

def scrape_with_proxy(url: str) -> str:
    proxies = {
        'http': 'http://username:password@proxy.brightdata.com:22225',
        'https': 'http://username:password@proxy.brightdata.com:22225'
    }

    response = requests.get(url, proxies=proxies, timeout=30)
    return response.text
Dynamic Pricing (Changes by Location, User Type)
Many SaaS companies show different prices based on IP location, user agent, or cookies. You need to scrape from multiple locations and user contexts to get the full picture.
// Scrape from multiple locations
import axios from 'axios';

const LOCATIONS = [
  { country: 'US', proxy: 'us-proxy.example.com:8080' },
  { country: 'UK', proxy: 'uk-proxy.example.com:8080' },
  { country: 'IN', proxy: 'in-proxy.example.com:8080' },
];

interface PricingByLocation {
  location: string;
  pricing: any;
  currency: string;
}

async function scrapePricingAllLocations(
  url: string
): Promise<PricingByLocation[]> {
  const results: PricingByLocation[] = [];

  for (const loc of LOCATIONS) {
    const response = await axios.get(url, {
      proxy: {
        host: loc.proxy.split(':')[0],
        port: parseInt(loc.proxy.split(':')[1]),
      },
      headers: {
        'Accept-Language': loc.country === 'US' ? 'en-US' : 'en-GB',
      },
    });

    // Extract pricing with GPT-4 (extractPricing is the extraction helper from Level 1)
    const pricing = await extractPricing(response.data);

    results.push({
      location: loc.country,
      pricing,
      currency: pricing.currency || 'USD',
    });
  }

  return results;
}

// Detect regional pricing differences
function detectPricingArbitrage(
  pricingData: PricingByLocation[]
): Array<{ finding: string; impact: string }> {
  const findings: Array<{ finding: string; impact: string }> = [];

  // Compare US vs other regions
  const usPricing = pricingData.find((p) => p.location === 'US');
  const otherRegions = pricingData.filter((p) => p.location !== 'US');

  for (const region of otherRegions) {
    const priceDiff = calculatePriceDifference(
      usPricing!.pricing,
      region.pricing
    );

    if (Math.abs(priceDiff) > 0.15) {
      // >15% difference
      findings.push({
        finding: `${region.location} pricing is ${priceDiff > 0 ? 'higher' : 'lower'} by ${Math.abs(priceDiff * 100).toFixed(1)}%`,
        impact: 'Potential arbitrage opportunity or localization strategy',
      });
    }
  }

  return findings;
}
Content Behind Paywalls or Login Walls
Some competitor resources (case studies, detailed feature docs, pricing calculators) sit behind a login. Either maintain dedicated accounts for automated logins or fall back on public data sources like G2 and Capterra for feature comparisons.
from playwright.async_api import async_playwright

async def scrape_behind_login(url: str, credentials: dict) -> str:
    """Scrape content that requires authentication"""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        # Navigate to login page
        await page.goto('https://competitor.com/login')

        # Fill login form
        await page.fill('input[name="email"]', credentials['email'])
        await page.fill('input[name="password"]', credentials['password'])
        await page.click('button[type="submit"]')

        # Wait for redirect
        await page.wait_for_url('**/dashboard', timeout=10000)

        # Navigate to target page
        await page.goto(url)
        await page.wait_for_load_state('networkidle')

        content = await page.content()
        await browser.close()

        return content

# Alternative: Use public review sites
import requests

def scrape_g2_reviews(competitor_name: str) -> dict:
    """Get feature comparisons from G2 (public data)"""
    # G2 has public APIs for some data
    response = requests.get(
        f'https://www.g2.com/products/{competitor_name}/reviews.json',
        headers={'Accept': 'application/json'}
    )

    reviews = response.json()

    # Extract common themes
    features_mentioned = []
    for review in reviews['reviews']:
        # Parse review text for features
        features_mentioned.extend(extract_features(review['text']))

    return {
        'competitor': competitor_name,
        'avg_rating': reviews['avg_rating'],
        'top_features': most_common(features_mentioned, 10)
    }
Legal Compliance (ToS Violations, GDPR)
Web scraping can violate a site's Terms of Service, and some jurisdictions (notably the EU) have stricter rules around it. Use public APIs when available, respect robots.txt, and consult legal counsel before doing anything aggressive.
import urllib.robotparser
import requests
from urllib.parse import urlparse

def check_robots_txt(url: str) -> bool:
    """Check if scraping is allowed by robots.txt"""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()

    # Check if our user agent is allowed
    can_fetch = rp.can_fetch('CompetitiveIntelBot', url)

    if not can_fetch:
        print(f"⚠️ robots.txt disallows scraping {url}")
        print("Consider using official API or public data sources")

    return can_fetch

# Use official APIs when available
def use_official_apis():
    """Prefer official APIs over scraping"""
    # Example: Stripe has public pricing API
    stripe_pricing = requests.get(
        'https://api.stripe.com/v1/prices',
        headers={'Authorization': f'Bearer {STRIPE_PUBLIC_KEY}'}
    ).json()

    # Example: GitHub has public API for repo stats
    github_stats = requests.get(
        'https://api.github.com/repos/competitor/product',
        headers={'Accept': 'application/vnd.github.v3+json'}
    ).json()

    return {
        'stars': github_stats['stargazers_count'],
        'forks': github_stats['forks_count'],
        'activity': github_stats['updated_at']
    }

# Add rate limiting and respect site policies
import time
from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=10, period=60)  # Max 10 requests per minute
def polite_scrape(url: str) -> str:
    """Scrape with rate limiting"""
    if not check_robots_txt(url):
        raise ValueError(f"Scraping not allowed for {url}")

    response = requests.get(url, timeout=10)
    time.sleep(2)  # Be polite, wait between requests
    return response.text
Data Quality and Hallucination Detection
LLMs can hallucinate pricing or features that aren't actually on the page. Always validate extracted data against multiple sources and flag low-confidence extractions for manual review.
import Anthropic from '@anthropic-ai/sdk';

interface ExtractionResult {
  data: any;
  confidence: number;
  needs_review: boolean;
  validation_issues: string[];
}

async function extractWithValidation(
  content: string,
  competitor: string
): Promise<ExtractionResult> {
  const anthropic = new Anthropic();

  // Extract with confidence scoring
  const response = await anthropic.messages.create({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 2048,
    messages: [
      {
        role: 'user',
        content: `Extract pricing from this page. For each field, include confidence (0-100).\n\n${content.slice(0, 8000)}\n\nOutput as JSON with confidence scores.`,
      },
    ],
  });

  const extracted = JSON.parse((response.content[0] as any).text);

  // Validate against known patterns
  const validationIssues: string[] = [];

  // Check 1: Pricing should have % or $ symbols
  if (extracted.pricing && !extracted.pricing.toString().match(/[%$]/)) {
    validationIssues.push('Pricing missing currency/percentage symbols');
  }

  // Check 2: Feature count should be reasonable (3-20)
  if (
    extracted.features &&
    (extracted.features.length < 3 || extracted.features.length > 20)
  ) {
    validationIssues.push(
      `Unusual feature count: ${extracted.features.length}`
    );
  }

  // Check 3: Cross-reference with previous data
  const previousData = await getPreviousData(competitor);
  if (previousData) {
    const pricingChange = calculatePricingChange(
      previousData.pricing,
      extracted.pricing
    );
    if (Math.abs(pricingChange) > 0.5) {
      // >50% change
      validationIssues.push(
        `Pricing changed by ${(pricingChange * 100).toFixed(1)}% - verify manually`
      );
    }
  }

  // Calculate overall confidence
  const avgConfidence =
    Object.values(extracted)
      .filter((v: any) => typeof v === 'object' && v.confidence)
      .reduce((sum: number, v: any) => sum + v.confidence, 0) /
    Object.keys(extracted).length;

  return {
    data: extracted,
    confidence: avgConfidence,
    needs_review: avgConfidence < 70 || validationIssues.length > 0,
    validation_issues: validationIssues,
  };
}

// Flag low-confidence extractions for human review
async function processWithReview(competitors: string[]): Promise<void> {
  const approved = [];
  let needsReviewCount = 0;

  for (const comp of competitors) {
    const content = await scrapeCompetitor(comp);
    const extraction = await extractWithValidation(content, comp);

    if (extraction.needs_review) {
      needsReviewCount++;
      // Send to review queue (Slack, Airtable, etc.)
      await sendToReviewQueue({
        competitor: comp,
        data: extraction.data,
        issues: extraction.validation_issues,
        confidence: extraction.confidence,
      });
    } else {
      // Auto-approve high-confidence extractions
      approved.push(extraction.data);
    }
  }

  console.log(
    `Processed ${competitors.length} competitors, ${needsReviewCount} need review`
  );
}
Cost Calculator
Manual Competitive Analysis
Limitations:
- 7-14 day lag between changes and awareness
- Limited to 5-10 competitors maximum
- No real-time alerts for pricing/feature changes
- Human error in data entry (40% error rate)
- Can't scale beyond weekly reports
Automated Intelligence System
Benefits:
- ✓ Real-time alerts within 6 hours of changes
- ✓ Monitor 20-50 competitors continuously
- ✓ Automated analysis and strategic recommendations
- ✓ 99% accuracy with validation checks
- ✓ Scales to daily or hourly monitoring
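To put your own numbers on this comparison, the math is simple enough to sketch. Every input below is a placeholder; substitute your analyst rates and current API pricing:

# Back-of-envelope cost comparison (all inputs are placeholders, not real rates)
def monthly_cost_manual(analyst_hourly_rate: float, hours_per_competitor: float,
                        competitors: int, reports_per_month: int) -> float:
    """Fully loaded analyst time spent producing manual competitive reports."""
    return analyst_hourly_rate * hours_per_competitor * competitors * reports_per_month

def monthly_cost_automated(runs_per_month: int, competitors: int,
                           llm_cost_per_extraction: float, infra_cost: float) -> float:
    """LLM extraction/analysis calls plus fixed infrastructure (proxies, hosting, Redis)."""
    return runs_per_month * competitors * llm_cost_per_extraction + infra_cost

# Example with made-up inputs:
manual = monthly_cost_manual(analyst_hourly_rate=75, hours_per_competitor=2,
                             competitors=10, reports_per_month=4)
automated = monthly_cost_automated(runs_per_month=30, competitors=20,
                                   llm_cost_per_extraction=0.05, infra_cost=200)
print(f"Manual: ${manual:,.0f}/mo vs automated: ${automated:,.0f}/mo")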