The Problem
On Monday you tested the 3 prompts in ChatGPT. You saw how segment analysis → content generation → A/B testing works. But here's the reality: you can't manually personalize 10,000 emails per day. Your team spends hours copying customer data, pasting into prompts, and manually scheduling sends. Meanwhile, your open rates stay stuck at 18% because the emails still feel generic.
See It Work
Watch the 3 prompts chain together automatically. This is what you'll build.
The Code
Three levels: start simple, add intelligence, then scale to production. Pick where you are.
Level 1: Simple API Calls
Good for: 0-500 emails/day | Setup time: 30 minutes
# Simple Email Personalization (0-500 emails/day) import openai import json from sendgrid import SendGridAPIClient from sendgrid.helpers.mail import Mail def personalize_email(customer_data: dict) -> dict: """Chain the 3 prompts: analyze → generate → test""" # Step 1: Analyze customer segment analysis_prompt = f"""Analyze this customer for email personalization. Extract: engagement signals, pain points, urgency score, personalization hooks. Customer data: {json.dumps(customer_data)} Output as JSON with: engagement_signals (list), pain_point (string), urgency_score (1-10), personalization_hooks (list).""" response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": analysis_prompt}], temperature=0.3 ) analysis = json.loads(response.choices[0].message.content) # Step 2: Generate personalized content content_prompt = f"""Write a personalized email for this customer. Customer analysis: {json.dumps(analysis)} Customer name: {customer_data['name']} Company: {customer_data.get('company', 'their company')} Pain point: {analysis['pain_point']} Write: 1. Two subject line variants (A/B test) 2. Email body with: casual opening, value prop, CTA, P.S. 3. Use personalization hooks naturally (no templates) Output as JSON with: subject_lines (array), email_body (object with opening/value_prop/cta/ps).""" response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": content_prompt}], temperature=0.7 ) content = json.loads(response.choices[0].message.content) # Step 3: Setup A/B test (simple 50/50 split) import random variant = 'A' if random.random() < 0.5 else 'B' subject = content['subject_lines'][0 if variant == 'A' else 1]['text'] return { "analysis": analysis, "content": content, "variant": variant, "subject": subject, "body": content['email_body'] } def send_personalized_email(customer_data: dict, sendgrid_api_key: str): """Generate and send personalized email""" # Generate personalized content email_data = personalize_email(customer_data) # Send via SendGrid message = Mail( from_email='your-email@company.com', to_emails=customer_data['email'], subject=email_data['subject'], html_content=f""" <html> <body> <p>{email_data['body']['opening']}</p> <p>{email_data['body']['value_prop']}</p> <p>{email_data['body']['cta']}</p> <p><em>{email_data['body']['ps']}</em></p> </body> </html> """ ) # Add custom args for tracking message.custom_arg = [ {"key": "variant", "value": email_data['variant']}, {"key": "urgency_score", "value": str(email_data['analysis']['urgency_score'])} ] sg = SendGridAPIClient(sendgrid_api_key) response = sg.send(message) return { "status": response.status_code, "variant": email_data['variant'], "subject": email_data['subject'] } # Usage customer = { "name": "Jessica Martinez", "email": "jessica@startup.com", "company": "StartupCo", "signup_date": "2025-08-09", "pricing_views": 5, "whitepaper_download": "Scaling Customer Support", "trial_status": "not_started" } result = send_personalized_email(customer, "your_sendgrid_api_key") print(f"Sent variant {result['variant']}: {result['subject']}")
Level 2: With Behavioral Triggers & Optimization
Good for: 500-5,000 emails/day | Setup time: 2 hours
// With Behavioral Triggers & Optimization (500-5K emails/day) import Anthropic from '@anthropic-ai/sdk'; import sgMail from '@sendgrid/mail'; import { AnalyticsClient } from '@segment/analytics-node'; interface CustomerData { id: string; email: string; name: string; company?: string; events: Array<{ type: string; timestamp: string; properties?: Record<string, any>; }>; attributes: Record<string, any>; } interface EmailResult { sent: boolean; variant: string; subject: string; scheduled_time?: string; } class EmailPersonalizationEngine { private anthropic: Anthropic; private analytics: AnalyticsClient; private sendgridApiKey: string; constructor(anthropicKey: string, sendgridKey: string, segmentKey: string) { this.anthropic = new Anthropic({ apiKey: anthropicKey }); this.analytics = new AnalyticsClient({ writeKey: segmentKey }); this.sendgridApiKey = sendgridKey; sgMail.setApiKey(sendgridKey); } async checkTriggerConditions(customer: CustomerData): Promise<boolean> { // Define behavioral triggers const triggers = [ { name: 'high_intent', conditions: [ () => this.countEvents(customer, 'pricing_page_view') >= 3, () => this.daysSinceSignup(customer) <= 7, () => customer.attributes.trial_status === 'not_started', ], }, { name: 'engagement_drop', conditions: [ () => this.daysSinceLastActivity(customer) >= 3, () => this.daysSinceLastActivity(customer) <= 7, () => customer.attributes.trial_status === 'active', ], }, ]; // Check if any trigger matches for (const trigger of triggers) { const allMatch = trigger.conditions.every((condition) => condition()); if (allMatch) { console.log(`Trigger matched: ${trigger.name}`); return true; } } return false; } private countEvents(customer: CustomerData, eventType: string): number { return customer.events.filter((e) => e.type === eventType).length; } private daysSinceSignup(customer: CustomerData): number { const signup = customer.events.find((e) => e.type === 'signed_up'); if (!signup) return 999; const days = Math.floor( (Date.now() - new Date(signup.timestamp).getTime()) / (1000 * 60 * 60 * 24) ); return days; } private daysSinceLastActivity(customer: CustomerData): number { if (customer.events.length === 0) return 999; const lastEvent = customer.events.sort( (a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime() )[0]; const days = Math.floor( (Date.now() - new Date(lastEvent.timestamp).getTime()) / (1000 * 60 * 60 * 24) ); return days; } async personalizeEmail(customer: CustomerData): Promise<any> { // Step 1: Analyze with Claude const analysisResponse = await this.anthropic.messages.create({ model: 'claude-3-5-sonnet-20241022', max_tokens: 2048, messages: [ { role: 'user', content: `Analyze this customer for email personalization:\n${JSON.stringify(customer)}\n\nOutput JSON with: engagement_signals, pain_point, urgency_score (1-10), personalization_hooks.`, }, ], }); const analysisContent = analysisResponse.content[0]; if (analysisContent.type !== 'text') throw new Error('Invalid response'); const analysis = JSON.parse(analysisContent.text); // Step 2: Generate content const contentResponse = await this.anthropic.messages.create({ model: 'claude-3-5-sonnet-20241022', max_tokens: 2048, temperature: 0.7, messages: [ { role: 'user', content: `Write personalized email for ${customer.name}.\n\nAnalysis: ${JSON.stringify(analysis)}\n\nOutput JSON with: subject_lines (2 variants), email_body (opening/value_prop/cta/ps).`, }, ], }); const contentText = contentResponse.content[0]; if (contentText.type !== 'text') throw new Error('Invalid response'); const content = JSON.parse(contentText.text); // Step 3: Optimize send time based on past opens const optimalTime = await this.predictOptimalSendTime(customer); return { analysis, content, optimal_send_time: optimalTime, }; } async predictOptimalSendTime(customer: CustomerData): Promise<Date> { // Analyze past email open times const emailOpens = customer.events.filter((e) => e.type === 'email_opened'); if (emailOpens.length < 3) { // Default to 10am in customer's timezone return new Date(Date.now() + 24 * 60 * 60 * 1000); } // Find most common hour const hours = emailOpens.map((e) => new Date(e.timestamp).getHours()); const hourCounts = hours.reduce((acc, hour) => { acc[hour] = (acc[hour] || 0) + 1; return acc; }, {} as Record<number, number>); const bestHour = Object.entries(hourCounts).sort((a, b) => b[1] - a[1])[0][0]; // Schedule for tomorrow at that hour const tomorrow = new Date(); tomorrow.setDate(tomorrow.getDate() + 1); tomorrow.setHours(parseInt(bestHour), 0, 0, 0); return tomorrow; } async sendWithABTest( customer: CustomerData, emailData: any ): Promise<EmailResult> { // 50/50 A/B split const variant = Math.random() < 0.5 ? 'A' : 'B'; const subject = emailData.content.subject_lines[variant === 'A' ? 0 : 1].text; const msg = { to: customer.email, from: 'hello@yourcompany.com', subject, html: ` <html> <body style="font-family: sans-serif; line-height: 1.6;"> <p>${emailData.content.email_body.opening}</p> <p>${emailData.content.email_body.value_prop}</p> <p>${emailData.content.email_body.cta}</p> <p><em>${emailData.content.email_body.ps}</em></p> </body> </html> `, customArgs: { variant, customer_id: customer.id, urgency_score: emailData.analysis.urgency_score.toString(), }, sendAt: Math.floor(emailData.optimal_send_time.getTime() / 1000), }; await sgMail.send(msg); // Track in Segment this.analytics.track({ userId: customer.id, event: 'Email Sent', properties: { variant, subject, scheduled_time: emailData.optimal_send_time.toISOString(), urgency_score: emailData.analysis.urgency_score, }, }); return { sent: true, variant, subject, scheduled_time: emailData.optimal_send_time.toISOString(), }; } async processCustomer(customer: CustomerData): Promise<EmailResult | null> { // Check if triggers match const shouldSend = await this.checkTriggerConditions(customer); if (!shouldSend) { console.log(`No triggers matched for ${customer.name}`); return null; } // Generate personalized email const emailData = await this.personalizeEmail(customer); // Send with A/B test return await this.sendWithABTest(customer, emailData); } } // Usage const engine = new EmailPersonalizationEngine( process.env.ANTHROPIC_API_KEY!, process.env.SENDGRID_API_KEY!, process.env.SEGMENT_WRITE_KEY! ); const customer: CustomerData = { id: 'cust_123', email: 'jessica@startup.com', name: 'Jessica Martinez', company: 'StartupCo', events: [ { type: 'signed_up', timestamp: '2025-08-09T10:00:00Z' }, { type: 'pricing_page_view', timestamp: '2025-08-10T14:30:00Z' }, { type: 'pricing_page_view', timestamp: '2025-08-11T09:15:00Z' }, { type: 'pricing_page_view', timestamp: '2025-08-11T16:45:00Z' }, { type: 'whitepaper_download', timestamp: '2025-08-11T17:00:00Z' }, ], attributes: { trial_status: 'not_started', company_size: 15, }, }; const result = await engine.processCustomer(customer); if (result) { console.log(`Sent variant ${result.variant} at ${result.scheduled_time}`); }
Level 3: Production Pattern with LangGraph
Good for: 5,000-50,000 emails/day | Setup time: 1 day
# Production Pattern with LangGraph (5K-50K emails/day) from langgraph.graph import Graph, END from typing import TypedDict, List, Optional import openai import asyncio from datetime import datetime, timedelta import json class EmailState(TypedDict): customer_id: str customer_data: dict triggers_matched: List[str] analysis: Optional[dict] content_variants: Optional[List[dict]] winning_variant: Optional[str] send_time: Optional[datetime] sent: bool follow_ups: List[dict] def check_triggers_node(state: EmailState) -> EmailState: """Check if behavioral triggers match""" customer = state['customer_data'] matched = [] # High intent trigger pricing_views = len([e for e in customer['events'] if e['type'] == 'pricing_page_view']) if pricing_views >= 3 and customer['attributes'].get('trial_status') == 'not_started': matched.append('high_intent') # Engagement drop trigger last_event = max([e['timestamp'] for e in customer['events']]) days_inactive = (datetime.now() - datetime.fromisoformat(last_event)).days if 3 <= days_inactive <= 7: matched.append('engagement_drop') # Cart abandonment trigger if any(e['type'] == 'checkout_started' for e in customer['events']): if not any(e['type'] == 'purchase_completed' for e in customer['events']): matched.append('cart_abandonment') state['triggers_matched'] = matched return state def analyze_customer_node(state: EmailState) -> EmailState: """Deep analysis of customer behavior""" customer = state['customer_data'] analysis_prompt = f"""Analyze this customer's behavior for email personalization. Customer data: {json.dumps(customer)} Triggered by: {', '.join(state['triggers_matched'])} Provide: 1. Engagement signals (what they've done) 2. Intent score (1-10) 3. Pain points (inferred from behavior) 4. Personalization hooks (specific details to reference) 5. Recommended tone (casual/professional/urgent) Output as JSON.""" response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": analysis_prompt}], temperature=0.3 ) state['analysis'] = json.loads(response.choices[0].message.content) return state def generate_variants_node(state: EmailState) -> EmailState: """Generate multiple content variants for testing""" content_prompt = f"""Generate 3 email variants for A/B/C testing. Customer: {state['customer_data']['name']} Analysis: {json.dumps(state['analysis'])} Triggers: {', '.join(state['triggers_matched'])} For each variant, provide: - Subject line - Email body (opening, value_prop, cta, ps) - Predicted performance (open_rate, click_rate) - Variant strategy (e.g., 'urgency-focused', 'value-focused', 'social-proof') Output as JSON array.""" response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": content_prompt}], temperature=0.8 ) state['content_variants'] = json.loads(response.choices[0].message.content) return state def select_winner_node(state: EmailState) -> EmailState: """Select best variant based on historical performance""" variants = state['content_variants'] # In production, query your analytics DB for similar campaigns # For now, use predicted performance best_variant = max(variants, key=lambda v: v['predicted_performance']['open_rate']) state['winning_variant'] = best_variant['variant_id'] return state def optimize_send_time_node(state: EmailState) -> EmailState: """Predict optimal send time""" customer = state['customer_data'] # Analyze past email opens email_opens = [e for e in customer['events'] if e['type'] == 'email_opened'] if len(email_opens) >= 3: # Find most common hour hours = [datetime.fromisoformat(e['timestamp']).hour for e in email_opens] best_hour = max(set(hours), key=hours.count) else: # Default to 10am best_hour = 10 # Schedule for next occurrence of that hour now = datetime.now() send_time = now.replace(hour=best_hour, minute=0, second=0, microsecond=0) if send_time < now: send_time += timedelta(days=1) state['send_time'] = send_time return state def send_email_node(state: EmailState) -> EmailState: """Send the email via SendGrid/Mailgun""" import sendgrid from sendgrid.helpers.mail import Mail winner = next(v for v in state['content_variants'] if v['variant_id'] == state['winning_variant']) message = Mail( from_email='hello@yourcompany.com', to_emails=state['customer_data']['email'], subject=winner['subject'], html_content=f""" <html> <body> <p>{winner['email_body']['opening']}</p> <p>{winner['email_body']['value_prop']}</p> <p>{winner['email_body']['cta']}</p> <p><em>{winner['email_body']['ps']}</em></p> </body> </html> """ ) # Schedule send message.send_at = int(state['send_time'].timestamp()) # Add tracking message.custom_arg = [ {"key": "customer_id", "value": state['customer_id']}, {"key": "variant", "value": state['winning_variant']}, {"key": "trigger", "value": ','.join(state['triggers_matched'])} ] sg = sendgrid.SendGridAPIClient(api_key='your_sendgrid_key') response = sg.send(message) state['sent'] = response.status_code == 202 return state def schedule_follow_ups_node(state: EmailState) -> EmailState: """Schedule follow-up emails based on behavior""" # Follow-up 1: If not opened after 48 hours state['follow_ups'].append({ "delay_hours": 48, "condition": "email_opened == false", "subject": "Quick question about your workflow", "angle": "softer_approach" }) # Follow-up 2: If opened but not clicked state['follow_ups'].append({ "delay_hours": 72, "condition": "email_opened == true AND link_clicked == false", "subject": "That demo I mentioned—still interested?", "angle": "direct_cta" }) return state def should_send(state: EmailState) -> str: """Route based on triggers""" if len(state['triggers_matched']) > 0: return "analyze" else: return "skip" # Build the graph def build_email_graph(): graph = Graph() # Add nodes graph.add_node("check_triggers", check_triggers_node) graph.add_node("analyze", analyze_customer_node) graph.add_node("generate_variants", generate_variants_node) graph.add_node("select_winner", select_winner_node) graph.add_node("optimize_time", optimize_send_time_node) graph.add_node("send", send_email_node) graph.add_node("schedule_follow_ups", schedule_follow_ups_node) # Add edges graph.set_entry_point("check_triggers") graph.add_conditional_edges( "check_triggers", should_send, { "analyze": "analyze", "skip": END } ) graph.add_edge("analyze", "generate_variants") graph.add_edge("generate_variants", "select_winner") graph.add_edge("select_winner", "optimize_time") graph.add_edge("optimize_time", "send") graph.add_edge("send", "schedule_follow_ups") graph.add_edge("schedule_follow_ups", END) return graph.compile() # Usage email_graph = build_email_graph() initial_state = { "customer_id": "cust_123", "customer_data": { "email": "jessica@startup.com", "name": "Jessica Martinez", "events": [ {"type": "signed_up", "timestamp": "2025-08-09T10:00:00"}, {"type": "pricing_page_view", "timestamp": "2025-08-10T14:30:00"}, {"type": "pricing_page_view", "timestamp": "2025-08-11T09:15:00"}, {"type": "pricing_page_view", "timestamp": "2025-08-11T16:45:00"}, ], "attributes": {"trial_status": "not_started"} }, "triggers_matched": [], "analysis": None, "content_variants": None, "winning_variant": None, "send_time": None, "sent": False, "follow_ups": [] } result = email_graph.invoke(initial_state) print(f"Email sent: {result['sent']}") print(f"Scheduled for: {result['send_time']}") print(f"Follow-ups: {len(result['follow_ups'])}")
When to Level Up
Start: Simple API Calls
0-500 emails/day
- Sequential API calls to OpenAI/Anthropic
- Basic A/B testing (50/50 random split)
- Manual trigger checking (run script on schedule)
- SendGrid/Mailgun for sending
Scale: Add Intelligence
500-5,000 emails/day
- Behavioral trigger detection (pricing views, activity drops, cart abandonment)
- Optimal send time prediction (based on past opens)
- Event tracking with Segment/Mixpanel
- Automatic follow-up sequences based on engagement
Production: Framework & Orchestration
5,000-50,000 emails/day
- LangGraph for complex workflows with conditional routing
- Multi-variant testing (A/B/C/D with automatic winner selection)
- Real-time CDP integration (Segment, mParticle, Rudderstack)
- Dynamic follow-up chains (behavior-triggered sequences)
Enterprise: Multi-Agent System
50,000+ emails/day
- Specialized agents (content generation, send-time optimization, deliverability monitoring)
- Multi-channel orchestration (email + SMS + push + in-app)
- Real-time personalization (content changes based on current behavior)
- Advanced attribution (multi-touch revenue tracking across channels)
Marketing-Specific Gotchas
The code examples above work. But email marketing has unique challenges you need to handle.
Deliverability & Spam Filters
Over-personalization can trigger spam filters. Avoid: excessive caps, too many links, spammy words like 'FREE' or 'ACT NOW'. Warm up new IP addresses gradually. Monitor sender reputation with tools like Postmark or Mailgun analytics.
def check_spam_score(subject: str, body: str) -> dict: """Check for spam triggers before sending""" spam_words = ['free', 'act now', 'limited time', 'click here', 'buy now'] issues = [] # Check subject line if subject.isupper(): issues.append('Subject line all caps') if sum(1 for word in spam_words if word.lower() in subject.lower()) > 2: issues.append('Too many spam words in subject') # Check body link_count = body.count('http') if link_count > 5: issues.append(f'Too many links ({link_count})') if sum(1 for word in spam_words if word.lower() in body.lower()) > 3: issues.append('Too many spam words in body') return { 'is_safe': len(issues) == 0, 'issues': issues, 'spam_score': len(issues) } # Check before sending spam_check = check_spam_score(subject, email_body) if not spam_check['is_safe']: print(f"Warning: {spam_check['issues']}") # Regenerate content or flag for manual review
Unsubscribe & Compliance (CAN-SPAM, GDPR)
Every email must include an unsubscribe link. CAN-SPAM requires physical address. GDPR requires explicit consent for EU recipients. Track opt-outs in real-time to avoid sending to unsubscribed users.
interface EmailCompliance { hasUnsubscribeLink: boolean; hasPhysicalAddress: boolean; hasConsentRecord: boolean; recipientRegion: string; } async function checkCompliance( email: string, emailHtml: string ): Promise<EmailCompliance> { // Check for unsubscribe link const hasUnsubscribe = emailHtml.includes('unsubscribe') || emailHtml.includes('opt-out'); // Check for physical address (required by CAN-SPAM) const addressPattern = /\d+\s+[\w\s]+,\s+[A-Z]{2}\s+\d{5}/; const hasAddress = addressPattern.test(emailHtml); // Check consent for GDPR (query your database) const user = await db.users.findOne({ email }); const hasConsent = user?.marketing_consent === true; // Determine region (for GDPR compliance) const region = user?.country || 'US'; const isEU = ['DE', 'FR', 'IT', 'ES', 'UK'].includes(region); if (isEU && !hasConsent) { throw new Error('Cannot send to EU user without explicit consent'); } if (!hasUnsubscribe || !hasAddress) { throw new Error('Email missing required compliance elements'); } return { hasUnsubscribeLink: hasUnsubscribe, hasPhysicalAddress: hasAddress, hasConsentRecord: hasConsent, recipientRegion: region, }; } // Use before sending await checkCompliance(customer.email, emailHtml);
Rate Limiting & Send Throttling
Email providers (SendGrid, Mailgun) have rate limits. SendGrid free tier: 100 emails/day. Paid: 40k-120k/month. Throttle sends to avoid hitting limits. Use queues (Redis, RabbitMQ) to manage send rate.
import asyncio from collections import deque from datetime import datetime, timedelta class RateLimiter: def __init__(self, max_per_minute: int): self.max_per_minute = max_per_minute self.requests = deque() async def acquire(self): """Wait if rate limit exceeded""" now = datetime.now() # Remove requests older than 1 minute while self.requests and self.requests[0] < now - timedelta(minutes=1): self.requests.popleft() # If at limit, wait if len(self.requests) >= self.max_per_minute: sleep_time = (self.requests[0] + timedelta(minutes=1) - now).total_seconds() await asyncio.sleep(sleep_time) return await self.acquire() self.requests.append(now) return True # Usage with SendGrid (100/minute limit) limiter = RateLimiter(max_per_minute=100) async def send_batch(customers: list): for customer in customers: await limiter.acquire() # Wait if needed await send_email(customer) print(f"Sent to {customer['name']}") await send_batch(customer_list)
Dynamic Content Rendering & Merge Tags
LLM-generated content needs to be converted to email template format. Replace dynamic elements with merge tags for ESP (Email Service Provider). Handle missing data gracefully (fallback to defaults).
interface MergeTag { key: string; value: string; fallback: string; } function renderTemplate( template: string, customer: any ): string { // Define merge tags with fallbacks const mergeTags: MergeTag[] = [ { key: '{{first_name}}', value: customer.name?.split(' ')[0] || '', fallback: 'there', }, { key: '{{company}}', value: customer.company || '', fallback: 'your company', }, { key: '{{pricing_views}}', value: customer.pricing_views?.toString() || '', fallback: 'a few times', }, ]; let rendered = template; // Replace merge tags for (const tag of mergeTags) { const value = tag.value || tag.fallback; rendered = rendered.replace(new RegExp(tag.key, 'g'), value); } return rendered; } // Convert LLM output to template const llmOutput = "Hey {{first_name}}, I saw {{company}} checked pricing {{pricing_views}}..."; const rendered = renderTemplate(llmOutput, customer); // Output: "Hey Jessica, I saw StartupCo checked pricing 5 times..."
A/B Test Statistical Significance
Don't declare a winner too early. Need minimum 100 sends per variant and 95% confidence. Use proper statistical tests (chi-square for open rates, t-test for click rates). Stop test when winner is clear.
from scipy import stats import math def calculate_ab_test_significance( variant_a_opens: int, variant_a_sends: int, variant_b_opens: int, variant_b_sends: int ) -> dict: """Determine if A/B test has statistical significance""" # Check minimum sample size if variant_a_sends < 100 or variant_b_sends < 100: return { 'significant': False, 'reason': 'Insufficient sample size (need 100+ per variant)' } # Calculate open rates rate_a = variant_a_opens / variant_a_sends rate_b = variant_b_opens / variant_b_sends # Chi-square test for proportions observed = [[variant_a_opens, variant_a_sends - variant_a_opens], [variant_b_opens, variant_b_sends - variant_b_opens]] chi2, p_value = stats.chi2_contingency(observed)[:2] # 95% confidence = p_value < 0.05 is_significant = p_value < 0.05 # Calculate lift lift_percent = ((rate_b - rate_a) / rate_a) * 100 if rate_a > 0 else 0 return { 'significant': is_significant, 'p_value': p_value, 'confidence': (1 - p_value) * 100, 'variant_a_rate': rate_a, 'variant_b_rate': rate_b, 'winner': 'B' if is_significant and rate_b > rate_a else 'A' if is_significant else 'None', 'lift_percent': lift_percent } # Usage result = calculate_ab_test_significance( variant_a_opens=45, variant_a_sends=200, variant_b_opens=62, variant_b_sends=200 ) if result['significant']: print(f"Winner: Variant {result['winner']} (+{result['lift_percent']:.1f}% lift)") else: print("Keep testing - no clear winner yet")
Cost Calculator
Manual Email Personalization
Limitations:
- • Max 50-100 personalized emails per day
- • No real-time behavioral triggers
- • Manual A/B test management
- • Generic segmentation (not 1:1)
Automated Personalization Engine
Benefits:
- ✓ 30,000+ personalized emails per month (600x more)
- ✓ Real-time behavioral triggers
- ✓ Automatic A/B testing with winner selection
- ✓ True 1:1 personalization at scale