
Automate Sales Intelligence 🚀

Turn Monday's 3 prompts into a real-time research engine

October 14, 2025
💼 Sales Enablement | 🐍 Python + TypeScript | ⚡ 10 → 1,000 accounts/day

The Problem

On Monday you tested the 3 prompts in ChatGPT. You saw how enrichment → signal detection → talking points works. Cool! But here's reality: your reps can't research 50 accounts per day by hand. They're copy-pasting company names into LinkedIn, scanning news sites, and still showing up to calls unprepared. By the time they finish researching, the intel is already stale.

  • 6+ hours per week per rep on manual research
  • 48 hours average intel staleness (2 days old by the time reps call)
  • Can't scale beyond 10-15 accounts/day per rep

See It Work

Watch the 3 prompts chain together automatically. This is what you'll build.

The Code

Three levels: start simple, add reliability, then scale to production. Pick where you are.

Level 1: Simple API Calls

Good for: 0-100 accounts/day | Setup time: 30 minutes

# Simple API Calls (0-100 accounts/day)
import openai
import requests
import json
from datetime import datetime
from typing import Optional

# API keys (use environment variables in production)
OPENAI_API_KEY = "sk-..."
ZOOMINFO_API_KEY = "your_zoominfo_key"
NEWS_API_KEY = "your_newsapi_key"

def automate_sales_intel(company_name: str, contact_name: Optional[str] = None) -> dict:
    """Chain the 3 prompts: enrich → detect signals → generate talking points"""
    
    # Step 1: Enrich account data from multiple sources
    enrichment_data = enrich_account(company_name)
    
    # Step 2: Detect buying signals
    signals = detect_buying_signals(enrichment_data)
    
    # Step 3: Generate personalized talking points
    talking_points = generate_talking_points(enrichment_data, signals, contact_name)
    
    return {
        "enriched_data": enrichment_data,
        "buying_signals": signals,
        "talking_points": talking_points,
        "overall_score": signals.get("overall_score", 0),
        "timestamp": datetime.now().isoformat()
    }

def enrich_account(company_name: str) -> dict:
    """Enrich account with ZoomInfo, Clearbit, and news data"""
    
    # Get company data from ZoomInfo
    zoominfo_url = "https://api.zoominfo.com/lookup/company"
    zoominfo_response = requests.get(
        zoominfo_url,
        headers={"Authorization": f"Bearer {ZOOMINFO_API_KEY}"},
        params={"companyName": company_name}
    )
    company_data = zoominfo_response.json() if zoominfo_response.status_code == 200 else {}
    
    # Get recent news
    news_url = "https://newsapi.org/v2/everything"
    news_response = requests.get(
        news_url,
        params={
            "q": company_name,
            "apiKey": NEWS_API_KEY,
            "sortBy": "publishedAt",
            "pageSize": 5
        }
    )
    recent_news = news_response.json().get("articles", []) if news_response.status_code == 200 else []
    
    # Use LLM to structure and extract key info
    enrichment_prompt = f"""Extract and structure company intelligence from these data sources.

Company: {company_name}

ZoomInfo Data:
{json.dumps(company_data, indent=2)}

Recent News:
{json.dumps([{"title": a["title"], "description": a["description"], "date": a["publishedAt"]} for a in recent_news], indent=2)}

Output as JSON with these fields:
- company_name
- industry
- employee_count
- headquarters
- tech_stack (array)
- key_contacts (array with name, title, linkedin, tenure)
- recent_funding (round, amount, date, lead_investor)
- expansion_plans (array)
- competitors (array)
- recent_news_summary (string)"""

    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": enrichment_prompt}],
        temperature=0.3
    )
    
    return json.loads(response.choices[0].message.content)

def detect_buying_signals(enriched_data: dict) -> dict:
    """Analyze enriched data for buying signals"""
    
    signal_prompt = f"""Analyze this company data and identify buying signals.

Company Data:
{json.dumps(enriched_data, indent=2)}

Identify signals in these categories:
- funding_event (recent funding rounds)
- hiring_surge (rapid hiring = growth = budget)
- tech_stack_gap (missing tools we can fill)
- executive_change (new leaders = new tool evaluations)
- market_expansion (entering new markets = new needs)
- competitor_switch (mentions of switching from competitors)

For each signal, provide:
- type (category above)
- strength (high/medium/low)
- description (what you found)
- relevance (why this matters for our sale)
- source (where this came from)
- date (when this happened)

Also provide:
- overall_score (0-100, how hot is this lead?)
- recommendation (what should sales do next?)

Output as JSON."""

    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": signal_prompt}],
        temperature=0.3
    )
    
    return json.loads(response.choices[0].message.content)

def generate_talking_points(enriched_data: dict, signals: dict, contact_name: Optional[str] = None) -> dict:
    """Generate personalized talking points and outreach content"""
    
    talking_points_prompt = f"""Generate personalized sales talking points based on this intelligence.

Company Data:
{json.dumps(enriched_data, indent=2)}

Buying Signals:
{json.dumps(signals, indent=2)}

Contact: {contact_name or 'Unknown'}

Generate:
1. talking_points (array of 4-5 points, each with category, point, and transition)
2. email_subject (compelling subject line)
3. email_opening (first 2-3 sentences of cold email)
4. meeting_agenda (array of 4 discussion topics)

Make it conversational, specific to their situation, and reference actual signals.
Avoid generic sales speak. Be human.

Output as JSON."""

    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": talking_points_prompt}],
        temperature=0.7
    )
    
    return json.loads(response.choices[0].message.content)

# Usage
result = automate_sales_intel(
    company_name="Acme Corp",
    contact_name="Sarah Chen"
)

print(f"Overall Score: {result['overall_score']}/100")
print(f"Signals Found: {len(result['buying_signals']['signals'])}")
print(f"Talking Points: {len(result['talking_points']['talking_points'])}")
print(f"\nEmail Subject: {result['talking_points']['email_subject']}")

Level 2: With Error Handling & Salesforce Integration

Good for: 100-1,000 accounts/day | Setup time: 2 hours

// With Error Handling & Salesforce Integration (100-1000 accounts/day)
import Anthropic from '@anthropic-ai/sdk';
import axios from 'axios';
import jsforce from 'jsforce';

interface SalesIntelResult {
  enriched_data: any;
  buying_signals: any;
  talking_points: any;
  overall_score: number;
  timestamp: string;
  salesforce_updated: boolean;
}

class SalesIntelAutomation {
  private anthropic: Anthropic;
  private sfConnection: jsforce.Connection;

  constructor() {
    this.anthropic = new Anthropic({
      apiKey: process.env.ANTHROPIC_API_KEY!,
    });

    this.sfConnection = new jsforce.Connection({
      loginUrl: process.env.SALESFORCE_LOGIN_URL!,
    });
  }

  async authenticate() {
    await this.sfConnection.login(
      process.env.SALESFORCE_USERNAME!,
      process.env.SALESFORCE_PASSWORD! + process.env.SALESFORCE_TOKEN!
    );
  }

  async processAccount(
    accountId: string,
    maxRetries: number = 3
  ): Promise<SalesIntelResult> {
    // Get account from Salesforce
    const account = await this.getAccountFromSalesforce(accountId);

    // Step 1: Enrich with retries
    const enriched = await this.retryWithBackoff(
      () => this.enrichAccount(account.Name, account),
      maxRetries
    );

    // Step 2: Detect signals with retries
    const signals = await this.retryWithBackoff(
      () => this.detectSignals(enriched),
      maxRetries
    );

    // Step 3: Generate talking points with retries
    const talkingPoints = await this.retryWithBackoff(
      () => this.generateTalkingPoints(enriched, signals, account),
      maxRetries
    );

    // Step 4: Update Salesforce
    const sfUpdated = await this.updateSalesforce(accountId, {
      enriched,
      signals,
      talkingPoints,
    });

    return {
      enriched_data: enriched,
      buying_signals: signals,
      talking_points: talkingPoints,
      overall_score: signals.overall_score,
      timestamp: new Date().toISOString(),
      salesforce_updated: sfUpdated,
    };
  }

  private async getAccountFromSalesforce(accountId: string) {
    const result = await this.sfConnection.sobject('Account').retrieve(accountId);
    return result;
  }

  private async enrichAccount(companyName: string, sfAccount: any): Promise<any> {
    // Gather data from multiple sources
    const [zoomInfoData, newsData, techStackData] = await Promise.allSettled([
      this.getZoomInfoData(companyName),
      this.getNewsData(companyName),
      this.getTechStackData(sfAccount.Website),
    ]);

    // Combine all data
    const combinedData = {
      salesforce: sfAccount,
      zoominfo: zoomInfoData.status === 'fulfilled' ? zoomInfoData.value : null,
      news: newsData.status === 'fulfilled' ? newsData.value : null,
      tech_stack: techStackData.status === 'fulfilled' ? techStackData.value : null,
    };

    // Use Claude to structure
    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [
        {
          role: 'user',
          content: `Extract and structure company intelligence from these sources:\n\n${JSON.stringify(combinedData, null, 2)}\n\nOutput as JSON with: company_name, industry, employee_count, headquarters, tech_stack, key_contacts, recent_funding, expansion_plans, competitors, recent_news_summary`,
        },
      ],
    });

    const content = response.content[0];
    if (content.type !== 'text') throw new Error('Invalid response');
    return JSON.parse(content.text);
  }

  private async detectSignals(enrichedData: any): Promise<any> {
    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [
        {
          role: 'user',
          content: `Analyze for buying signals:\n\n${JSON.stringify(enrichedData, null, 2)}\n\nIdentify: funding_event, hiring_surge, tech_stack_gap, executive_change, market_expansion, competitor_switch. For each: type, strength, description, relevance, source, date. Include overall_score (0-100) and recommendation.`,
        },
      ],
    });

    const content = response.content[0];
    if (content.type !== 'text') throw new Error('Invalid response');
    return JSON.parse(content.text);
  }

  private async generateTalkingPoints(
    enrichedData: any,
    signals: any,
    account: any
  ): Promise<any> {
    const primaryContact = await this.getPrimaryContact(account.Id);

    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [
        {
          role: 'user',
          content: `Generate personalized talking points:\n\nCompany: ${JSON.stringify(enrichedData, null, 2)}\n\nSignals: ${JSON.stringify(signals, null, 2)}\n\nContact: ${primaryContact?.Name || 'Unknown'}\n\nGenerate: talking_points (4-5 with category, point, transition), email_subject, email_opening, meeting_agenda (4 topics). Be specific and conversational.`,
        },
      ],
    });

    const content = response.content[0];
    if (content.type !== 'text') throw new Error('Invalid response');
    return JSON.parse(content.text);
  }

  private async getPrimaryContact(accountId: string) {
    const result = await this.sfConnection
      .sobject('Contact')
      .find({ AccountId: accountId, IsPrimary__c: true })
      .limit(1)
      .execute();
    return result[0];
  }

  private async updateSalesforce(
    accountId: string,
    intel: any
  ): Promise<boolean> {
    try {
      // Update Account with enriched data
      await this.sfConnection.sobject('Account').update({
        Id: accountId,
        AI_Overall_Score__c: intel.signals.overall_score,
        AI_Last_Updated__c: new Date().toISOString(),
        AI_Recommendation__c: intel.signals.recommendation,
        AI_Key_Signals__c: JSON.stringify(
          intel.signals.signals.slice(0, 3)
        ),
      });

      // Create Task with talking points
      await this.sfConnection.sobject('Task').create({
        WhatId: accountId,
        Subject: 'AI-Generated Sales Intel',
        Description: `Email Subject: ${intel.talkingPoints.email_subject}\n\nOpening: ${intel.talkingPoints.email_opening}\n\nTalking Points:\n${intel.talkingPoints.talking_points.map((tp: any, i: number) => `${i + 1}. ${tp.point}`).join('\n')}`,
        Status: 'Not Started',
        Priority: intel.signals.overall_score > 70 ? 'High' : 'Normal',
      });

      return true;
    } catch (error) {
      console.error('Failed to update Salesforce:', error);
      return false;
    }
  }

  private async getZoomInfoData(companyName: string) {
    const response = await axios.get(
      'https://api.zoominfo.com/lookup/company',
      {
        headers: { Authorization: `Bearer ${process.env.ZOOMINFO_API_KEY}` },
        params: { companyName },
        timeout: 10000,
      }
    );
    return response.data;
  }

  private async getNewsData(companyName: string) {
    const response = await axios.get('https://newsapi.org/v2/everything', {
      params: {
        q: companyName,
        apiKey: process.env.NEWS_API_KEY,
        sortBy: 'publishedAt',
        pageSize: 5,
      },
      timeout: 10000,
    });
    return response.data.articles;
  }

  private async getTechStackData(website: string) {
    // Use BuiltWith or Clearbit for tech stack detection
    const response = await axios.get(
      `https://api.builtwith.com/v1/api.json?KEY=${process.env.BUILTWITH_API_KEY}&LOOKUP=${website}`,
      { timeout: 10000 }
    );
    return response.data;
  }

  private async retryWithBackoff<T>(
    fn: () => Promise<T>,
    maxRetries: number
  ): Promise<T> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        return await Promise.race([
          fn(),
          new Promise<never>((_, reject) =>
            setTimeout(() => reject(new Error('Timeout')), 30000)
          ),
        ]);
      } catch (error) {
        lastError = error as Error;
        if (attempt < maxRetries - 1) {
          await new Promise((resolve) =>
            setTimeout(resolve, Math.pow(2, attempt) * 1000)
          );
        }
      }
    }

    throw lastError;
  }
}

// Usage
const automation = new SalesIntelAutomation();
await automation.authenticate();

const result = await automation.processAccount('001XXXXXXXXXXXXXXX');
console.log(`Score: ${result.overall_score}/100`);
console.log(`Salesforce Updated: ${result.salesforce_updated}`);
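
processAccount handles one account at a time. To actually work through 100-1,000 accounts a day, sweep your account list with bounded concurrency so retries and rate limits stay manageable. A minimal sketch using the class above (the chunk size and concurrency numbers are illustrative, not tuned):

// Batch sweep with bounded concurrency (sketch)
async function processBatch(
  automation: SalesIntelAutomation,
  accountIds: string[],
  concurrency: number = 10
): Promise<SalesIntelResult[]> {
  const results: SalesIntelResult[] = [];
  // Process in chunks so at most `concurrency` accounts are in flight at once
  for (let i = 0; i < accountIds.length; i += concurrency) {
    const chunk = accountIds.slice(i, i + concurrency);
    const settled = await Promise.allSettled(
      chunk.map((id) => automation.processAccount(id))
    );
    for (const outcome of settled) {
      if (outcome.status === 'fulfilled') results.push(outcome.value);
      else console.error('Account failed after retries:', outcome.reason);
    }
  }
  return results;
}

// Usage
const ids = ['001XXXXXXXXXXXXXXX' /* ...more account IDs... */];
const batchResults = await processBatch(automation, ids, 10);
console.log(`Processed ${batchResults.length}/${ids.length} accounts`);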

Level 3: Production Pattern with LangGraph & Queue

Good for: 1,000+ accounts/day | Setup time: 1 day

# Production Pattern with LangGraph & Queue (1000+ accounts/day)
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Optional
import openai
import asyncio
import aiohttp
from datetime import datetime
import redis
import json
import os

# API keys from environment (the fetch helpers below reference these)
ZOOMINFO_API_KEY = os.environ.get("ZOOMINFO_API_KEY", "")
NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "")

class SalesIntelState(TypedDict):
    account_id: str
    company_name: str
    enriched_data: Optional[dict]
    buying_signals: Optional[dict]
    talking_points: Optional[dict]
    overall_score: int
    errors: List[str]
    retry_count: int
    timestamp: str

class SalesIntelPipeline:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.graph = self.build_graph()
    
    def build_graph(self):
        """Build LangGraph workflow"""
        graph = StateGraph(SalesIntelState)
        
        # Add nodes
        graph.add_node("enrich", self.enrich_node)
        graph.add_node("detect_signals", self.detect_signals_node)
        graph.add_node("generate_talking_points", self.generate_talking_points_node)
        graph.add_node("update_crm", self.update_crm_node)
        graph.add_node("handle_error", self.handle_error_node)
        
        # Add edges
        graph.set_entry_point("enrich")
        graph.add_conditional_edges(
            "enrich",
            self.check_enrichment_success,
            {
                "success": "detect_signals",
                "retry": "enrich",
                "fail": "handle_error"
            }
        )
        graph.add_conditional_edges(
            "detect_signals",
            self.check_signals_success,
            {
                "success": "generate_talking_points",
                "retry": "detect_signals",
                "fail": "handle_error"
            }
        )
        graph.add_edge("generate_talking_points", "update_crm")
        graph.add_edge("update_crm", END)
        graph.add_edge("handle_error", END)
        
        return graph.compile()
    
    async def enrich_node(self, state: SalesIntelState) -> SalesIntelState:
        """Enrich account data from multiple sources"""
        try:
            # Parallel API calls
            async with aiohttp.ClientSession() as session:
                tasks = [
                    self.get_zoominfo_data(session, state['company_name']),
                    self.get_news_data(session, state['company_name']),
                    self.get_linkedin_data(session, state['company_name']),
                    self.get_tech_stack_data(session, state['company_name'])
                ]
                results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Filter out errors
            valid_results = [r for r in results if not isinstance(r, Exception)]
            
            if not valid_results:
                state['errors'].append("All enrichment sources failed")
                return state
            
            # Combine and structure with LLM
            combined_data = {
                'zoominfo': results[0] if not isinstance(results[0], Exception) else None,
                'news': results[1] if not isinstance(results[1], Exception) else None,
                'linkedin': results[2] if not isinstance(results[2], Exception) else None,
                'tech_stack': results[3] if not isinstance(results[3], Exception) else None
            }
            
            enrichment_prompt = f"""Extract and structure company intelligence from these sources.

Company: {state['company_name']}

Data Sources:
{json.dumps(combined_data, indent=2)}

Output as JSON with: company_name, industry, employee_count, headquarters, tech_stack, key_contacts, recent_funding, expansion_plans, competitors, recent_news_summary"""

            response = openai.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": enrichment_prompt}],
                temperature=0.3,
                timeout=30
            )
            
            state['enriched_data'] = json.loads(response.choices[0].message.content)
            state['errors'] = []  # Clear errors on success
            
        except Exception as e:
            state['errors'].append(f"Enrichment failed: {str(e)}")
            state['retry_count'] += 1
        
        return state
    
    async def detect_signals_node(self, state: SalesIntelState) -> SalesIntelState:
        """Detect buying signals from enriched data"""
        try:
            signal_prompt = f"""Analyze this company data for buying signals.

Company Data:
{json.dumps(state['enriched_data'], indent=2)}

Identify signals: funding_event, hiring_surge, tech_stack_gap, executive_change, market_expansion, competitor_switch.

For each: type, strength (high/medium/low), description, relevance, source, date.
Include: overall_score (0-100), recommendation.

Output as JSON."""

            response = openai.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": signal_prompt}],
                temperature=0.3,
                timeout=30
            )
            
            signals = json.loads(response.choices[0].message.content)
            state['buying_signals'] = signals
            state['overall_score'] = signals.get('overall_score', 0)
            state['errors'] = []
            
        except Exception as e:
            state['errors'].append(f"Signal detection failed: {str(e)}")
            state['retry_count'] += 1
        
        return state
    
    async def generate_talking_points_node(self, state: SalesIntelState) -> SalesIntelState:
        """Generate personalized talking points"""
        try:
            talking_points_prompt = f"""Generate personalized sales talking points.

Company: {json.dumps(state['enriched_data'], indent=2)}
Signals: {json.dumps(state['buying_signals'], indent=2)}

Generate:
1. talking_points (4-5 with category, point, transition)
2. email_subject
3. email_opening
4. meeting_agenda (4 topics)

Be conversational and specific. Output as JSON."""

            response = openai.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": talking_points_prompt}],
                temperature=0.7,
                timeout=30
            )
            
            state['talking_points'] = json.loads(response.choices[0].message.content)
            state['errors'] = []
            
        except Exception as e:
            state['errors'].append(f"Talking points generation failed: {str(e)}")
        
        return state
    
    async def update_crm_node(self, state: SalesIntelState) -> SalesIntelState:
        """Update Salesforce with results"""
        try:
            # Store in Redis for CRM sync
            redis_key = f"sales_intel:{state['account_id']}"
            self.redis_client.setex(
                redis_key,
                86400,  # 24 hour expiry
                json.dumps({
                    'enriched_data': state['enriched_data'],
                    'buying_signals': state['buying_signals'],
                    'talking_points': state['talking_points'],
                    'overall_score': state['overall_score'],
                    'timestamp': state['timestamp']
                })
            )
            
            # Queue for Salesforce update (async worker will process)
            self.redis_client.lpush('salesforce_update_queue', json.dumps({
                'account_id': state['account_id'],
                'score': state['overall_score'],
                'signals': state['buying_signals']['signals'][:3],
                'talking_points': state['talking_points']
            }))
            
        except Exception as e:
            state['errors'].append(f"CRM update failed: {str(e)}")
        
        return state
    
    async def handle_error_node(self, state: SalesIntelState) -> SalesIntelState:
        """Handle errors and log"""
        print(f"Error processing {state['account_id']}: {state['errors']}")
        
        # Store error in Redis for monitoring
        self.redis_client.lpush('sales_intel_errors', json.dumps({
            'account_id': state['account_id'],
            'errors': state['errors'],
            'timestamp': state['timestamp']
        }))
        
        return state
    
    def check_enrichment_success(self, state: SalesIntelState) -> str:
        """Route based on enrichment success (retry_count is bumped inside the node on failure)"""
        if state['enriched_data']:
            return "success"
        elif state['retry_count'] < 3:
            return "retry"
        else:
            return "fail"
    
    def check_signals_success(self, state: SalesIntelState) -> str:
        """Route based on signal detection success"""
        if state['buying_signals']:
            return "success"
        elif state['retry_count'] < 3:
            return "retry"
        else:
            return "fail"
    
    async def get_zoominfo_data(self, session: aiohttp.ClientSession, company_name: str):
        """Fetch ZoomInfo data"""
        async with session.get(
            'https://api.zoominfo.com/lookup/company',
            headers={'Authorization': f'Bearer {ZOOMINFO_API_KEY}'},
            params={'companyName': company_name},
            timeout=10
        ) as response:
            return await response.json()
    
    async def get_news_data(self, session: aiohttp.ClientSession, company_name: str):
        """Fetch news data"""
        async with session.get(
            'https://newsapi.org/v2/everything',
            params={
                'q': company_name,
                'apiKey': NEWS_API_KEY,
                'sortBy': 'publishedAt',
                'pageSize': 5
            },
            timeout=10
        ) as response:
            data = await response.json()
            return data.get('articles', [])
    
    async def get_linkedin_data(self, session: aiohttp.ClientSession, company_name: str):
        """Fetch LinkedIn data (requires Sales Navigator API)"""
        # Placeholder - implement with LinkedIn Sales Nav API
        return {'company': company_name, 'employees': []}
    
    async def get_tech_stack_data(self, session: aiohttp.ClientSession, company_name: str):
        """Fetch tech stack data"""
        # Placeholder - implement with BuiltWith or Clearbit
        return {'technologies': []}
    
    async def process_account(self, account_id: str, company_name: str) -> dict:
        """Process a single account through the pipeline"""
        initial_state: SalesIntelState = {
            'account_id': account_id,
            'company_name': company_name,
            'enriched_data': None,
            'buying_signals': None,
            'talking_points': None,
            'overall_score': 0,
            'errors': [],
            'retry_count': 0,
            'timestamp': datetime.now().isoformat()
        }
        
        result = await self.graph.ainvoke(initial_state)
        return result

# Usage
pipeline = SalesIntelPipeline()

# Process single account
result = await pipeline.process_account(
    account_id='001XXXXXXXXXXXXXXX',
    company_name='Acme Corp'
)

print(f"Score: {result['overall_score']}/100")
print(f"Errors: {len(result['errors'])}")

# Process batch (1000+ accounts)
accounts = [
    ('001XXX1', 'Company A'),
    ('001XXX2', 'Company B'),
    # ... 1000 more
]

tasks = [pipeline.process_account(aid, name) for aid, name in accounts]
results = await asyncio.gather(*tasks)

print(f"Processed {len(results)} accounts")
print(f"High priority: {len([r for r in results if r['overall_score'] > 70])}")

When to Level Up

1. Start: Simple API Calls (0-100 accounts/day)

  • Sequential API calls to ZoomInfo, News API, OpenAI
  • Basic error logging with print statements
  • Manual retry on failures
  • Store results in CSV or local database

2. Scale: Add Reliability & CRM Integration (100-1,000 accounts/day)

  • Automatic retries with exponential backoff
  • Parallel API calls with asyncio/Promise.all
  • Salesforce API integration for automatic updates
  • Error tracking with Sentry or CloudWatch
  • Rate limiting and quota management

3. Production: Framework & Queue (1,000-5,000 accounts/day)

  • LangGraph for complex workflows with conditional routing
  • Redis queue for batch processing (handle spikes)
  • State persistence (resume failed jobs without re-enriching)
  • Async workers for parallel processing (10-50 concurrent)
  • Monitoring dashboard (Grafana) showing success rates, API costs

4. Enterprise: Multi-Agent System (5,000+ accounts/day)

  • Specialized agents: enrichment, signal detection, personalization, CRM sync
  • Load balancing across multiple LLM providers (Claude → GPT-4 → Gemini fallback; a sketch follows this list)
  • Real-time Salesforce sync with webhooks (no polling)
  • A/B testing for talking point effectiveness (track reply rates)
  • Cost optimization: cache enrichment data, use cheaper models for simple tasks
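
The provider fallback in the enterprise tier is worth sketching, because it's the piece most teams bolt on too late. A minimal version in Python, assuming the official openai and anthropic SDKs (the provider order and model names are illustrative):

# Multi-provider LLM fallback (sketch) - try Claude first, fall back to GPT-4
import anthropic
import openai

anthropic_client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
openai_client = openai.OpenAI()           # reads OPENAI_API_KEY from the environment

def complete_with_fallback(prompt: str, max_tokens: int = 2048) -> str:
    """Try providers in order; return the first successful completion."""
    providers = [
        ("claude", lambda: anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        ).content[0].text),
        ("gpt-4", lambda: openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        ).choices[0].message.content),
    ]
    errors = []
    for name, call in providers:
        try:
            return call()
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    raise RuntimeError(f"All providers failed: {errors}")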

Sales-Specific Gotchas

The code examples work. But sales has unique challenges you need to handle.

API Rate Limits & Quota Management

ZoomInfo, LinkedIn Sales Nav, and news APIs have strict rate limits. You'll hit them fast at scale. Implement smart caching and quota tracking.

import time
import json
from functools import wraps
import redis

redis_client = redis.Redis()

def rate_limit(calls_per_minute: int, api_name: str):
    """Fixed-window rate limit decorator backed by Redis"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = f"rate_limit:{api_name}"
            current = redis_client.get(key)
            
            if current and int(current) >= calls_per_minute:
                # Wait until the current window expires
                ttl = redis_client.ttl(key)
                if ttl > 0:
                    time.sleep(ttl)
            
            # Increment counter; start the 1-minute window on the first call
            count = redis_client.incr(key)
            if count == 1:
                redis_client.expire(key, 60)
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls_per_minute=100, api_name='zoominfo')
def get_zoominfo_data(company_name: str):
    # API call here
    pass

# Also cache results to avoid duplicate calls
def get_cached_enrichment(company_name: str):
    cache_key = f"enrichment:{company_name}"
    cached = redis_client.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # Fetch fresh data
    data = get_zoominfo_data(company_name)
    
    # Cache for 7 days
    redis_client.setex(cache_key, 604800, json.dumps(data))
    return data

Stale Data & Refresh Strategy

Company data changes constantly: funding rounds, executive changes, hiring surges. You need a smart refresh strategy that prioritizes high-value accounts.

interface RefreshPriority {
  accountId: string;
  lastUpdated: Date;
  dealStage: string;
  dealValue: number;
  priority: number;
}

function calculateRefreshPriority(account: any): RefreshPriority {
  const daysSinceUpdate = Math.floor(
    (Date.now() - new Date(account.lastUpdated).getTime()) / (1000 * 60 * 60 * 24)
  );
  
  let priority = 0;
  
  // High-value deals get frequent updates
  if (account.dealValue > 100000) priority += 50;
  else if (account.dealValue > 50000) priority += 30;
  
  // Late-stage deals need fresh intel
  if (account.dealStage === 'Negotiation') priority += 40;
  else if (account.dealStage === 'Proposal') priority += 30;
  else if (account.dealStage === 'Discovery') priority += 20;
  
  // Stale data needs refresh
  if (daysSinceUpdate > 30) priority += 40;
  else if (daysSinceUpdate > 14) priority += 20;
  else if (daysSinceUpdate > 7) priority += 10;
  
  return {
    accountId: account.id,
    lastUpdated: account.lastUpdated,
    dealStage: account.dealStage,
    dealValue: account.dealValue,
    priority,
  };
}

// Refresh high-priority accounts first
async function refreshAccounts() {
  const accounts = await getAccountsNeedingRefresh();
  const prioritized = accounts
    .map(calculateRefreshPriority)
    .sort((a, b) => b.priority - a.priority);
  
  // Process top 100 high-priority accounts daily
  for (const account of prioritized.slice(0, 100)) {
    await processAccount(account.accountId);
  }
}

Multi-Contact Personalization

Enterprise deals involve multiple stakeholders: CTO, VP Eng, Director of Ops. Each needs different talking points. Don't send the same message to everyone.

import json
import openai

def generate_role_specific_talking_points(
    enriched_data: dict,
    signals: dict,
    contact_role: str
) -> dict:
    """Generate talking points tailored to contact's role"""
    
    role_focus = {
        'CTO': 'technical architecture, security, scalability, integration',
        'VP Engineering': 'team productivity, developer experience, hiring, retention',
        'Director of Operations': 'cost savings, efficiency, ROI, implementation timeline',
        'CEO': 'business impact, revenue growth, competitive advantage, market position',
        'CFO': 'budget, ROI, cost reduction, financial metrics'
    }
    
    focus_areas = role_focus.get(contact_role, 'general business value')
    
    prompt = f"""Generate personalized talking points for a {contact_role}.

Company: {json.dumps(enriched_data, indent=2)}
Signals: {json.dumps(signals, indent=2)}

Focus on: {focus_areas}

For a {contact_role}, emphasize:
- Their specific pain points and priorities
- Metrics they care about
- How our solution helps their department
- ROI in their terms

Generate: talking_points (4-5), email_subject, email_opening, meeting_agenda.
Be specific to their role. Output as JSON."""

    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    )
    
    return json.loads(response.choices[0].message.content)

# Usage: generate different talking points for each contact
contacts = [
    {'name': 'Sarah Chen', 'role': 'CTO'},
    {'name': 'Michael Torres', 'role': 'VP Engineering'},
    {'name': 'Lisa Park', 'role': 'Director of Operations'}
]

for contact in contacts:
    talking_points = generate_role_specific_talking_points(
        enriched_data,
        signals,
        contact['role']
    )
    print(f"\n{contact['name']} ({contact['role']}):")
    print(f"Subject: {talking_points['email_subject']}")

Competitive Intelligence & Displacement

If they're using a competitor, you need different talking points focused on switching costs, migration, and why now is the time to change.

interface CompetitorContext {
  competitor: string;
  weaknesses: string[];
  switchingCost: string;
  migrationTime: string;
  displacementStrategy: string;
}

const competitorIntel: Record<string, CompetitorContext> = {
  'CompetitorA': {
    competitor: 'CompetitorA',
    weaknesses: [
      'Poor integration with modern tools',
      'Expensive enterprise pricing',
      'Slow feature releases',
      'Limited API access'
    ],
    switchingCost: 'Low - our migration tool handles 90% automatically',
    migrationTime: '2-4 weeks with zero downtime',
    displacementStrategy: 'Focus on technical debt and integration pain',
  },
  'CompetitorB': {
    competitor: 'CompetitorB',
    weaknesses: [
      'Complex setup requiring consultants',
      'Poor customer support',
      'Outdated UI/UX',
      'No mobile app'
    ],
    switchingCost: 'Medium - training required but intuitive UI',
    migrationTime: '3-6 weeks',
    displacementStrategy: 'Emphasize ease of use and support quality',
  },
};

function generateDisplacementTalkingPoints(
  enrichedData: any,
  signals: any,
  currentTool: string
): any {
  const competitorContext = competitorIntel[currentTool];
  
  if (!competitorContext) {
    // Not using a known competitor - standard approach
    return generateTalkingPoints(enrichedData, signals, null);
  }
  
  const prompt = `Generate displacement talking points for switching from ${currentTool}.

Company: ${JSON.stringify(enrichedData)}
Signals: ${JSON.stringify(signals)}

Competitor Weaknesses:
${competitorContext.weaknesses.map((w, i) => `${i + 1}. ${w}`).join('\n')}

Switching Cost: ${competitorContext.switchingCost}
Migration Time: ${competitorContext.migrationTime}
Strategy: ${competitorContext.displacementStrategy}

Generate talking points that:
1. Acknowledge their current tool (don't bash it)
2. Highlight specific pain points they're likely experiencing
3. Show how we solve those specific issues
4. Address switching concerns (cost, time, risk)
5. Create urgency (why switch now vs later)

Output as JSON with: talking_points, email_subject, email_opening, meeting_agenda, objection_handlers`;
  
  // Call LLM with displacement context
  return callLLM(prompt);
}

// Usage
if (enrichedData.tech_stack.includes('CompetitorA')) {
  const displacementPoints = generateDisplacementTalkingPoints(
    enrichedData,
    signals,
    'CompetitorA'
  );
  console.log('Displacement strategy:', displacementPoints);
}

Signal Decay & Timing Windows

Buying signals have expiration dates. A funding round is hot for 30 days, then everyone's already pitched them. Track signal freshness and prioritize accordingly.

from datetime import datetime, timedelta

def calculate_signal_urgency(signal: dict) -> dict:
    """Calculate how urgent it is to act on this signal"""
    
    signal_date = datetime.fromisoformat(signal['date'])
    days_old = (datetime.now() - signal_date).days
    
    # Different signals have different decay rates
    decay_windows = {
        'funding_event': 30,      # Hot for 30 days
        'executive_change': 45,   # New exec evaluates for ~45 days
        'hiring_surge': 60,       # Hiring momentum lasts ~60 days
        'market_expansion': 90,   # Expansion planning is longer
        'tech_stack_gap': 180,    # Tech debt is chronic
        'competitor_switch': 14   # Switching signals are urgent
    }
    
    decay_window = decay_windows.get(signal['type'], 60)
    urgency_score = max(0, 100 - (days_old / decay_window * 100))
    
    # Categorize urgency
    if urgency_score > 80:
        urgency_level = 'critical'  # Act today
        action = 'Reach out immediately - signal is fresh'
    elif urgency_score > 60:
        urgency_level = 'high'      # Act this week
        action = 'Schedule outreach within 2-3 days'
    elif urgency_score > 40:
        urgency_level = 'medium'    # Act this month
        action = 'Add to outreach queue for next week'
    else:
        urgency_level = 'low'       # Signal is stale
        action = 'Monitor for new signals before reaching out'
    
    return {
        **signal,
        'days_old': days_old,
        'urgency_score': round(urgency_score, 1),
        'urgency_level': urgency_level,
        'recommended_action': action,
        'expires_in_days': max(0, decay_window - days_old)
    }

def prioritize_accounts_by_signal_urgency(accounts: list) -> list:
    """Sort accounts by most urgent signals"""
    
    for account in accounts:
        # Calculate urgency for each signal
        account['signals'] = [
            calculate_signal_urgency(signal)
            for signal in account['signals']
        ]
        
        # Overall account urgency = highest signal urgency
        account['max_urgency'] = max(
            [s['urgency_score'] for s in account['signals']],
            default=0
        )
    
    # Sort by urgency (highest first)
    return sorted(accounts, key=lambda a: a['max_urgency'], reverse=True)

# Usage
accounts = get_all_accounts_with_signals()
prioritized = prioritize_accounts_by_signal_urgency(accounts)

# Focus on critical urgency accounts first
critical_accounts = [
    a for a in prioritized
    if any(s['urgency_level'] == 'critical' for s in a['signals'])
]

print(f"Critical accounts to contact today: {len(critical_accounts)}")
for account in critical_accounts[:10]:
    print(f"\n{account['company_name']}:")
    for signal in account['signals']:
        if signal['urgency_level'] == 'critical':
            print(f"  - {signal['description']} (expires in {signal['expires_in_days']} days)")
            print(f"    Action: {signal['recommended_action']}")

Cost Calculator

Manual Research Process

  • Sales rep time (2 hours/day researching): $50/hour × 2 hours × 20 days = $2,000/month per rep
  • ZoomInfo subscription: $300/month per seat
  • LinkedIn Sales Navigator: $100/month per seat
  • Opportunity cost (could be selling instead): ~$5,000/month in lost deals per rep

Total: $7,400/month per rep

Limitations:

  • Can only research 10-15 accounts/day
  • Intel is 24-48 hours stale by the time they call
  • Inconsistent quality (depends on rep's research skills)
  • No systematic signal tracking
  • Talking points are generic, not personalized

Automated Intelligence System

  • OpenAI API (GPT-4): $0.03 per account × 500 accounts = $15/day = $300/month
  • ZoomInfo API: $500/month (bulk pricing)
  • News API + BuiltWith: $200/month combined
  • Infrastructure (AWS/Redis): $100/month
  • Maintenance (developer time): 4 hours/month × $100/hour = $400/month

Total: $1,500/month for the entire team

Benefits:

  • Process 500+ accounts/day automatically
  • Real-time intel (updated every 24 hours)
  • Consistent quality across all accounts
  • Systematic signal tracking and prioritization
  • Role-specific personalization for each contact
  • Competitive displacement strategies
  • Signal urgency scoring and decay tracking
$196/day saved per rep: an 80% cost reduction, or $5,900/month ($70,800/year) per sales rep.

💡 Pays for itself in the first month. With 5 reps, that's $354,000/year.
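
Want to sanity-check these numbers for your own team size and rates? The arithmetic is simple enough to script; this sketch just restates the assumptions above:

# Back-of-envelope cost comparison (uses the assumptions listed above)
def monthly_costs(num_reps: int = 1) -> dict:
    manual_per_rep = (
        50 * 2 * 20      # rep research time: $50/hr × 2 hrs/day × 20 days
        + 300            # ZoomInfo seat
        + 100            # LinkedIn Sales Navigator seat
        + 5000           # opportunity cost of not selling
    )
    automated_team = (
        0.03 * 500 * 20  # GPT-4: $0.03/account × 500 accounts/day × 20 days
        + 500            # ZoomInfo API (bulk)
        + 200            # News API + BuiltWith
        + 100            # infrastructure
        + 400            # maintenance: 4 hrs × $100/hr
    )
    manual_total = manual_per_rep * num_reps
    return {
        "manual": manual_total,
        "automated": automated_team,
        "monthly_savings": manual_total - automated_team,
        "annual_savings": (manual_total - automated_team) * 12,
    }

print(monthly_costs(num_reps=1))   # ~$5,900/month saved for one rep
print(monthly_costs(num_reps=5))   # savings grow further since the automation cost is shared across reps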

Want This Running in Your Salesforce?

We build custom sales intelligence systems that integrate with your CRM, data sources, and workflows. From simple enrichment to full multi-agent platforms.