
Automate Competitive Analysis 🚀

From Monday's framework to real-time intelligence

June 3, 2025
📊 Strategy | 🐍 Python + TypeScript | ⚡ Weekly → Real-Time

The Problem

On Monday you tested the 3-prompt framework in ChatGPT. You saw how data collection → analysis → recommendations works. But here's the brutal truth: by the time you've manually scraped 10 competitor sites, compiled pricing into a spreadsheet, and written your analysis, your competitors have already changed their pricing, launched new features, or shifted their messaging. Manual competitive intelligence is like reading yesterday's newspaper to make today's decisions.

  • 8-12 hours per week spent manually tracking 5-10 competitors
  • 7-14 days of lag between a competitor's change and your awareness of it
  • Can't scale beyond 10 competitors without hiring analysts

See It Work

Watch the 3 prompts chain together automatically. This scrapes real competitor data, analyzes it, and generates strategic insights.

The Code

Three levels: start with basic scraping, add intelligence, then scale to real-time monitoring. Pick where you are.

Level 1: Simple Scraping + Analysis

Good for: Weekly manual runs | Setup time: 30 minutes

# Simple Scraping + Analysis (Weekly manual runs)
import requests
from bs4 import BeautifulSoup
import openai
import json
from datetime import datetime

def scrape_competitor_page(url: str) -> str:
    """Scrape text content from competitor page"""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    
    # Remove scripts, styles, nav, footer
    for element in soup(['script', 'style', 'nav', 'footer', 'header']):
        element.decompose()
    
    return soup.get_text(separator=' ', strip=True)

def extract_competitive_data(competitors: list[dict]) -> dict:
    """Extract structured data from competitor pages"""
    results = []
    
    for comp in competitors:
        # Scrape pricing page
        pricing_text = scrape_competitor_page(comp['pricing_url'])
        
        # Extract with GPT-4 (page text truncated to ~8k chars to fit the context window)
        extraction_prompt = f"""Extract pricing and features from this competitor page.
Competitor: {comp['name']}

Page content:
{pricing_text[:8000]}

Extract as JSON:
{{
  "name": "competitor name",
  "pricing": {{
    "standard_rate": "X% + $Y",
    "volume_discounts": "description",
    "enterprise_custom": true/false
  }},
  "features": ["feature 1", "feature 2"],
  "target_market": "who they serve",
  "messaging": "their main value prop"
}}"""

        response = openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": extraction_prompt}],
            temperature=0.3,
            response_format={"type": "json_object"}
        )
        
        extracted = json.loads(response.choices[0].message.content)
        results.append(extracted)
    
    return {
        "timestamp": datetime.now().isoformat(),
        "competitors": results
    }

def analyze_competitive_landscape(data: dict) -> dict:
    """Analyze competitive positioning and generate insights"""
    analysis_prompt = f"""Analyze this competitive intelligence data and provide:
1. Market trends (what patterns do you see?)
2. Competitive gaps (where are vulnerabilities?)
3. Strategic recommendations (what should we do?)

Data:
{json.dumps(data, indent=2)}

Output as JSON:
{{
  "market_trends": [{{"trend": "", "evidence": "", "implication": ""}}],
  "competitive_gaps": [{{"gap": "", "opportunity": "", "threat_level": ""}}],
  "immediate_actions": [{{"action": "", "rationale": "", "timeline": ""}}],
  "watch_items": [{{"item": "", "why": "", "check_frequency": ""}}]
}}"""

    response = openai.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": analysis_prompt}],
        temperature=0.7,
        response_format={"type": "json_object"}
    )
    
    return json.loads(response.choices[0].message.content)

# Usage
competitors = [
    {"name": "Stripe", "pricing_url": "https://stripe.com/pricing"},
    {"name": "Square", "pricing_url": "https://squareup.com/pricing"},
    {"name": "PayPal", "pricing_url": "https://paypal.com/pricing"}
]

# Step 1: Scrape and extract
data = extract_competitive_data(competitors)
print(f"Extracted data for {len(data['competitors'])} competitors")

# Step 2: Analyze
analysis = analyze_competitive_landscape(data)
print(f"Found {len(analysis['market_trends'])} trends")
print(f"Identified {len(analysis['immediate_actions'])} action items")

# Save results
with open(f"competitive_analysis_{datetime.now().strftime('%Y%m%d')}.json", 'w') as f:
    json.dump({"data": data, "analysis": analysis}, f, indent=2)

Level 2: With Change Detection & Alerts

Good for: Daily automated runs | Setup time: 2 hours

// With Change Detection & Alerts (Daily automated runs)
import Anthropic from '@anthropic-ai/sdk';
import axios from 'axios';
import * as cheerio from 'cheerio';
import { createHash } from 'crypto';

interface CompetitorData {
  name: string;
  pricing: any;
  features: string[];
  content_hash: string;
  scraped_at: string;
}

interface ChangeDetection {
  competitor: string;
  changes: Array<{
    field: string;
    old_value: any;
    new_value: any;
    significance: 'high' | 'medium' | 'low';
  }>;
}

class CompetitiveIntelligence {
  private anthropic: Anthropic;
  private previousData: Map<string, CompetitorData> = new Map();

  constructor(apiKey: string) {
    this.anthropic = new Anthropic({ apiKey });
  }

  async scrapeWithRetries(url: string, maxRetries = 3): Promise<string> {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        const response = await axios.get(url, {
          headers: {
            'User-Agent':
              'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
          },
          timeout: 15000,
        });

        const $ = cheerio.load(response.data);
        $('script, style, nav, footer, header').remove();
        return $('body').text().replace(/\s+/g, ' ').trim();
      } catch (error) {
        if (attempt === maxRetries - 1) throw error;
        await new Promise((resolve) =>
          setTimeout(resolve, Math.pow(2, attempt) * 1000)
        );
      }
    }
    throw new Error('Max retries exceeded');
  }

  async extractData(name: string, content: string): Promise<CompetitorData> {
    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [
        {
          role: 'user',
          content: `Extract pricing and features for ${name}:\n\n${content.slice(0, 10000)}\n\nOutput as JSON with: pricing (object), features (array), target_market (string)`,
        },
      ],
    });

    const extracted = JSON.parse(
      (response.content[0] as any).text
    );

    // Generate content hash for change detection
    const contentHash = createHash('sha256')
      .update(JSON.stringify(extracted))
      .digest('hex');

    return {
      name,
      ...extracted,
      content_hash: contentHash,
      scraped_at: new Date().toISOString(),
    };
  }

  detectChanges(
    previous: CompetitorData,
    current: CompetitorData
  ): ChangeDetection | null {
    if (previous.content_hash === current.content_hash) {
      return null; // No changes
    }

    const changes: ChangeDetection['changes'] = [];

    // Check pricing changes
    if (
      JSON.stringify(previous.pricing) !== JSON.stringify(current.pricing)
    ) {
      changes.push({
        field: 'pricing',
        old_value: previous.pricing,
        new_value: current.pricing,
        significance: 'high',
      });
    }

    // Check feature changes
    const newFeatures = current.features.filter(
      (f) => !previous.features.includes(f)
    );
    const removedFeatures = previous.features.filter(
      (f) => !current.features.includes(f)
    );

    if (newFeatures.length > 0 || removedFeatures.length > 0) {
      changes.push({
        field: 'features',
        old_value: { removed: removedFeatures },
        new_value: { added: newFeatures },
        significance: newFeatures.length > 2 ? 'high' : 'medium',
      });
    }

    return changes.length > 0
      ? { competitor: current.name, changes }
      : null;
  }

  async sendAlert(changes: ChangeDetection[]): Promise<void> {
    // Send to Slack/email/etc
    const highPriorityChanges = changes.filter((c) =>
      c.changes.some((ch) => ch.significance === 'high')
    );

    if (highPriorityChanges.length > 0) {
      console.log('🚨 HIGH PRIORITY CHANGES DETECTED:');
      highPriorityChanges.forEach((change) => {
        console.log(`\n${change.competitor}:`);
        change.changes.forEach((ch) => {
          console.log(`  - ${ch.field}: ${JSON.stringify(ch.new_value)}`);
        });
      });

      // In production, send to Slack:
      // await axios.post(process.env.SLACK_WEBHOOK_URL, {
      //   text: `Competitor changes detected: ${highPriorityChanges.map(c => c.competitor).join(', ')}`
      // });
    }
  }

  async monitorCompetitors(
    competitors: Array<{ name: string; url: string }>
  ): Promise<void> {
    const allChanges: ChangeDetection[] = [];

    for (const comp of competitors) {
      try {
        console.log(`Scraping ${comp.name}...`);
        const content = await this.scrapeWithRetries(comp.url);
        const data = await this.extractData(comp.name, content);

        // Check for changes
        const previous = this.previousData.get(comp.name);
        if (previous) {
          const changes = this.detectChanges(previous, data);
          if (changes) {
            allChanges.push(changes);
          }
        }

        // Store current data
        this.previousData.set(comp.name, data);
      } catch (error) {
        console.error(`Error scraping ${comp.name}:`, error);
      }
    }

    // Send alerts if changes detected
    if (allChanges.length > 0) {
      await this.sendAlert(allChanges);
    }

    console.log(
      `\nMonitoring complete. ${allChanges.length} competitors changed.`
    );
  }
}

// Usage
const monitor = new CompetitiveIntelligence(process.env.ANTHROPIC_API_KEY!);

const competitors = [
  { name: 'Stripe', url: 'https://stripe.com/pricing' },
  { name: 'Square', url: 'https://squareup.com/pricing' },
  { name: 'PayPal', url: 'https://paypal.com/pricing' },
];

// Run daily via cron. Note: previousData only lives in process memory, so
// persist it between runs (see the sketch below) or changes will never be detected.
monitor.monitorCompetitors(competitors);

// In production, set up cron:
// 0 9 * * * node dist/monitor.js
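
One gap if you schedule this with cron: previousData only exists in process memory, so every run starts from scratch and detectChanges never fires. Here's a minimal persistence sketch, assuming a local competitor_state.json file (Redis or Postgres would follow the same load/compare/save pattern). You'd load the snapshot before monitorCompetitors() and save the updated map afterwards, for example via small getSnapshot()/setSnapshot() methods you add to the class.

// Hypothetical persistence helpers for the Level 2 monitor
// (CompetitorData is the interface defined above; the file path is an assumption)
import { existsSync, readFileSync, writeFileSync } from 'fs';

const STATE_FILE = 'competitor_state.json';

function loadState(): Map<string, CompetitorData> {
  if (!existsSync(STATE_FILE)) return new Map();
  const raw = JSON.parse(
    readFileSync(STATE_FILE, 'utf-8')
  ) as Record<string, CompetitorData>;
  return new Map(Object.entries(raw));
}

function saveState(state: Map<string, CompetitorData>): void {
  writeFileSync(
    STATE_FILE,
    JSON.stringify(Object.fromEntries(state), null, 2)
  );
}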

Level 3: Production Real-Time Monitoring with Multi-Source Intelligence

Good for: Continuous monitoring with alerts | Setup time: 1 day

# Production: Real-Time Monitoring (Continuous with alerts)
from langgraph.graph import Graph, END
from typing import TypedDict, List, Dict, Any, Optional
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import openai
from datetime import datetime, timedelta
import hashlib
import json
import os
import time
import redis
from perplexity import Perplexity  # assumes a thin async client around the Perplexity API

class MonitoringState(TypedDict):
    competitors: List[Dict[str, Any]]
    scraped_data: List[Dict[str, Any]]
    previous_data: Dict[str, Dict[str, Any]]
    changes: List[Dict[str, Any]]
    news_mentions: List[Dict[str, Any]]
    analysis: Dict[str, Any]
    alerts_sent: bool

class CompetitiveMonitoringSystem:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.perplexity = Perplexity(api_key=os.getenv('PERPLEXITY_API_KEY'))
        
    async def scrape_node(self, state: MonitoringState) -> MonitoringState:
        """Scrape competitor pages concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for comp in state['competitors']:
                tasks.append(self._scrape_competitor(session, comp))
            
            state['scraped_data'] = await asyncio.gather(*tasks)
        
        return state
    
    async def _scrape_competitor(self, session: aiohttp.ClientSession, 
                                 competitor: Dict[str, Any]) -> Dict[str, Any]:
        """Scrape single competitor with error handling"""
        try:
            async with session.get(
                competitor['url'],
                headers={'User-Agent': 'Mozilla/5.0'},
                timeout=aiohttp.ClientTimeout(total=15)
            ) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                
                # Remove noise
                for tag in soup(['script', 'style', 'nav', 'footer']):
                    tag.decompose()
                
                content = soup.get_text(separator=' ', strip=True)
                
                # Generate content hash
                content_hash = hashlib.sha256(
                    content.encode('utf-8')
                ).hexdigest()
                
                return {
                    'name': competitor['name'],
                    'url': competitor['url'],
                    'content': content[:10000],  # Limit size
                    'content_hash': content_hash,
                    'scraped_at': datetime.now().isoformat()
                }
        except Exception as e:
            print(f"Error scraping {competitor['name']}: {e}")
            return {
                'name': competitor['name'],
                'error': str(e)
            }
    
    def extract_node(self, state: MonitoringState) -> MonitoringState:
        """Extract structured data using GPT-4"""
        for data in state['scraped_data']:
            if 'error' in data:
                continue
            
            extraction_prompt = f"""Extract competitive intelligence from this page.
Competitor: {data['name']}

Content:
{data['content']}

Extract as JSON:
{{
  "pricing": {{"rates": {{}}, "tiers": [], "discounts": ""}},
  "features": ["feature1", "feature2"],
  "target_market": "description",
  "unique_value_props": ["prop1", "prop2"],
  "recent_updates": ["update1"]
}}"""

            response = openai.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[{"role": "user", "content": extraction_prompt}],
                temperature=0.3,
                response_format={"type": "json_object"}
            )
            
            extracted = json.loads(response.choices[0].message.content)
            data['extracted'] = extracted
            data['extraction_hash'] = hashlib.sha256(
                json.dumps(extracted, sort_keys=True).encode()
            ).hexdigest()
        
        return state
    
    def detect_changes_node(self, state: MonitoringState) -> MonitoringState:
        """Compare with previous data to detect changes"""
        changes = []
        
        for data in state['scraped_data']:
            if 'error' in data or 'extracted' not in data:
                continue
            
            # Get previous data from Redis
            prev_key = f"competitor:{data['name']}:latest"
            prev_data = self.redis_client.get(prev_key)
            
            if prev_data:
                prev_data = json.loads(prev_data)
                
                # Check if content changed
                if prev_data.get('extraction_hash') != data['extraction_hash']:
                    change = self._analyze_change(prev_data, data)
                    if change:
                        changes.append(change)
            
            # Store current data
            self.redis_client.setex(
                prev_key,
                timedelta(days=30),
                json.dumps(data)
            )
        
        state['changes'] = changes
        return state
    
    def _analyze_change(self, old: Dict, new: Dict) -> Optional[Dict[str, Any]]:
        """Analyze what changed and assess significance"""
        changes_detected = []
        
        old_extracted = old.get('extracted', {})
        new_extracted = new.get('extracted', {})
        
        # Pricing changes (high priority)
        if old_extracted.get('pricing') != new_extracted.get('pricing'):
            changes_detected.append({
                'type': 'pricing',
                'old': old_extracted.get('pricing'),
                'new': new_extracted.get('pricing'),
                'significance': 'high'
            })
        
        # Feature changes (medium priority)
        old_features = set(old_extracted.get('features', []))
        new_features = set(new_extracted.get('features', []))
        
        added_features = new_features - old_features
        removed_features = old_features - new_features
        
        if added_features or removed_features:
            changes_detected.append({
                'type': 'features',
                'added': list(added_features),
                'removed': list(removed_features),
                'significance': 'high' if len(added_features) > 2 else 'medium'
            })
        
        if changes_detected:
            return {
                'competitor': new['name'],
                'timestamp': new['scraped_at'],
                'changes': changes_detected
            }
        
        return None
    
    async def news_monitoring_node(self, state: MonitoringState) -> MonitoringState:
        """Monitor news/social mentions using Perplexity"""
        mentions = []
        
        for comp in state['competitors']:
            query = f"Latest news and announcements from {comp['name']} in the last 7 days"
            
            response = await self.perplexity.search(
                query=query,
                search_recency_filter="week"
            )
            
            mentions.append({
                'competitor': comp['name'],
                'articles': response.get('results', [])[:5],
                'summary': response.get('answer', '')
            })
        
        state['news_mentions'] = mentions
        return state
    
    def analysis_node(self, state: MonitoringState) -> MonitoringState:
        """Generate strategic analysis of changes"""
        if not state['changes'] and not state['news_mentions']:
            state['analysis'] = {'status': 'no_changes'}
            return state
        
        analysis_prompt = f"""Analyze these competitive intelligence updates:

Website Changes:
{json.dumps(state['changes'], indent=2)}

News Mentions:
{json.dumps(state['news_mentions'], indent=2)}

Provide strategic analysis:
1. What do these changes mean for market positioning?
2. What threats/opportunities do they present?
3. What should we do in response?

Output as JSON with: market_implications, threats, opportunities, recommended_actions"""

        response = openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": analysis_prompt}],
            temperature=0.7,
            response_format={"type": "json_object"}
        )
        
        state['analysis'] = json.loads(response.choices[0].message.content)
        return state
    
    async def alert_node(self, state: MonitoringState) -> MonitoringState:
        """Send alerts for high-priority changes"""
        high_priority = [
            change for change in state['changes']
            if any(c['significance'] == 'high' for c in change['changes'])
        ]
        
        if high_priority:
            alert_message = self._format_alert(high_priority, state['analysis'])
            
            # Send to Slack
            await self._send_slack_alert(alert_message)
            
            # Send email to strategy team
            await self._send_email_alert(alert_message)
            
            state['alerts_sent'] = True
        else:
            state['alerts_sent'] = False
        
        return state
    
    def _format_alert(self, changes: List[Dict], analysis: Dict) -> str:
        """Format alert message"""
        message = "🚨 **HIGH PRIORITY COMPETITIVE CHANGES DETECTED**\n\n"
        
        for change in changes:
            message += f"**{change['competitor']}**\n"
            for c in change['changes']:
                if c['significance'] == 'high':
                    message += f"  • {c['type'].upper()}: {json.dumps(c.get('new', c.get('added')))}\n"
            message += "\n"
        
        if analysis.get('recommended_actions'):
            message += "\n**Recommended Actions:**\n"
            for action in analysis['recommended_actions']:
                message += f"  • {action}\n"
        
        return message
    
    async def _send_slack_alert(self, message: str):
        """Send alert to Slack"""
        webhook_url = os.getenv('SLACK_WEBHOOK_URL')
        if webhook_url:
            async with aiohttp.ClientSession() as session:
                await session.post(webhook_url, json={'text': message})
    
    async def _send_email_alert(self, message: str):
        """Send email alert"""
        # Implement email sending (SendGrid, AWS SES, etc.)
        pass
    
    def build_monitoring_graph(self) -> Graph:
        """Build LangGraph monitoring workflow"""
        graph = Graph()
        
        # Add nodes
        graph.add_node("scrape", self.scrape_node)
        graph.add_node("extract", self.extract_node)
        graph.add_node("detect_changes", self.detect_changes_node)
        graph.add_node("monitor_news", self.news_monitoring_node)
        graph.add_node("analyze", self.analysis_node)
        graph.add_node("alert", self.alert_node)
        
        # Add edges
        graph.set_entry_point("scrape")
        graph.add_edge("scrape", "extract")
        graph.add_edge("extract", "detect_changes")
        graph.add_edge("detect_changes", "monitor_news")
        graph.add_edge("monitor_news", "analyze")
        graph.add_edge("analyze", "alert")
        graph.add_edge("alert", END)
        
        return graph.compile()

# Usage
async def run_monitoring():
    system = CompetitiveMonitoringSystem()
    graph = system.build_monitoring_graph()
    
    initial_state: MonitoringState = {
        'competitors': [
            {'name': 'Stripe', 'url': 'https://stripe.com/pricing'},
            {'name': 'Square', 'url': 'https://squareup.com/pricing'},
            {'name': 'PayPal', 'url': 'https://paypal.com/pricing'}
        ],
        'scraped_data': [],
        'previous_data': {},
        'changes': [],
        'news_mentions': [],
        'analysis': {},
        'alerts_sent': False
    }
    
    result = await graph.ainvoke(initial_state)
    print(f"Monitoring complete. Alerts sent: {result['alerts_sent']}")

# Run continuously
if __name__ == '__main__':
    while True:
        asyncio.run(run_monitoring())
        # Wait 6 hours between runs
        time.sleep(6 * 60 * 60)

When to Level Up

1. Start: Weekly Manual Runs (5-10 competitors, weekly reports)

  • Basic web scraping with requests/BeautifulSoup
  • GPT-4 extraction of pricing and features
  • Manual review of results in JSON files
  • Copy-paste insights into presentations
2. Scale: Daily Automated Monitoring (10-20 competitors, daily checks)

  • Automated cron jobs (daily at 9am)
  • Change detection with content hashing
  • Slack alerts for pricing/feature changes
  • Historical data storage in Redis/PostgreSQL
  • Retry logic with exponential backoff
3. Production: Real-Time Intelligence (20-50 competitors, continuous monitoring)

  • Multi-source intelligence (web + news + social)
  • LangGraph orchestration with parallel scraping
  • AI-powered change significance scoring
  • Automated strategic analysis and recommendations
  • Integration with BI tools (Tableau, Looker)
  • Perplexity API for news monitoring
4. Enterprise: Predictive Intelligence (50+ competitors, global markets)

  • Predictive modeling (what will competitors do next?)
  • Multi-region monitoring (US, EU, APAC)
  • Custom ML models for pattern recognition
  • Integration with CRM/sales data (see impact on deals)
  • Competitive war room dashboards
  • API for internal teams to query intelligence (see the sketch below)
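
For that last item, here's a minimal sketch of what an internal query API could look like, assuming the Level 3 monitor keeps writing competitor:<name>:latest snapshots to Redis. Express and the node-redis client are stand-ins for whatever stack your team already runs.

// Hypothetical internal intelligence API (Level 4 sketch, not production-ready)
import express from 'express';
import { createClient } from 'redis';

const app = express();
const redis = createClient({ url: process.env.REDIS_URL });

// Latest extracted snapshot for one competitor (written by the Level 3 monitor)
app.get('/competitors/:name', async (req, res) => {
  const raw = await redis.get(`competitor:${req.params.name}:latest`);
  if (!raw) {
    res.status(404).json({ error: 'No data for this competitor' });
    return;
  }
  res.json(JSON.parse(raw));
});

async function main() {
  await redis.connect();
  app.listen(3000, () => console.log('Intelligence API listening on :3000'));
}

main();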

Strategy-Specific Gotchas

The code examples work. But competitive intelligence has unique challenges you need to handle.

Anti-Scraping Measures (Cloudflare, Rate Limits)

Most competitor sites use Cloudflare or similar protection. Simple requests will get blocked. You need residential proxies or headless browsers to avoid detection.

from playwright.async_api import async_playwright
import random

async def scrape_with_browser(url: str) -> str:
    """Use headless browser to bypass anti-scraping"""
    async with async_playwright() as p:
        # Use real browser fingerprints
        browser = await p.chromium.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage'
            ]
        )
        
        # Randomize user agent
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        ]
        
        context = await browser.new_context(
            user_agent=random.choice(user_agents),
            viewport={'width': 1920, 'height': 1080}
        )
        
        page = await context.new_page()
        
        # Add random delays to mimic human behavior
        await page.goto(url, wait_until='networkidle')
        await page.wait_for_timeout(random.randint(1000, 3000))
        
        content = await page.content()
        await browser.close()
        
        return content

# Or use residential proxy service
import requests

def scrape_with_proxy(url: str) -> str:
    proxies = {
        'http': 'http://username:password@proxy.brightdata.com:22225',
        'https': 'http://username:password@proxy.brightdata.com:22225'
    }
    
    response = requests.get(url, proxies=proxies, timeout=30)
    return response.text

Dynamic Pricing (Changes by Location, User Type)

Many SaaS companies show different prices based on IP location, user agent, or cookies. You need to scrape from multiple locations and user contexts to get the full picture.

// Scrape from multiple locations
import axios from 'axios';

const LOCATIONS = [
  { country: 'US', proxy: 'us-proxy.example.com:8080' },
  { country: 'UK', proxy: 'uk-proxy.example.com:8080' },
  { country: 'IN', proxy: 'in-proxy.example.com:8080' },
];

interface PricingByLocation {
  location: string;
  pricing: any;
  currency: string;
}

async function scrapePricingAllLocations(
  url: string
): Promise<PricingByLocation[]> {
  const results: PricingByLocation[] = [];

  for (const loc of LOCATIONS) {
    const response = await axios.get(url, {
      proxy: {
        host: loc.proxy.split(':')[0],
        port: parseInt(loc.proxy.split(':')[1]),
      },
      headers: {
        'Accept-Language': loc.country === 'US' ? 'en-US' : 'en-GB',
      },
    });

    // Extract pricing with GPT-4
    const pricing = await extractPricing(response.data);

    results.push({
      location: loc.country,
      pricing,
      currency: pricing.currency || 'USD',
    });
  }

  return results;
}

// Detect regional pricing differences
function detectPricingArbitrage(
  pricingData: PricingByLocation[]
): Array<{ finding: string; impact: string }> {
  const findings: Array<{ finding: string; impact: string }> = [];

  // Compare US vs other regions
  const usPricing = pricingData.find((p) => p.location === 'US');
  const otherRegions = pricingData.filter((p) => p.location !== 'US');

  for (const region of otherRegions) {
    const priceDiff = calculatePriceDifference(
      usPricing!.pricing,
      region.pricing
    );

    if (Math.abs(priceDiff) > 0.15) {
      // >15% difference
      findings.push({
        finding: `${region.location} pricing is ${priceDiff > 0 ? 'higher' : 'lower'} by ${Math.abs(priceDiff * 100).toFixed(1)}%`,
        impact: 'Potential arbitrage opportunity or localization strategy',
      });
    }
  }

  return findings;
}
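
The snippet above leans on two helpers that aren't shown. A minimal sketch of what they might look like follows; the Anthropic call mirrors the Level 2 extractData pattern, and the standard_rate field plus the response parsing are assumptions you'd adapt to your own pricing schema.

import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment

// Hypothetical: pull a structured pricing object out of raw page HTML
async function extractPricing(html: string): Promise<any> {
  const response = await anthropic.messages.create({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 1024,
    messages: [
      {
        role: 'user',
        content: `Extract pricing from this page as JSON with fields: standard_rate (number), currency (string).\n\n${html.slice(0, 8000)}`,
      },
    ],
  });
  return JSON.parse((response.content[0] as any).text);
}

// Hypothetical: relative difference of a region's rate vs. the US rate (0.15 = 15% higher)
function calculatePriceDifference(usPricing: any, regionPricing: any): number {
  return (
    (regionPricing.standard_rate - usPricing.standard_rate) /
    usPricing.standard_rate
  );
}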

Content Behind Paywalls or Login Walls

Some competitor resources (case studies, detailed features, pricing calculators) require a login. You either need automated accounts or have to fall back on public data sources like G2 and Capterra for feature comparisons.

from playwright.async_api import async_playwright

async def scrape_behind_login(url: str, credentials: dict) -> str:
    """Scrape content that requires authentication"""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()
        
        # Navigate to login page
        await page.goto('https://competitor.com/login')
        
        # Fill login form
        await page.fill('input[name="email"]', credentials['email'])
        await page.fill('input[name="password"]', credentials['password'])
        await page.click('button[type="submit"]')
        
        # Wait for redirect
        await page.wait_for_url('**/dashboard', timeout=10000)
        
        # Navigate to target page
        await page.goto(url)
        await page.wait_for_load_state('networkidle')
        
        content = await page.content()
        await browser.close()
        
        return content

# Alternative: Use public review sites
import requests

def scrape_g2_reviews(competitor_name: str) -> dict:
    """Get feature comparisons from G2 (public data)"""
    # G2 has public APIs for some data
    response = requests.get(
        f'https://www.g2.com/products/{competitor_name}/reviews.json',
        headers={'Accept': 'application/json'}
    )
    
    reviews = response.json()
    
    # Extract common themes (extract_features and most_common are your own helpers)
    features_mentioned = []
    for review in reviews['reviews']:
        # Parse review text for features
        features_mentioned.extend(extract_features(review['text']))
    
    return {
        'competitor': competitor_name,
        'avg_rating': reviews['avg_rating'],
        'top_features': most_common(features_mentioned, 10)
    }

Legal Compliance (ToS Violations, GDPR)

Web scraping can violate Terms of Service. Some jurisdictions (EU) have stricter rules. Use public APIs when available, respect robots.txt, and consult legal before aggressive scraping.

import urllib.robotparser
import requests
from urllib.parse import urlparse

def check_robots_txt(url: str) -> bool:
    """Check if scraping is allowed by robots.txt"""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    
    # Check if our user agent is allowed
    can_fetch = rp.can_fetch('CompetitiveIntelBot', url)
    
    if not can_fetch:
        print(f"⚠️  robots.txt disallows scraping {url}")
        print(f"Consider using official API or public data sources")
    
    return can_fetch

# Use official APIs when available
def use_official_apis():
    """Prefer official APIs over scraping"""
    # Example: Stripe's Prices API (authenticated; returns prices for your own account -
    # useful as a pattern for API-first collection rather than a competitor's price list)
    stripe_pricing = requests.get(
        'https://api.stripe.com/v1/prices',
        headers={'Authorization': f'Bearer {STRIPE_PUBLIC_KEY}'}
    ).json()
    
    # Example: GitHub has public API for repo stats
    github_stats = requests.get(
        'https://api.github.com/repos/competitor/product',
        headers={'Accept': 'application/vnd.github.v3+json'}
    ).json()
    
    return {
        'stars': github_stats['stargazers_count'],
        'forks': github_stats['forks_count'],
        'activity': github_stats['updated_at']
    }

# Add rate limiting and respect site policies
import time
from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=10, period=60)  # Max 10 requests per minute
def polite_scrape(url: str) -> str:
    """Scrape with rate limiting"""
    if not check_robots_txt(url):
        raise ValueError(f"Scraping not allowed for {url}")
    
    response = requests.get(url, timeout=10)
    time.sleep(2)  # Be polite, wait between requests
    return response.text

Data Quality and Hallucination Detection

LLMs can hallucinate pricing or features that aren't actually on the page. Always validate extracted data against multiple sources and flag low-confidence extractions for manual review.

import Anthropic from '@anthropic-ai/sdk';

interface ExtractionResult {
  data: any;
  confidence: number;
  needs_review: boolean;
  validation_issues: string[];
}

async function extractWithValidation(
  content: string,
  competitor: string
): Promise<ExtractionResult> {
  const anthropic = new Anthropic();

  // Extract with confidence scoring
  const response = await anthropic.messages.create({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 2048,
    messages: [
      {
        role: 'user',
        content: `Extract pricing from this page. For each field, include confidence (0-100).\n\n${content.slice(0, 8000)}\n\nOutput as JSON with confidence scores.`,
      },
    ],
  });

  const extracted = JSON.parse((response.content[0] as any).text);

  // Validate against known patterns
  const validationIssues: string[] = [];

  // Check 1: Pricing should have % or $ symbols
  if (extracted.pricing && !extracted.pricing.toString().match(/[%$]/)) {
    validationIssues.push('Pricing missing currency/percentage symbols');
  }

  // Check 2: Feature count should be reasonable (3-20)
  if (
    extracted.features &&
    (extracted.features.length < 3 || extracted.features.length > 20)
  ) {
    validationIssues.push(
      `Unusual feature count: ${extracted.features.length}`
    );
  }

  // Check 3: Cross-reference with previous data (getPreviousData / calculatePricingChange are your own helpers)
  const previousData = await getPreviousData(competitor);
  if (previousData) {
    const pricingChange = calculatePricingChange(
      previousData.pricing,
      extracted.pricing
    );
    if (Math.abs(pricingChange) > 0.5) {
      // >50% change
      validationIssues.push(
        `Pricing changed by ${(pricingChange * 100).toFixed(1)}% - verify manually`
      );
    }
  }

  // Calculate overall confidence across the fields that report one
  const confidenceValues = Object.values(extracted)
    .filter((v: any) => v && typeof v === 'object' && typeof v.confidence === 'number')
    .map((v: any) => v.confidence as number);
  const avgConfidence = confidenceValues.length
    ? confidenceValues.reduce((sum, c) => sum + c, 0) / confidenceValues.length
    : 0;

  return {
    data: extracted,
    confidence: avgConfidence,
    needs_review: avgConfidence < 70 || validationIssues.length > 0,
    validation_issues: validationIssues,
  };
}

// Flag low-confidence extractions for human review
// (scrapeCompetitor and sendToReviewQueue are your own scraping/queue helpers)
async function processWithReview(
  competitors: string[]
): Promise<void> {
  const results = [];
  let needsReviewCount = 0;

  for (const comp of competitors) {
    const content = await scrapeCompetitor(comp);
    const extraction = await extractWithValidation(content, comp);

    if (extraction.needs_review) {
      needsReviewCount++;
      // Send to review queue (Slack, Airtable, etc.)
      await sendToReviewQueue({
        competitor: comp,
        data: extraction.data,
        issues: extraction.validation_issues,
        confidence: extraction.confidence,
      });
    } else {
      // Auto-approve high-confidence extractions
      results.push(extraction.data);
    }
  }

  console.log(
    `Processed ${competitors.length} competitors, ${needsReviewCount} need review`
  );
}

Cost Calculator

Manual Competitive Analysis

  • Analyst time (8 hrs/week @ $75/hr): $600/week (≈$2,400/month)
  • Research tools (SimilarWeb, SpyFu): $400/month
  • Report compilation and presentation: $200/week (≈$800/month)
  • Total: $3,600/month

Limitations:

  • 7-14 day lag between changes and awareness
  • Limited to 5-10 competitors maximum
  • No real-time alerts for pricing/feature changes
  • Human error in data entry (40% error rate)
  • Can't scale beyond weekly reports

Automated Intelligence System

  • OpenAI API (GPT-4, ~500 requests/month): $150
  • Web scraping service (Bright Data/Apify): $200
  • Perplexity API (news monitoring): $100
  • Redis/PostgreSQL hosting: $50
  • Setup and maintenance (≈2 hrs/month @ $150/hr): $300
  • Total: $800/month

Benefits:

  • Real-time alerts within 6 hours of changes
  • Monitor 20-50 competitors continuously
  • Automated analysis and strategic recommendations
  • 99% accuracy with validation checks
  • Scales to daily or hourly monitoring
$93/day saved
78% cost reduction | $2,800/month | $33,600/year
💡 Pays for itself in 2 weeks
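
For reference, the headline numbers reduce to a few lines of arithmetic (figures taken from the two tables above):

// Savings math behind the summary figures
const manualMonthly = 3600;      // manual competitive analysis, $/month
const automatedMonthly = 800;    // automated intelligence system, $/month

const savedPerMonth = manualMonthly - automatedMonthly;   // $2,800
const savedPerYear = savedPerMonth * 12;                  // $33,600
const savedPerDay = Math.round(savedPerMonth / 30);       // ≈ $93
const costReduction = savedPerMonth / manualMonthly;      // ≈ 0.78 → 78%

console.log({ savedPerMonth, savedPerYear, savedPerDay, costReduction });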

Want This Running for Your Team?

We build custom competitive intelligence systems that monitor your market 24/7. From simple scrapers to enterprise-grade monitoring with predictive analytics.