Automate Pricing Optimization 🚀

Turn Monday's 3 prompts into a real-time pricing engine

September 9, 2025
📊 Strategy · 🐍 Python + TypeScript · ⚡ 10 → 10,000 SKUs

The Problem

On Monday you tested the 3 prompts in ChatGPT. You saw how the chain works: competitor analysis → demand signals → price recommendations. But here's the reality: by the time you update your spreadsheet, competitors may have changed prices twice. Static pricing can leave 15-30% of revenue on the table because you can't react fast enough.

4+ hours
Daily manual price checks across competitors
2-3 days
Lag time from insight to price change
15-30%
Revenue lost to static pricing

See It Work

Watch the 3 prompts chain together automatically. This is what you'll build: real-time competitor tracking, demand analysis, and price optimization.

The Code

Three levels: start simple with manual triggers, add real-time monitoring, then scale to fully automated dynamic pricing. Pick where you are.

Level 1: Simple Competitor Check

Good for: 10-50 SKUs | Setup time: 30 minutes

# Simple Competitor Check (10-50 SKUs)
import anthropic
import json
from datetime import datetime

def analyze_pricing(product_data: dict, competitor_prices: list) -> dict:
    """Chain the 3 prompts: analyze → validate → recommend"""
    
    # The client reads the key from the ANTHROPIC_API_KEY environment variable
    client = anthropic.Anthropic()
    
    # Step 1: Competitor & Market Analysis
    analysis_prompt = f"""Analyze this pricing data and extract key insights as JSON.

Product: {product_data['name']} at ${product_data['price']}
Competitors: {json.dumps(competitor_prices)}
Metrics: {json.dumps(product_data['metrics'])}

Output JSON with:
- competitive_position (vs_average, vs_lowest, vs_highest)
- market_trends (price_changes, timing, patterns)
- positioning_analysis"""

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        messages=[{"role": "user", "content": analysis_prompt}]
    )
    
    analysis = json.loads(response.content[0].text)
    
    # Step 2: Demand Signal Validation
    validation_prompt = f"""Analyze demand signals and validate pricing opportunity.

Current analysis:
{json.dumps(analysis)}

Product metrics:
- Conversion rate: {product_data['metrics']['conversion_rate']}
- Traffic growth: {product_data['metrics']['traffic_growth']}
- Retention: {product_data['metrics']['retention_rate']}

Output JSON with:
- demand_indicators (array with signal, strength, recommendation)
- price_elasticity (estimated, confidence)
- optimal_price_range (min, max, confidence)"""

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        messages=[{"role": "user", "content": validation_prompt}]
    )
    
    validation = json.loads(response.content[0].text)
    
    # Step 3: Price Recommendations
    if validation['optimal_price_range']['confidence'] > 0.7:
        recommendation_prompt = f"""Generate specific pricing recommendations with A/B test plan.

Analysis:
{json.dumps(analysis)}

Demand signals:
{json.dumps(validation)}

Current price: ${product_data['price']}

Output JSON with:
- recommendations (array: strategy, new_price, rationale, expected_impact, risk_level)
- ab_test_plan (variants, duration, success_metrics)
- monitoring_alerts (what to watch, thresholds, actions)"""

        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": recommendation_prompt}]
        )
        
        recommendations = json.loads(response.content[0].text)
    else:
        recommendations = {
            "status": "insufficient_confidence",
            "message": "Need more data before making pricing changes"
        }
    
    return {
        "analysis": analysis,
        "validation": validation,
        "recommendations": recommendations,
        "timestamp": datetime.now().isoformat()
    }

# Usage
product = {
    "name": "Premium Plan",
    "price": 99,
    "metrics": {
        "conversion_rate": 0.12,
        "traffic_growth": 0.23,
        "retention_rate": 0.85
    }
}

competitors = [
    {"name": "CompanyA", "price": 89, "change": -10},
    {"name": "CompanyB", "price": 119, "change": 0},
    {"name": "CompanyC", "price": 95, "change": 0}
]

result = analyze_pricing(product, competitors)
print(f"Recommendations: {len(result['recommendations'].get('recommendations', []))}")
print(f"Confidence: {result['validation']['optimal_price_range']['confidence']}")
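Each step above calls `json.loads` directly on the model's reply, which fails if the reply wraps the JSON in a Markdown fence or adds a sentence around it. A minimal sketch of a more forgiving parser follows. `parse_json_reply` is a hypothetical helper name, not part of the Anthropic SDK, and it assumes the reply contains a single top-level JSON object or array.

```python
import json
import re

FENCE = "`" * 3  # Markdown code fence marker


def parse_json_reply(text: str):
    """Parse JSON from a model reply that may include fences or extra prose.

    Hypothetical helper: tries the raw text first, then a fenced block,
    then the widest {...} or [...] span found in the text.
    """
    candidates = [text.strip()]

    # Contents of a fenced block (optionally tagged "json"), if present
    fenced = re.search(FENCE + r"(?:json)?\s*(.*?)" + FENCE, text, re.DOTALL)
    if fenced:
        candidates.append(fenced.group(1).strip())

    # Widest object or array span, in case prose surrounds the JSON
    for opener, closer in (("{", "}"), ("[", "]")):
        start, end = text.find(opener), text.rfind(closer)
        if start != -1 and end > start:
            candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    raise ValueError("No valid JSON found in model reply")
```

Swapping this in for the three `json.loads(response.content[0].text)` calls keeps the chain running when a reply is not bare JSON, and still raises a clear error when no JSON is present at all.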

Level 2: Real-Time Monitoring with Webhooks

Good for: 50-500 SKUs | Setup time: 2 hours

// Real-Time Monitoring (50-500 SKUs)
import Anthropic from '@anthropic-ai/sdk';
import Stripe from 'stripe';
import axios from 'axios';

interface PricingResult {
  analysis: any;
  validation: any;
  recommendations: any;
  autoApply: boolean;
}

class DynamicPricingEngine {
  private anthropic: Anthropic;
  private stripe: Stripe;
  private priceCache: Map<string, any>;

  constructor() {
    this.anthropic = new Anthropic({
      apiKey: process.env.ANTHROPIC_API_KEY!,
    });
    this.stripe = new Stripe(process.env.STRIPE_SECRET_KEY!, {
      apiVersion: '2023-10-16',
    });
    this.priceCache = new Map();
  }

  async monitorCompetitors(productId: string): Promise<any[]> {
    // Scrape competitor prices (use real scraping service in production)
    const competitors = [
      { url: 'https://competitor-a.com/pricing', name: 'CompanyA' },
      { url: 'https://competitor-b.com/pricing', name: 'CompanyB' },
      { url: 'https://competitor-c.com/pricing', name: 'CompanyC' },
    ];

    const prices = await Promise.all(
      competitors.map(async (comp) => {
        try {
          // Use price intelligence API in production
          const response = await axios.get(
            `https://api.priceapi.com/v1/check?url=${encodeURIComponent(comp.url)}`,
            {
              headers: { Authorization: `Bearer ${process.env.PRICE_API_KEY}` },
              timeout: 10000,
            }
          );

          const cached = this.priceCache.get(comp.name);
          const currentPrice = response.data.price;
          const change = cached ? currentPrice - cached.price : 0;

          this.priceCache.set(comp.name, {
            price: currentPrice,
            timestamp: new Date(),
          });

          return {
            name: comp.name,
            price: currentPrice,
            change: change,
            change_percent: cached ? (change / cached.price) * 100 : 0,
          };
        } catch (error) {
          console.error(`Failed to fetch ${comp.name}:`, error);
          return null;
        }
      })
    );

    return prices.filter((p) => p !== null);
  }

  async analyzeWithRetries(
    productData: any,
    competitorPrices: any[],
    maxRetries: number = 3
  ): Promise<PricingResult> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        // Step 1: Analysis
        const analysisResponse = await Promise.race([
          this.anthropic.messages.create({
            model: 'claude-3-5-sonnet-20241022',
            max_tokens: 2048,
            messages: [
              {
                role: 'user',
                content: `Analyze pricing data as JSON: ${JSON.stringify({ product: productData, competitors: competitorPrices })}`,
              },
            ],
          }),
          this.timeout(30000),
        ]);

        const analysis = JSON.parse(
          analysisResponse.content[0].type === 'text'
            ? analysisResponse.content[0].text
            : '{}'
        );

        // Step 2: Validation
        const validationResponse = await this.anthropic.messages.create({
          model: 'claude-3-5-sonnet-20241022',
          max_tokens: 2048,
          messages: [
            {
              role: 'user',
              content: `Validate demand signals: ${JSON.stringify(analysis)}`,
            },
          ],
        });

        const validation = JSON.parse(
          validationResponse.content[0].type === 'text'
            ? validationResponse.content[0].text
            : '{}'
        );

        // Step 3: Recommendations
        const recommendationResponse = await this.anthropic.messages.create({
          model: 'claude-3-5-sonnet-20241022',
          max_tokens: 2048,
          messages: [
            {
              role: 'user',
              content: `Generate pricing recommendations: ${JSON.stringify({ analysis, validation })}`,
            },
          ],
        });

        const recommendations = JSON.parse(
          recommendationResponse.content[0].type === 'text'
            ? recommendationResponse.content[0].text
            : '{}'
        );

        // Auto-apply if low risk and high confidence
        const autoApply =
          validation.optimal_price_range?.confidence > 0.85 &&
          recommendations.recommendations?.[0]?.risk_level === 'low';

        return { analysis, validation, recommendations, autoApply };
      } catch (error) {
        lastError = error as Error;
        if (attempt < maxRetries - 1) {
          await this.delay(Math.pow(2, attempt) * 1000);
        }
      }
    }

    throw lastError;
  }

  async applyPriceChange(
    productId: string,
    newPrice: number,
    reason: string
  ): Promise<void> {
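    // Create the new Stripe price and make it the product's default.
    // Existing subscriptions keep their old price until they are migrated.
    const product = await this.stripe.products.retrieve(productId);
    const newStripePrice = await this.stripe.prices.create({
      product: productId,
      unit_amount: Math.round(newPrice * 100),
      currency: 'usd',
      recurring: { interval: 'month' },
    });
    await this.stripe.products.update(productId, {
      default_price: newStripePrice.id,
    });

    // Log change for audit (these are Stripe price IDs, not amounts)
    console.log({
      timestamp: new Date().toISOString(),
      product_id: productId,
      old_price_id: product.default_price,
      new_price_id: newStripePrice.id,
      new_amount: newPrice,
      reason: reason,
      auto_applied: true,
    });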

    // Send alert to Slack/email
    await this.sendAlert(
      `Price updated for ${product.name}: $${newPrice} (${reason})`
    );
  }

  private async sendAlert(message: string): Promise<void> {
    // Implement Slack/email notification
    await axios.post(
      process.env.SLACK_WEBHOOK_URL!,
      { text: message },
      { timeout: 5000 }
    );
  }

  private timeout(ms: number): Promise<never> {
    return new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Timeout')), ms)
    );
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

// Usage
const engine = new DynamicPricingEngine();

// Run every hour
setInterval(async () => {
  try {
    const competitors = await engine.monitorCompetitors('prod_premium');
    const result = await engine.analyzeWithRetries(
      { name: 'Premium Plan', price: 99, metrics: { /* ... */ } },
      competitors
    );

    if (result.autoApply && result.recommendations.recommendations?.[0]) {
      await engine.applyPriceChange(
        'prod_premium',
        result.recommendations.recommendations[0].new_price,
        result.recommendations.recommendations[0].rationale
      );
    }
  } catch (error) {
    // Without this, a failed run becomes an unhandled promise rejection
    console.error('Pricing run failed:', error);
  }
}, 3600000);
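The `autoApply` check above looks at confidence and risk level but not at the size of the change. A minimal sketch of a stricter gate that also caps the adjustment follows, written in Python for brevity. The function name and defaults are assumptions for illustration; the 5% cap mirrors the low-risk threshold listed under When to Level Up.

```python
def should_auto_apply(current_price: float, new_price: float,
                      confidence: float, risk_level: str,
                      max_change: float = 0.05,
                      min_confidence: float = 0.85) -> bool:
    """Return True only for small, low-risk, high-confidence changes."""
    if current_price <= 0:
        return False  # Cannot compute a relative change
    relative_change = abs(new_price - current_price) / current_price
    return (confidence > min_confidence
            and risk_level == "low"
            and relative_change <= max_change)


# A $99 → $102 move is about 3%, so it passes; $99 → $107 is about 8% and does not
print(should_auto_apply(99, 102, confidence=0.9, risk_level="low"))
print(should_auto_apply(99, 107, confidence=0.9, risk_level="low"))
```

Anything this gate rejects can still go out as a Slack alert for manual review instead of being applied automatically.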

Level 3: Production with Multi-Agent System

Good for: 500+ SKUs | Setup time: 1 week

# Production Multi-Agent System (500+ SKUs)
from langgraph.graph import Graph, END
from typing import TypedDict, List
import json
import anthropic
import stripe
import redis
from datetime import datetime

class PricingState(TypedDict):
    product_id: str
    current_price: float
    competitor_data: List[dict]
    analytics_data: dict
    analysis: dict
    validation: dict
    recommendations: List[dict]
    selected_strategy: dict
    approval_status: str
    applied: bool

class PricingAgent:
    def __init__(self):
        self.anthropic = anthropic.Anthropic()
        # Placeholder key: load it from an environment variable or secret store
        stripe.api_key = "sk_live_..."
        self.stripe = stripe
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        
    def competitor_monitor_node(self, state: PricingState) -> PricingState:
        """Agent 1: Monitor competitor prices in real-time"""
        # Fetch from multiple sources
        sources = ['priceapi', 'scraperapi', 'manual_updates']
        competitor_data = []
        
        for source in sources:
            cached = self.redis.get(f"competitors:{state['product_id']}:{source}")
            if cached:
                competitor_data.extend(json.loads(cached))
            else:
                # Fetch fresh data
                data = self._fetch_competitor_data(source, state['product_id'])
                self.redis.setex(
                    f"competitors:{state['product_id']}:{source}",
                    3600,  # Cache 1 hour
                    json.dumps(data)
                )
                competitor_data.extend(data)
        
        state['competitor_data'] = competitor_data
        return state
    
    def analytics_node(self, state: PricingState) -> PricingState:
        """Agent 2: Aggregate analytics from multiple sources"""
        # Pull from Google Analytics, Stripe, internal DB
        analytics = {
            'conversion_rate': self._get_conversion_rate(state['product_id']),
            'traffic_data': self._get_traffic_data(state['product_id']),
            'retention_rate': self._get_retention_rate(state['product_id']),
            'revenue_trend': self._get_revenue_trend(state['product_id']),
            'customer_feedback': self._get_sentiment_score(state['product_id'])
        }
        
        state['analytics_data'] = analytics
        return state
    
    def analysis_node(self, state: PricingState) -> PricingState:
        """Agent 3: Deep competitive & market analysis"""
        prompt = f"""Perform comprehensive pricing analysis.

Product: {state['product_id']} at ${state['current_price']}
Competitors: {json.dumps(state['competitor_data'])}
Analytics: {json.dumps(state['analytics_data'])}

Analyze:
1. Competitive positioning (market share, price gaps)
2. Market trends (direction, velocity, seasonality)
3. Customer value perception
4. Risk factors

Output detailed JSON analysis."""
        
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )
        
        state['analysis'] = json.loads(response.content[0].text)
        return state
    
    def validation_node(self, state: PricingState) -> PricingState:
        """Agent 4: Validate pricing opportunity with demand signals"""
        prompt = f"""Validate pricing opportunity using demand signals.

Analysis: {json.dumps(state['analysis'])}
Current metrics: {json.dumps(state['analytics_data'])}

Calculate:
1. Price elasticity (estimated coefficient)
2. Demand strength indicators
3. Optimal price range with confidence intervals
4. Risk assessment

Output JSON with validation results."""
        
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )
        
        state['validation'] = json.loads(response.content[0].text)
        return state
    
    def recommendation_node(self, state: PricingState) -> PricingState:
        """Agent 5: Generate multiple pricing strategies"""
        prompt = f"""Generate 3-5 pricing strategies with detailed impact analysis.

Analysis: {json.dumps(state['analysis'])}
Validation: {json.dumps(state['validation'])}
Current price: ${state['current_price']}

For each strategy:
1. New price point and rationale
2. Expected revenue impact (best/worst/likely)
3. Volume impact prediction
4. Risk level and mitigation
5. A/B test design
6. Rollback criteria

Output JSON array of strategies ranked by expected value."""
        
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )
        
        state['recommendations'] = json.loads(response.content[0].text)
        return state
    
    def strategy_selection_node(self, state: PricingState) -> PricingState:
        """Agent 6: Select optimal strategy based on business rules"""
        # Apply business logic
        recommendations = state['recommendations']
        confidence = state['validation'].get('optimal_price_range', {}).get('confidence', 0)
        
        # Auto-select if high confidence + low risk
        if confidence > 0.85:
            low_risk = [r for r in recommendations if r.get('risk_level') == 'low']
            if low_risk:
                state['selected_strategy'] = low_risk[0]
                state['approval_status'] = 'auto_approved'
                return state
        
        # Flag for human review if:
        # - Large price change (>10%)
        # - High risk
        # - Low confidence
        selected = recommendations[0] if recommendations else None
        if selected:
            price_change = abs(selected['new_price'] - state['current_price']) / state['current_price']
            if price_change > 0.10 or selected.get('risk_level') == 'high' or confidence < 0.7:
                state['approval_status'] = 'human_review_required'
            else:
                state['approval_status'] = 'auto_approved'
            
            state['selected_strategy'] = selected
        
        return state
    
    def approval_router(self, state: PricingState) -> str:
        """Route based on approval status"""
        if state['approval_status'] == 'auto_approved':
            return 'apply_price'
        else:
            return 'human_review'
    
    def apply_price_node(self, state: PricingState) -> PricingState:
        """Agent 7: Apply price change to Stripe"""
        strategy = state['selected_strategy']
        new_price = strategy['new_price']
        
        # Create new Stripe price via the module-level API (uses stripe.api_key)
        stripe_price = stripe.Price.create(
            product=state['product_id'],
            unit_amount=round(new_price * 100),  # round() avoids float truncation
            currency='usd',
            recurring={'interval': 'month'}
        )
        
        # Log change
        self._log_price_change({
            'product_id': state['product_id'],
            'old_price': state['current_price'],
            'new_price': new_price,
            'strategy': strategy['strategy'],
            'rationale': strategy['rationale'],
            'timestamp': datetime.now().isoformat(),
            'auto_applied': True
        })
        
        # Send notifications
        self._notify_stakeholders(
            f"Price updated: {state['product_id']} → ${new_price}"
        )
        
        state['applied'] = True
        return state
    
    def human_review_node(self, state: PricingState) -> PricingState:
        """Agent 8: Flag for human review"""
        # Send to approval queue
        review_data = {
            'product_id': state['product_id'],
            'current_price': state['current_price'],
            'recommended_strategy': state['selected_strategy'],
            'analysis': state['analysis'],
            'validation': state['validation'],
            'timestamp': datetime.now().isoformat()
        }
        
        self.redis.lpush('pricing_review_queue', json.dumps(review_data))
        
        # Notify pricing team
        self._notify_stakeholders(
            f"Pricing review needed: {state['product_id']}",
            channel='#pricing-team'
        )
        
        state['applied'] = False
        return state

def build_pricing_graph():
    """Build multi-agent pricing optimization graph"""
    agent = PricingAgent()
    graph = Graph()
    
    # Add all agent nodes
    graph.add_node("monitor_competitors", agent.competitor_monitor_node)
    graph.add_node("gather_analytics", agent.analytics_node)
    graph.add_node("analyze_market", agent.analysis_node)
    graph.add_node("validate_opportunity", agent.validation_node)
    graph.add_node("generate_recommendations", agent.recommendation_node)
    graph.add_node("select_strategy", agent.strategy_selection_node)
    graph.add_node("apply_price", agent.apply_price_node)
    graph.add_node("human_review", agent.human_review_node)
    
    # Build workflow
    graph.set_entry_point("monitor_competitors")
    graph.add_edge("monitor_competitors", "gather_analytics")
    graph.add_edge("gather_analytics", "analyze_market")
    graph.add_edge("analyze_market", "validate_opportunity")
    graph.add_edge("validate_opportunity", "generate_recommendations")
    graph.add_edge("generate_recommendations", "select_strategy")
    
    # Conditional routing based on approval
    graph.add_conditional_edges(
        "select_strategy",
        agent.approval_router,
        {
            "apply_price": "apply_price",
            "human_review": "human_review"
        }
    )
    
    graph.add_edge("apply_price", END)
    graph.add_edge("human_review", END)
    
    return graph.compile()

# Usage
pricing_graph = build_pricing_graph()

# Run for each product
for product_id in product_catalog:
    initial_state = {
        "product_id": product_id,
        "current_price": get_current_price(product_id),
        "competitor_data": [],
        "analytics_data": {},
        "analysis": {},
        "validation": {},
        "recommendations": [],
        "selected_strategy": {},
        "approval_status": "",
        "applied": False
    }
    
    result = pricing_graph.invoke(initial_state)
    print(f"{product_id}: Applied={result['applied']}, Status={result['approval_status']}")
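The loop above processes products one at a time. Since each run spends most of its time waiting on API calls, a thread pool is one way to process several products concurrently. A minimal sketch, assuming a `run_pricing(product_id)` callable that wraps `pricing_graph.invoke`. Here a stub stands in for it, and `run_batch` and `max_workers=8` are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable


def run_batch(product_ids: Iterable[str],
              run_pricing: Callable[[str], dict],
              max_workers: int = 8) -> Dict[str, dict]:
    """Run the pricing workflow for many products concurrently.

    A failure for one product is recorded and does not stop the batch.
    """
    results: Dict[str, dict] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_pricing, pid): pid for pid in product_ids}
        for future in as_completed(futures):
            pid = futures[future]
            try:
                results[pid] = future.result()
            except Exception as exc:  # Keep going; record the error per product
                results[pid] = {"applied": False, "error": str(exc)}
    return results


# Stub standing in for a call that builds the initial state and invokes the graph
def fake_run_pricing(product_id: str) -> dict:
    if product_id == "prod_bad":
        raise RuntimeError("upstream API failed")
    return {"applied": True, "approval_status": "auto_approved"}


batch = run_batch(["prod_a", "prod_b", "prod_bad"], fake_run_pricing)
for pid in sorted(batch):
    print(pid, batch[pid])
```

Keep `max_workers` within your API rate limits, and check that any client objects shared across threads are safe to use that way before pointing this at the real graph.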

When to Level Up

1. Start: Manual Triggers (10-50 SKUs)

  • Run pricing analysis on-demand via script
  • Manual review of all recommendations
  • Basic competitor price tracking (daily checks)
  • Simple logging to console/file

2. Scale: Real-Time Monitoring (50-500 SKUs)

  • Automated competitor monitoring (hourly checks)
  • Webhook-triggered analysis on price changes
  • Auto-apply for low-risk changes (<5% adjustment)
  • Slack/email alerts for significant moves
  • Redis caching for performance
  • Retry logic with exponential backoff

3. Production: Multi-Agent System (500-2,000 SKUs)

  • Specialized agents (competitor monitoring, analytics, validation, recommendation)
  • Parallel processing of multiple products
  • Human-in-the-loop for high-risk changes (>10% or high-risk flag)
  • A/B testing framework with statistical significance
  • Advanced analytics (price elasticity, demand forecasting)
  • Audit trail with rollback capability

4. Enterprise: ML-Powered Optimization (2,000+ SKUs)

  • Machine learning models for price elasticity prediction
  • Real-time demand forecasting using time-series analysis
  • Multi-objective optimization (revenue, margin, market share)
  • Personalized pricing by customer segment
  • Geographic pricing optimization
  • Integration with inventory/supply chain systems
  • Advanced experimentation platform (multi-armed bandits)
  • Distributed processing across regions

Strategy-Specific Gotchas

The code examples cover the mechanics. Pricing optimization also has its own challenges you need to handle, from price wars to customer psychology.

Price War Detection & Circuit Breakers

If competitors start aggressively undercutting, you need automatic circuit breakers to prevent a race to the bottom. Set minimum price floors and max change velocity.

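from datetime import datetime, timedelta
from typing import List

def check_price_war(competitor_changes: List[dict], window_hours: int = 24) -> bool:
    """Detect if we're in a price war"""
    # timedelta has no .hours attribute, so compare against a timedelta window
    window = timedelta(hours=window_hours)
    recent = [c for c in competitor_changes
              if datetime.now() - c['timestamp'] < window]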
    
    # Price war signals:
    # 1. Multiple competitors dropping prices
    drops = [c for c in recent if c['change_percent'] < -5]
    if len(drops) >= 2:
        return True
    
    # 2. Large single drop (>15%)
    if any(c['change_percent'] < -15 for c in recent):
        return True
    
    # 3. Rapid succession of changes
    if len(recent) > 5:
        return True
    
    return False

def apply_circuit_breaker(new_price: float, current_price: float, 
                          min_price: float, max_change_per_day: float) -> float:
    """Limit price changes to prevent overreaction"""
    # Don't go below minimum viable price
    if new_price < min_price:
        print(f"Circuit breaker: Price ${new_price} below minimum ${min_price}")
        return current_price  # Don't change
    
    # Limit velocity of change
    max_change = current_price * max_change_per_day
    if abs(new_price - current_price) > max_change:
        if new_price < current_price:
            return current_price - max_change
        else:
            return current_price + max_change
    
    return new_price

# Usage in recommendation node
if check_price_war(competitor_changes):
    # Enter defensive mode
    print("Price war detected - applying conservative strategy")
    new_price = apply_circuit_breaker(
        new_price=recommended_price,
        current_price=current_price,
        min_price=cost_basis * 1.15,  # Minimum 15% margin
        max_change_per_day=0.05  # Max 5% change per day
    )
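`apply_circuit_breaker` caps each individual change, so several changes in one day can still compound past the daily limit. One way to close that gap is to clamp against the price at the start of the day instead of the latest price. A minimal sketch, assuming you store that opening price yourself. `clamp_to_daily_band` and its parameters are hypothetical names, and unlike the function above it moves a too-low price up to the floor instead of leaving the price unchanged.

```python
def clamp_to_daily_band(new_price: float, day_open_price: float,
                        min_price: float, max_change_per_day: float) -> float:
    """Clamp a proposed price to the allowed band around the day's opening price.

    The band is day_open_price plus or minus max_change_per_day (a fraction),
    and its lower edge never drops below min_price.
    """
    lower = max(min_price, day_open_price * (1 - max_change_per_day))
    upper = max(lower, day_open_price * (1 + max_change_per_day))
    return min(max(new_price, lower), upper)


# Day opened at $100 with a 5% daily limit and a $90 floor
for proposed in (92.0, 103.0, 110.0):
    print(proposed, "->", clamp_to_daily_band(proposed, 100.0, 90.0, 0.05))
```

Because the band is anchored to the opening price, the second and third changes of the day cannot push the price any further than the first one could.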

Customer Anchor Price Psychology

Customers have mental anchor prices. Crossing certain thresholds ($99 → $101) feels bigger than the actual difference. Use psychological pricing rules.

function applyPsychologicalPricing(rawPrice: number): number {
  // Round to psychological price points
  // Prefer: $X9, $X7, $X5 over round numbers
  
  if (rawPrice < 10) {
    // For low prices, use .99
    return Math.floor(rawPrice) + 0.99;
  }
  
  // Near the $100 mark, stay just below it rather than crossing for a
  // small gain. Checked before the generic sub-$100 rule so it is reachable.
  if (rawPrice >= 95 && rawPrice < 105) {
    return 99;
  }

  if (rawPrice < 100) {
    // Use X9 or X7 endings
    const base = Math.floor(rawPrice / 10) * 10;
    const options = [base + 7, base + 9];
    return options.reduce((prev, curr) => 
      Math.abs(curr - rawPrice) < Math.abs(prev - rawPrice) ? curr : prev
    );
  }
  
  // For higher prices, round to the nearest $5, then drop $1 (endings of 4 or 9)
  return Math.round(rawPrice / 5) * 5 - 1;
}

// Apply to recommendations
const psychologicalPrice = applyPsychologicalPricing(107.3);
console.log(psychologicalPrice);  // 104 (nearest $5 is 105, minus 1)

// Test threshold sensitivity
function testPriceThreshold(basePrice: number, testPrice: number): boolean {
  const thresholds = [50, 100, 200, 500, 1000];
  
  for (const threshold of thresholds) {
    // Don't cross major thresholds for <5% gain
    if (basePrice < threshold && testPrice >= threshold) {
      const gain = (testPrice - basePrice) / basePrice;
      if (gain < 0.05) {
        console.log(`Threshold warning: Crossing $${threshold} for only ${(gain * 100).toFixed(1)}% gain`);
        return false;
      }
    }
  }
  
  return true;
}

A/B Test Statistical Significance

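Don't make pricing decisions on small sample sizes. Compute the required sample size up front and wait for statistical significance before declaring a winner. If you want to stop early when results are clear, use a sequential testing procedure designed for that; repeatedly checking a fixed-sample test inflates false positives.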

from scipy import stats
import numpy as np

def calculate_sample_size(baseline_rate: float, 
                         minimum_detectable_effect: float,
                         alpha: float = 0.05,
                         power: float = 0.8) -> int:
    """Calculate required sample size for A/B test"""
    # Use standard formula for proportion tests
    z_alpha = stats.norm.ppf(1 - alpha/2)
    z_beta = stats.norm.ppf(power)
    
    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    p_avg = (p1 + p2) / 2
    
    n = (z_alpha * np.sqrt(2 * p_avg * (1 - p_avg)) + 
         z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    n = n / (p1 - p2) ** 2
    
    return int(np.ceil(n))

def check_significance(control_conversions: int, control_total: int,
                      variant_conversions: int, variant_total: int,
                      alpha: float = 0.05) -> dict:
    """Check if A/B test results are statistically significant"""
    # Two-proportion z-test
    p1 = control_conversions / control_total
    p2 = variant_conversions / variant_total
    
    p_pooled = (control_conversions + variant_conversions) / (control_total + variant_total)
    
    se = np.sqrt(p_pooled * (1 - p_pooled) * (1/control_total + 1/variant_total))
    z_score = (p2 - p1) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
    
    # Confidence interval for the difference, at the same alpha as the test
    z_crit = stats.norm.ppf(1 - alpha / 2)
    se_diff = np.sqrt(p1*(1-p1)/control_total + p2*(1-p2)/variant_total)
    ci_lower = (p2 - p1) - z_crit * se_diff
    ci_upper = (p2 - p1) + z_crit * se_diff
    
    return {
        'control_rate': p1,
        'variant_rate': p2,
        'lift': (p2 - p1) / p1,
        'p_value': p_value,
        'is_significant': p_value < alpha,
        'confidence_interval': (ci_lower, ci_upper),
        'recommendation': ('keep_testing' if p_value >= alpha
                           else 'variant_wins' if p2 > p1 else 'control_wins')
    }

# Usage in pricing A/B test
result = check_significance(
    control_conversions=120,
    control_total=1000,
    variant_conversions=145,
    variant_total=1000
)

if result['is_significant']:
    print(f"Winner found! Lift: {result['lift']:.1%}, p-value: {result['p_value']:.4f}")
else:
    print(f"Keep testing. Current p-value: {result['p_value']:.4f}")
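The example test above uses 1,000 visitors per arm. Whether that is enough depends on the effect you want to detect. The sketch below re-implements the same sample-size formula with only the standard library (`statistics.NormalDist`) and applies it to the example's rates, 12.0% versus 14.5%. The function name `required_sample_size` is an assumption for illustration.

```python
from math import ceil, sqrt
from statistics import NormalDist


def required_sample_size(p1: float, p2: float,
                         alpha: float = 0.05, power: float = 0.8) -> int:
    """Per-arm sample size for a two-sided two-proportion z-test."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(power)
    p_avg = (p1 + p2) / 2
    numerator = (z_alpha * sqrt(2 * p_avg * (1 - p_avg))
                 + z_beta * sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    return ceil(numerator / (p1 - p2) ** 2)


n_per_arm = required_sample_size(0.12, 0.145)
print(f"Visitors needed per arm: {n_per_arm}")
print(f"Example test is underpowered: {n_per_arm > 1000}")
```

With these inputs the requirement comes out to roughly 2,900 visitors per arm, so a test with 1,000 per arm would need to keep running before a lift of that size can be detected reliably.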

Revenue vs. Profit Optimization

Optimizing for revenue isn't the same as optimizing for profit. You need to factor in costs, customer lifetime value, and acquisition costs. A lower price might bring more customers but worse unit economics.

def calculate_profit_impact(current_price: float, new_price: float,
                            current_volume: int, 
                            price_elasticity: float,
                            variable_cost: float,
                            acquisition_cost: float,
                            avg_lifetime_months: int) -> dict:
    """Calculate true profit impact, not just revenue"""
    
    # Estimate new volume using price elasticity
    price_change = (new_price - current_price) / current_price
    volume_change = price_elasticity * price_change
    new_volume = int(current_volume * (1 + volume_change))
    
    # Current profit for one billing period. Note that this charges the full
    # one-time acquisition cost against a single period's revenue.
    current_revenue = current_price * current_volume
    current_costs = (variable_cost + acquisition_cost) * current_volume
    current_profit = current_revenue - current_costs
    
    # New profit for one billing period, on the same basis
    new_revenue = new_price * new_volume
    new_costs = (variable_cost + acquisition_cost) * new_volume
    new_profit = new_revenue - new_costs
    
    # Lifetime revenue across the customer base (gross of costs)
    current_ltv = current_price * avg_lifetime_months * current_volume
    new_ltv = new_price * avg_lifetime_months * new_volume
    
    return {
        'current': {
            'revenue': current_revenue,
            'profit': current_profit,
            'margin': current_profit / current_revenue,
            'ltv': current_ltv
        },
        'new': {
            'revenue': new_revenue,
            'profit': new_profit,
            'margin': new_profit / new_revenue,
            'ltv': new_ltv
        },
        'impact': {
            'revenue_change': new_revenue - current_revenue,
            'profit_change': new_profit - current_profit,
            'ltv_change': new_ltv - current_ltv,
            'volume_change': new_volume - current_volume
        },
        'recommendation': 'increase_price' if new_profit > current_profit else 'keep_current'
    }

# Usage
impact = calculate_profit_impact(
    current_price=99,
    new_price=107,
    current_volume=450,
    price_elasticity=-1.3,  # 10% price increase = 13% volume decrease
    variable_cost=20,  # Cost per customer
    acquisition_cost=150,  # CAC
    avg_lifetime_months=18
)

print(f"Revenue change: ${impact['impact']['revenue_change']:,.0f}")
print(f"Profit change: ${impact['impact']['profit_change']:,.0f}")
print(f"LTV change: ${impact['impact']['ltv_change']:,.0f}")
print(f"Recommendation: {impact['recommendation']}")
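
The function above charges the full acquisition cost against a single period's revenue. If the price and variable cost are monthly and CAC is paid once per customer (an assumption, since the post does not state the billing period), a lifetime-basis comparison may be a useful cross-check. The sketch below is illustrative only; `projected_volume` and `lifetime_profit` are hypothetical helper names, not part of the original code.

```python
def projected_volume(current_price: float, new_price: float,
                     current_volume: int, price_elasticity: float) -> int:
    """Same linear elasticity estimate as above, clamped at zero."""
    price_change = (new_price - current_price) / current_price
    return max(0, int(current_volume * (1 + price_elasticity * price_change)))


def lifetime_profit(price: float, volume: int, variable_cost: float,
                    acquisition_cost: float, avg_lifetime_months: int) -> float:
    """Per-customer lifetime margin minus one-time CAC, times volume.

    Assumption: price and variable_cost are monthly; acquisition_cost is paid once.
    """
    per_customer = (price - variable_cost) * avg_lifetime_months - acquisition_cost
    return per_customer * volume


# Same sample inputs as the usage example above
new_volume = projected_volume(99, 107, 450, -1.3)
current_lifetime = lifetime_profit(99, 450, 20, 150, 18)
new_lifetime = lifetime_profit(107, new_volume, 20, 150, 18)

print(f"Projected volume: {new_volume}")
print(f"Lifetime profit change: ${new_lifetime - current_lifetime:,.0f}")
```

With these sample inputs, the lifetime figure drops slightly even though single-period profit improves, so it is worth checking both views before changing a price.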

Grandfathering & Migration Strategy

You can't just change prices for existing customers without a plan. You need grandfathering logic, migration timelines, and a communication strategy: some customers keep legacy pricing, while new customers get the new price.

interface Customer {
  id: string;
  currentPrice: number;
  signupDate: Date;
  plan: string;
  isGrandfathered: boolean;
}

function determinePricingStrategy(
  customer: Customer,
  newPrice: number,
  grandfatherCutoffDate: Date,
  migrationMonths: number
): {
  appliesTo: 'immediate' | 'grandfathered' | 'gradual_migration';
  finalPrice: number;
  effectiveDate: Date;
  communicationNeeded: boolean;
  monthlyIncrease?: number;  // Set only for gradual migrations
} {
  // Customers who signed up before the cutoff keep their current price
  if (customer.signupDate < grandfatherCutoffDate) {
    return {
      appliesTo: 'grandfathered',
      finalPrice: customer.currentPrice,
      effectiveDate: new Date('9999-12-31'),  // Indefinite
      communicationNeeded: true,  // Tell them they're protected
    };
  }

  // Recent customers get gradual migration (a month is approximated as 30 days)
  const msPerMonth = 1000 * 60 * 60 * 24 * 30;
  const monthsSinceSignup =
    (Date.now() - customer.signupDate.getTime()) / msPerMonth;

  if (monthsSinceSignup < migrationMonths) {
    // Spread the price increase evenly over the remaining months
    const monthsRemaining = migrationMonths - monthsSinceSignup;
    const monthlyIncrease = (newPrice - customer.currentPrice) / monthsRemaining;

    return {
      appliesTo: 'gradual_migration',
      finalPrice: newPrice,
      effectiveDate: new Date(Date.now() + monthsRemaining * msPerMonth),
      communicationNeeded: true,
      monthlyIncrease,  // Per-month step for the migration scheduler
    };
  }

  // Customers past the migration window move to the new price immediately
  return {
    appliesTo: 'immediate',
    finalPrice: newPrice,
    effectiveDate: new Date(),
    communicationNeeded: true,  // Existing customers should hear about a price change
  };
}

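// Apply to customer base
// sendPricingNotification, updateStripePrice, and scheduleGradualMigration
// are assumed to be defined elsewhere in your codebase.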
async function migrateCustomerPricing(
  customers: Customer[],
  newPrice: number
): Promise<void> {
  const grandfatherCutoff = new Date('2025-01-01');
  const migrationMonths = 6;

  for (const customer of customers) {
    const strategy = determinePricingStrategy(
      customer,
      newPrice,
      grandfatherCutoff,
      migrationMonths
    );

    if (strategy.communicationNeeded) {
      await sendPricingNotification(customer, strategy);
    }

    if (strategy.appliesTo === 'immediate') {
      await updateStripePrice(customer.id, strategy.finalPrice);
    } else if (strategy.appliesTo === 'gradual_migration') {
      await scheduleGradualMigration(customer, strategy);
    }

    console.log(
      `${customer.id}: ${strategy.appliesTo} - $${customer.currentPrice} → $${strategy.finalPrice}`
    );
  }
}
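
The gradual migration above returns only a final price and date, so the individual price steps still have to be worked out somewhere. Below is a minimal Python sketch of one way to build that schedule, assuming equal monthly increments and whole calendar months. `migration_schedule` is a hypothetical helper and is not what `scheduleGradualMigration` necessarily does.

```python
from datetime import date


def migration_schedule(current_price: float, new_price: float,
                       months: int, start: date) -> list[tuple[date, float]]:
    """Return (effective_date, price) steps that end exactly at new_price."""
    if months < 1:
        raise ValueError("months must be at least 1")

    step = (new_price - current_price) / months
    schedule = []
    for i in range(1, months + 1):
        # Advance i calendar months from the start date
        month_index = start.month - 1 + i
        year = start.year + month_index // 12
        month = month_index % 12 + 1
        day = min(start.day, 28)  # Clamp so the date exists in every month

        # Use new_price on the last step to avoid rounding drift
        price = new_price if i == months else round(current_price + step * i, 2)
        schedule.append((date(year, month, day), price))
    return schedule


# Example: move a customer from $99 to $107 over 4 months
schedule = migration_schedule(99, 107, 4, date(2025, 11, 15))
for effective_date, price in schedule:
    print(f"{effective_date.isoformat()}: ${price:.2f}")
```

Pinning the last step to `new_price` keeps the customer from landing a cent off target when the increment does not divide evenly.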

Cost Calculator

Manual Pricing Analysis

Daily competitor price checks (2 hours × $75/hr × 5 days): $750
Weekly pricing analysis meeting (3 people × 2 hours × $100/hr): $600
Monthly price optimization project (analyst × 20 hours × $100/hr): $2,000
Spreadsheet maintenance and data entry (5 hours × $50/hr): $250
Total: $3,600 per month

Limitations:

  • 2-3 day lag between insight and action
  • Can only monitor 10-20 competitors manually
  • No real-time adjustment capability
  • High error rate in manual data entry (15-20%)
  • Limited to analyzing 10-50 SKUs effectively

Automated Dynamic Pricing

Claude API costs (500 analyses/day × $0.015 per analysis): $225
Price intelligence API (competitor monitoring): $199
Infrastructure (Redis, monitoring, hosting): $150
Human oversight (5 hours/week × $100/hr): $500
Total: $1,074 per month

Benefits:

  • Real-time price adjustments (within 1 hour of competitor move)
  • Monitor 100+ competitors automatically
  • Process 500+ SKUs simultaneously
  • 99.9% accuracy in data extraction
  • 15-30% revenue increase from optimized pricing
  • A/B testing runs automatically with statistical significance

Savings: $84/day | 70% cost reduction | $2,526/month | $30,312/year
💡 Pays for itself in first month through better pricing decisions
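
To adapt the comparison to your own numbers, the arithmetic can be sketched in a few lines of Python. The figures below are the line items from the calculator above, taken as monthly amounts. The dictionary keys and the `compare_costs` helper are illustrative names, and the 30-day month used for the API cost and the daily savings is an assumption.

```python
manual_monthly = {
    "competitor_price_checks": 750,
    "pricing_analysis_meetings": 600,
    "price_optimization_project": 2000,
    "spreadsheet_maintenance": 250,
}

automated_monthly = {
    # 500 analyses/day × $0.015 per analysis, 30-day month assumed
    "claude_api": round(500 * 0.015 * 30, 2),
    "price_intelligence_api": 199,
    "infrastructure": 150,
    "human_oversight": 500,
}


def compare_costs(manual: dict, automated: dict) -> dict:
    """Total both columns and derive the savings figures."""
    manual_total = sum(manual.values())
    automated_total = sum(automated.values())
    savings = manual_total - automated_total
    return {
        "manual_total": manual_total,
        "automated_total": automated_total,
        "monthly_savings": savings,
        "annual_savings": savings * 12,
        "daily_savings": savings / 30,
        "reduction_pct": savings / manual_total * 100,
    }


summary = compare_costs(manual_monthly, automated_monthly)
print(f"Manual: ${summary['manual_total']:,.0f}/month")
print(f"Automated: ${summary['automated_total']:,.0f}/month")
print(f"Savings: ${summary['monthly_savings']:,.0f}/month "
      f"({summary['reduction_pct']:.0f}% reduction)")
```

Swapping in your own hourly rates and API volumes shows quickly whether the savings hold at your scale.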

Want This Running in Your Pricing System?

We build custom pricing optimization engines that integrate with your Stripe, analytics, and business rules. From competitor monitoring to automated A/B testing - production-ready in 2-4 weeks.