The Problem
On Monday you tested the 3 prompts in ChatGPT. You saw how one blog post becomes LinkedIn thought leadership, Twitter threads, and Instagram carousels. But here's the reality: you can't manually rewrite and post to 10 platforms every single day. Your team spends more time reformatting than creating.
See It Work
Watch one blog post transform into platform-specific content automatically. This is what you'll build.
The Code
Three levels: start simple, add reliability, then scale to production. Pick where you are.
Level 1: Simple API Calls
Good for: 0-100 posts/day | Setup time: 30 minutes
# Simple Content Distribution (0-100 posts/day)
import json
import os
from datetime import datetime, timedelta

import openai
import requests

BUFFER_CREATE_URL = 'https://api.bufferapp.com/1/updates/create.json'


def _parse_json_reply(text: str):
    """Parse JSON from a model reply, tolerating a Markdown code fence around it."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (``` or ```json) and the closing fence.
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rsplit("```", 1)[0]
    return json.loads(cleaned)


def _ask_model(prompt: str, temperature: float) -> str:
    """Send a single-turn prompt to GPT-4 and return the reply text."""
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature
    )
    return response.choices[0].message.content


def _schedule_update(platform: str, text: str, post_time: datetime):
    """Create one scheduled Buffer update and return its id (None if not reported).

    Raises requests.RequestException on transport/HTTP errors and ValueError
    when the response is not JSON or Buffer reports success=false.
    """
    response = requests.post(
        BUFFER_CREATE_URL,
        data={
            # Prefer the environment variable; the placeholder keeps the
            # original copy-paste behaviour when it is not set.
            'access_token': os.environ.get('BUFFER_ACCESS_TOKEN', 'YOUR_BUFFER_TOKEN'),
            'profile_ids[]': get_profile_id(platform),
            'text': text,
            'scheduled_at': int(post_time.timestamp())
        },
        timeout=30
    )
    response.raise_for_status()
    payload = response.json()
    if payload.get('success') is False:
        raise ValueError(payload.get('message', 'Buffer rejected the update'))
    # NOTE(review): Buffer appears to nest created updates under "updates";
    # the top-level "id" lookup is kept as a fallback - confirm against the API docs.
    updates = payload.get('updates') or [{}]
    return payload.get('id') or updates[0].get('id')


def distribute_content(blog_content: str, target_platforms: list) -> dict:
    """Transform one blog post into platform-specific content and schedule it.

    Args:
        blog_content: Full text of the blog post.
        target_platforms: Any of 'LinkedIn', 'Twitter', 'Facebook'.

    Returns:
        dict with 'analysis' (parsed model output), 'posts' (keyed by lowercase
        platform; the Twitter entry is a list of tweets) and 'scheduled' (one
        entry per platform with 'status' of 'scheduled' or 'failed').

    Raises:
        ValueError: if the model's analysis or tweet list is not valid JSON of
            the expected shape (json.JSONDecodeError is a ValueError).
    """
    # Step 1: Extract key information. The keys are spelled out because the
    # code below reads analysis['key_points'] by name.
    extraction_prompt = f"""Analyze this blog post and extract:
- Main topic
- Key points (list)
- Target audience
- Tone (professional/casual/technical)
- Optimal platforms for distribution
- Relevant hashtags

Blog content: {blog_content}

Output as valid JSON with exactly these keys: main_topic, key_points, target_audience, tone, optimal_platforms, hashtags."""

    analysis = _parse_json_reply(_ask_model(extraction_prompt, 0.3))
    if not isinstance(analysis, dict):
        raise ValueError("Content analysis must be a JSON object")
    key_points = json.dumps(analysis.get('key_points', []))

    # Step 2: Generate platform-specific versions
    posts = {}

    if "LinkedIn" in target_platforms:
        linkedin_prompt = f"""Transform this blog into a LinkedIn post:
- Professional tone
- 1500-2000 characters
- Include line breaks for readability
- Add relevant hashtags
- End with CTA

Blog: {blog_content}
Key points: {key_points}

Output as plain text."""
        posts['linkedin'] = _ask_model(linkedin_prompt, 0.7)

    if "Twitter" in target_platforms:
        twitter_prompt = f"""Transform this blog into a Twitter thread:
- 6-8 tweets
- Each tweet under 280 characters
- Start with hook tweet
- End with CTA
- Include relevant hashtags

Blog: {blog_content}
Key points: {key_points}

Output as JSON array of tweets."""
        tweets = _parse_json_reply(_ask_model(twitter_prompt, 0.7))
        if not isinstance(tweets, list):
            raise ValueError("Twitter thread must be a JSON array of tweets")
        posts['twitter'] = tweets

    if "Facebook" in target_platforms:
        facebook_prompt = f"""Transform this blog into a Facebook post:
- Conversational tone
- 1000-1500 characters
- Engaging opening
- Include emojis
- End with question or CTA

Blog: {blog_content}
Key points: {key_points}

Output as plain text."""
        posts['facebook'] = _ask_model(facebook_prompt, 0.7)

    # Step 3: Schedule posts (using Buffer API as example)
    scheduled = []
    for platform, content in posts.items():
        # Schedule for optimal time (simplified)
        post_time = datetime.now() + timedelta(hours=2)

        # Buffer's 'text' field takes one string, so a thread is sent as one
        # update per tweet, staggered by a minute to keep the order. These are
        # separate tweets, not a reply chain.
        texts = content if isinstance(content, list) else [content]
        buffer_ids = []
        error = None
        for offset, text in enumerate(texts):
            try:
                buffer_ids.append(
                    _schedule_update(platform, str(text), post_time + timedelta(minutes=offset))
                )
            except (requests.RequestException, ValueError) as exc:
                error = str(exc)
                break

        entry = {
            'platform': platform,
            'post_time': post_time.isoformat(),
            'status': 'failed' if error else 'scheduled',
            'buffer_id': buffer_ids[0] if buffer_ids else None,
            'buffer_ids': buffer_ids
        }
        if error:
            entry['error'] = error
        scheduled.append(entry)

    return {
        'analysis': analysis,
        'posts': posts,
        'scheduled': scheduled
    }


def get_profile_id(platform: str) -> str:
    """Map platform to Buffer profile ID ('' when the platform is unknown)."""
    profiles = {
        'linkedin': '5f8a9b2c3d4e5f6g7h8i9j0k',
        'twitter': '6g7h8i9j0k1l2m3n4o5p6q7r',
        'facebook': '7h8i9j0k1l2m3n4o5p6q7r8s'
    }
    return profiles.get(platform.lower(), '')


# Usage
blog_content = """Your blog post content here..."""

result = distribute_content(
    blog_content,
    target_platforms=['LinkedIn', 'Twitter', 'Facebook']
)

print(f"Generated {len(result['posts'])} platform-specific posts")
print(f"Scheduled {len(result['scheduled'])} posts")
Level 2: With Error Handling & Platform APIs
Good for: 100-1,000 posts/day | Setup time: 2 hours
// Content Distribution with Error Handling (100-1000 posts/day)
import Anthropic from '@anthropic-ai/sdk';
import axios from 'axios';

interface ContentAnalysis {
  main_topic: string;
  key_points: string[];
  target_audience: string;
  tone: string;
  optimal_platforms: string[];
  hashtags: string[];
}

interface PlatformPost {
  platform: string;
  content: string;
  scheduled_time: string;
  post_id?: string;
  error?: string;
}

/** Extracts a printable message from anything a `catch` clause can receive. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === 'string');
}

/**
 * Checks the fields this file actually reads (main_topic, key_points,
 * hashtags). The remaining ContentAnalysis fields are not verified.
 */
function isContentAnalysis(value: unknown): value is ContentAnalysis {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.main_topic === 'string' &&
    isStringArray(candidate.key_points) &&
    isStringArray(candidate.hashtags)
  );
}

/** Parses JSON from a model reply, tolerating a surrounding Markdown code fence. */
function parseJsonReply(text: string): unknown {
  const fenced = /^```(?:json)?\s*([\s\S]*?)\s*```$/.exec(text.trim());
  return JSON.parse(fenced?.[1] ?? text);
}

/**
 * Turns one blog post into LinkedIn and Twitter content with Claude and
 * publishes it. Generation calls are retried with exponential backoff;
 * publishing failures are returned in `PlatformPost.error` rather than thrown.
 */
class ContentDistributor {
  private anthropic: Anthropic;
  private maxRetries = 3;

  constructor() {
    this.anthropic = new Anthropic({
      apiKey: process.env.ANTHROPIC_API_KEY!,
    });
  }

  /** Sends one user prompt to Claude and returns the text of the first content block. */
  private async complete(prompt: string): Promise<string> {
    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 2048,
      messages: [{ role: 'user', content: prompt }],
    });

    const content = response.content[0];
    if (!content || content.type !== 'text') throw new Error('Invalid response');
    return content.text;
  }

  /**
   * Extracts topic, key points, hashtags, etc. from the blog post.
   * @throws after all retries fail, or when the reply is not the expected JSON shape.
   */
  async analyzeContent(blogContent: string): Promise<ContentAnalysis> {
    return this.retryWithBackoff(async () => {
      const text = await this.complete(
        `Analyze this blog post and extract key information as JSON:

${blogContent}

Return: main_topic, key_points (array), target_audience, tone, optimal_platforms (array), hashtags (array)`
      );

      // Validation happens inside the retry so a malformed reply triggers a new attempt.
      const parsed = parseJsonReply(text);
      if (!isContentAnalysis(parsed)) {
        throw new Error('Invalid response: analysis JSON has an unexpected shape');
      }
      return parsed;
    });
  }

  /** Generates the LinkedIn post text (plain text, 1500-2000 chars requested). */
  async generateLinkedInPost(
    blogContent: string,
    analysis: ContentAnalysis
  ): Promise<string> {
    return this.retryWithBackoff(() =>
      this.complete(
        `Transform this blog into a LinkedIn post (1500-2000 chars):

Blog: ${blogContent}

Key points: ${JSON.stringify(analysis.key_points)}

Requirements:
- Professional tone
- Line breaks for readability
- Include hashtags: ${analysis.hashtags.join(', ')}
- End with CTA

Output plain text only.`
      )
    );
  }

  /**
   * Generates a Twitter thread as an array of tweet strings.
   * @throws after all retries fail, or when the reply is not a non-empty string array.
   */
  async generateTwitterThread(
    blogContent: string,
    analysis: ContentAnalysis
  ): Promise<string[]> {
    return this.retryWithBackoff(async () => {
      const text = await this.complete(
        `Transform this blog into a Twitter thread (6-8 tweets):

Blog: ${blogContent}

Key points: ${JSON.stringify(analysis.key_points)}

Requirements:
- Each tweet under 280 characters
- Start with hook
- End with CTA
- Include hashtags: ${analysis.hashtags.slice(0, 3).join(', ')}

Output as JSON array of tweet strings.`
      );

      const parsed = parseJsonReply(text);
      if (!isStringArray(parsed) || parsed.length === 0) {
        throw new Error('Invalid response: expected a non-empty JSON array of tweets');
      }
      return parsed;
    });
  }

  /** Publishes a text post via the LinkedIn UGC Posts API. Never throws; failures are reported in `error`. */
  async postToLinkedIn(content: string): Promise<PlatformPost> {
    try {
      const response = await axios.post(
        'https://api.linkedin.com/v2/ugcPosts',
        {
          author: `urn:li:person:${process.env.LINKEDIN_PERSON_ID}`,
          lifecycleState: 'PUBLISHED',
          specificContent: {
            'com.linkedin.ugc.ShareContent': {
              shareCommentary: {
                text: content,
              },
              shareMediaCategory: 'NONE',
            },
          },
          visibility: {
            'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC',
          },
        },
        {
          headers: {
            Authorization: `Bearer ${process.env.LINKEDIN_ACCESS_TOKEN}`,
            'Content-Type': 'application/json',
            'X-Restli-Protocol-Version': '2.0.0',
          },
        }
      );

      return {
        platform: 'LinkedIn',
        content,
        scheduled_time: new Date().toISOString(),
        post_id: response.data.id,
      };
    } catch (error: unknown) {
      return {
        platform: 'LinkedIn',
        content,
        scheduled_time: new Date().toISOString(),
        error: errorMessage(error),
      };
    }
  }

  /**
   * Posts the tweets as a reply chain. Never throws. If the thread fails
   * partway, the result carries both `error` and the `post_id` of the first
   * tweet so the partial thread can be found and cleaned up.
   */
  async postToTwitter(tweets: string[]): Promise<PlatformPost> {
    const content = tweets.join('\n\n');
    // NOTE(review): creating tweets normally needs a user-context token; confirm
    // that TWITTER_BEARER_TOKEN holds one rather than an app-only bearer token.
    const headers = {
      Authorization: `Bearer ${process.env.TWITTER_BEARER_TOKEN}`,
      'Content-Type': 'application/json',
    };
    let firstId: string | undefined;

    try {
      if (tweets.length === 0) throw new Error('Twitter thread is empty');

      let previousId: string | undefined;
      for (const text of tweets) {
        // The first tweet stands alone; every later tweet replies to the previous one.
        const body =
          previousId === undefined
            ? { text }
            : { text, reply: { in_reply_to_tweet_id: previousId } };
        const response = await axios.post('https://api.twitter.com/2/tweets', body, {
          headers,
        });
        previousId = response.data.data.id;
        if (firstId === undefined) firstId = previousId;
      }

      return {
        platform: 'Twitter',
        content,
        scheduled_time: new Date().toISOString(),
        post_id: firstId,
      };
    } catch (error: unknown) {
      return {
        platform: 'Twitter',
        content,
        scheduled_time: new Date().toISOString(),
        post_id: firstId,
        error: errorMessage(error),
      };
    }
  }

  /**
   * Analyzes the blog once, then generates and publishes content for each
   * requested platform ('LinkedIn', 'Twitter').
   * @throws when analysis or generation still fails after retries.
   */
  async distributeContent(
    blogContent: string,
    platforms: string[]
  ): Promise<PlatformPost[]> {
    // Step 1: Analyze content
    const analysis = await this.analyzeContent(blogContent);
    console.log('Content analyzed:', analysis.main_topic);

    // Step 2: Generate platform-specific content
    const results: PlatformPost[] = [];

    if (platforms.includes('LinkedIn')) {
      const linkedinContent = await this.generateLinkedInPost(blogContent, analysis);
      results.push(await this.postToLinkedIn(linkedinContent));
    }

    if (platforms.includes('Twitter')) {
      const twitterThread = await this.generateTwitterThread(blogContent, analysis);
      results.push(await this.postToTwitter(twitterThread));
    }

    return results;
  }

  /** Rejects with Error('Timeout') if `work` has not settled within `ms`; always clears its timer. */
  private async withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout')), ms);
    });
    try {
      return await Promise.race([work, timeout]);
    } finally {
      // Without this the pending timer keeps the process alive for up to 60s per call.
      clearTimeout(timer);
    }
  }

  /**
   * Runs `fn` up to `retries` times (at least once) with a 60s timeout per
   * attempt and 1s, 2s, 4s... pauses between attempts.
   * @throws the last error once every attempt has failed.
   */
  private async retryWithBackoff<T>(
    fn: () => Promise<T>,
    retries = this.maxRetries
  ): Promise<T> {
    const attempts = Math.max(1, retries);
    let lastError = new Error('retryWithBackoff: no attempts made');

    for (let attempt = 0; attempt < attempts; attempt++) {
      try {
        return await this.withTimeout(fn(), 60000);
      } catch (error: unknown) {
        lastError = error instanceof Error ? error : new Error(String(error));
        if (attempt < attempts - 1) {
          await new Promise((resolve) =>
            setTimeout(resolve, Math.pow(2, attempt) * 1000)
          );
        }
      }
    }

    throw lastError;
  }
}

// Usage
const distributor = new ContentDistributor();

const blogContent = `Your blog post content here...`;

const results = await distributor.distributeContent(blogContent, [
  'LinkedIn',
  'Twitter',
]);

console.log(`Posted to ${results.length} platforms`);
results.forEach((r) => {
  if (r.error) {
    console.error(`${r.platform} failed:`, r.error);
  } else {
    console.log(`${r.platform} posted:`, r.post_id);
  }
});
Level 3: Production Pattern with Queue & Analytics
Good for: 1,000+ posts/day | Setup time: 1 day
# Production Content Distribution (1000+ posts/day)
from langgraph.graph import Graph, END
from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
import hashlib
import openai
import boto3
import json
from datetime import datetime, timedelta, timezone
import requests
from redis import Redis


class DistributionState(TypedDict):
    blog_content: str
    analysis: dict
    platform_posts: dict
    scheduled_posts: List[dict]
    analytics: dict
    retry_count: int


class ContentDistributionPipeline:
    """LangGraph pipeline: analyze -> generate posts -> queue to SQS -> emit metrics."""

    def __init__(self):
        self.openai = openai
        self.sqs = boto3.client('sqs')
        self.cloudwatch = boto3.client('cloudwatch')
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.queue_url = 'https://sqs.us-east-1.amazonaws.com/123456/content-queue'

    @staticmethod
    def _content_key(blog_content: str) -> str:
        """Stable identifier for a blog post.

        The built-in hash() of a str is randomised per process, so it cannot
        be used as a cache key or id shared between workers or restarts.
        """
        return hashlib.sha256(blog_content.encode('utf-8')).hexdigest()

    @staticmethod
    def _parse_json_reply(text: str):
        """Parse JSON from a model reply, tolerating a Markdown code fence around it."""
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            cleaned = cleaned.rsplit("```", 1)[0]
        return json.loads(cleaned)

    @staticmethod
    def _ask_model(prompt: str, temperature: float) -> str:
        """Send a single-turn prompt to GPT-4 and return the reply text."""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        return response.choices[0].message.content

    def analyze_node(self, state: DistributionState) -> DistributionState:
        """Analyze blog content for distribution, reusing a cached analysis when present.

        Errors are logged via log_error and re-raised.
        """
        try:
            cache_key = f"analysis:{self._content_key(state['blog_content'])}"

            # Serve a previous analysis of the same content instead of paying
            # for another model call.
            cached = self.redis.get(cache_key)
            if cached:
                state['analysis'] = json.loads(cached)
                return state

            reply = self._ask_model(
                f"""Analyze this blog post and extract:
- Main topic
- Key points (list)
- Target audience
- Tone
- Optimal platforms
- Relevant hashtags
- Best posting times per platform

Blog: {state['blog_content']}

Output as JSON.""",
                0.3
            )
            state['analysis'] = self._parse_json_reply(reply)

            # Cache analysis for 24 hours
            self.redis.setex(cache_key, 86400, json.dumps(state['analysis']))

            return state
        except Exception as e:
            self.log_error('analyze_node', str(e))
            raise

    def generate_posts_node(self, state: DistributionState) -> DistributionState:
        """Generate platform-specific posts in parallel.

        A platform whose generation fails gets {'error': ...} instead of content.
        """
        platforms = state['analysis'].get('optimal_platforms') or []
        generators = {
            'LinkedIn': self.generate_linkedin_post,
            'Twitter': self.generate_twitter_thread,
            'Facebook': self.generate_facebook_post,
            'Instagram': self.generate_instagram_carousel,
        }
        platform_posts = {}

        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {
                platform: executor.submit(generate, state['blog_content'], state['analysis'])
                for platform, generate in generators.items()
                if platform in platforms
            }

            for platform, future in futures.items():
                try:
                    platform_posts[platform] = future.result(timeout=60)
                except Exception as e:
                    self.log_error(f'generate_{platform.lower()}', str(e))
                    platform_posts[platform] = {'error': str(e)}

        state['platform_posts'] = platform_posts
        return state

    def schedule_posts_node(self, state: DistributionState) -> DistributionState:
        """Schedule posts to SQS queue for async posting.

        SQS can delay delivery by at most 15 minutes, so every message carries
        its intended 'post_time' as well.
        """
        scheduled_posts = []
        blog_id = self._content_key(state['blog_content'])

        for platform, content in state['platform_posts'].items():
            if 'error' in content:
                continue

            # Get optimal posting time from analysis
            post_time = self.get_optimal_post_time(
                platform,
                state['analysis'].get('best_posting_times', {})
            )

            # Send to SQS for async processing
            message = {
                'platform': platform,
                'content': content,
                'post_time': post_time.isoformat(),
                'blog_id': blog_id,
                'retry_count': 0
            }

            try:
                response = self.sqs.send_message(
                    QueueUrl=self.queue_url,
                    MessageBody=json.dumps(message),
                    DelaySeconds=self.calculate_delay(post_time)
                )

                scheduled_posts.append({
                    'platform': platform,
                    'post_time': post_time.isoformat(),
                    'message_id': response['MessageId'],
                    'status': 'queued'
                })
            except Exception as e:
                self.log_error('schedule_posts', str(e))
                scheduled_posts.append({
                    'platform': platform,
                    'post_time': post_time.isoformat(),
                    'status': 'failed',
                    'error': str(e)
                })

        state['scheduled_posts'] = scheduled_posts
        return state

    def track_analytics_node(self, state: DistributionState) -> DistributionState:
        """Send metrics to CloudWatch; failures are logged and do not stop the pipeline."""
        try:
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            now = datetime.now(timezone.utc)

            # Send custom metrics
            self.cloudwatch.put_metric_data(
                Namespace='ContentDistribution',
                MetricData=[
                    {
                        'MetricName': 'PostsScheduled',
                        'Value': len(state['scheduled_posts']),
                        'Unit': 'Count',
                        'Timestamp': now
                    },
                    {
                        'MetricName': 'PlatformsTargeted',
                        'Value': len(state['platform_posts']),
                        'Unit': 'Count',
                        'Timestamp': now
                    }
                ]
            )

            state['analytics'] = {
                'total_posts': len(state['scheduled_posts']),
                'platforms': list(state['platform_posts'].keys()),
                'timestamp': now.isoformat()
            }
        except Exception as e:
            self.log_error('track_analytics', str(e))

        return state

    def generate_linkedin_post(self, blog_content: str, analysis: dict) -> dict:
        """Generate LinkedIn post"""
        hashtags = analysis.get('hashtags', [])
        content = self._ask_model(
            f"""Transform into LinkedIn post (1500-2000 chars):

Blog: {blog_content}
Key points: {json.dumps(analysis.get('key_points', []))}
Hashtags: {', '.join(hashtags)}

Requirements:
- Professional tone
- Line breaks
- CTA at end

Output plain text.""",
            0.7
        )
        return {
            'content': content,
            'format': 'long_form_post',
            'hashtags': hashtags[:5]
        }

    def generate_twitter_thread(self, blog_content: str, analysis: dict) -> dict:
        """Generate Twitter thread"""
        reply = self._ask_model(
            f"""Transform into Twitter thread (6-8 tweets):

Blog: {blog_content}
Key points: {json.dumps(analysis.get('key_points', []))}

Each tweet under 280 chars. Output as JSON array.""",
            0.7
        )
        return {
            'tweets': self._parse_json_reply(reply),
            'format': 'thread',
            'hashtags': analysis.get('hashtags', [])[:3]
        }

    def generate_facebook_post(self, blog_content: str, analysis: dict) -> dict:
        """Generate Facebook post"""
        content = self._ask_model(
            f"""Transform into Facebook post (1000-1500 chars):

Blog: {blog_content}
Key points: {json.dumps(analysis.get('key_points', []))}

Conversational tone, emojis, CTA. Output plain text.""",
            0.7
        )
        return {
            'content': content,
            'format': 'story_post'
        }

    def generate_instagram_carousel(self, blog_content: str, analysis: dict) -> dict:
        """Generate Instagram carousel content"""
        reply = self._ask_model(
            f"""Transform into Instagram carousel (6 slides):

Blog: {blog_content}
Key points: {json.dumps(analysis.get('key_points', []))}

Each slide: short text + design suggestion. Output as JSON array.""",
            0.7
        )
        return {
            'slides': self._parse_json_reply(reply),
            'format': 'carousel',
            'hashtags': analysis.get('hashtags', [])[:10]
        }

    def get_optimal_post_time(self, platform: str, best_times: dict) -> datetime:
        """Calculate optimal posting time (local, naive datetime).

        best_times may map a platform to {'day_offset': int, 'hour': int};
        anything else (including free-form model output) falls back to defaults.
        """
        # Use platform-specific best times or defaults
        default_times = {
            'LinkedIn': {'day_offset': 1, 'hour': 9},    # Next day 9 AM
            'Twitter': {'day_offset': 1, 'hour': 13},    # Next day 1 PM
            'Facebook': {'day_offset': 2, 'hour': 14},   # 2 days 2 PM
            'Instagram': {'day_offset': 2, 'hour': 12}   # 2 days 12 PM
        }
        fallback = default_times.get(platform, {'day_offset': 1, 'hour': 10})

        timing = best_times.get(platform) if isinstance(best_times, dict) else None
        usable = (
            isinstance(timing, dict)
            and isinstance(timing.get('day_offset'), int)
            and isinstance(timing.get('hour'), int)
            and timing['day_offset'] >= 0
            and 0 <= timing['hour'] <= 23
        )
        if not usable:
            timing = fallback

        # 'hour' is a clock hour on the target day, not a duration to add.
        target_day = datetime.now() + timedelta(days=timing['day_offset'])
        return target_day.replace(hour=timing['hour'], minute=0, second=0, microsecond=0)

    def calculate_delay(self, post_time: datetime) -> int:
        """Calculate SQS delay in seconds, clamped to the 0-900 range SQS accepts."""
        delay = (post_time - datetime.now()).total_seconds()
        return max(0, min(int(delay), 900))  # SQS max delay is 15 minutes

    def log_error(self, node: str, error: str):
        """Log errors to CloudWatch (best effort) and stdout."""
        try:
            self.cloudwatch.put_metric_data(
                Namespace='ContentDistribution',
                MetricData=[{
                    'MetricName': 'Errors',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [{'Name': 'Node', 'Value': node}]
                }]
            )
        except Exception as metric_error:
            # Called from except blocks: a metrics outage must not replace the
            # error that is being reported.
            print(f"Could not record error metric for {node}: {metric_error}")
        print(f"Error in {node}: {error}")

    def build_graph(self) -> Graph:
        """Build LangGraph pipeline"""
        graph = Graph()

        graph.add_node("analyze", self.analyze_node)
        graph.add_node("generate_posts", self.generate_posts_node)
        graph.add_node("schedule_posts", self.schedule_posts_node)
        graph.add_node("track_analytics", self.track_analytics_node)

        graph.set_entry_point("analyze")
        graph.add_edge("analyze", "generate_posts")
        graph.add_edge("generate_posts", "schedule_posts")
        graph.add_edge("schedule_posts", "track_analytics")
        graph.add_edge("track_analytics", END)

        return graph.compile()


# Usage
blog_content = """Your blog post content here..."""

pipeline = ContentDistributionPipeline()
graph = pipeline.build_graph()

initial_state = {
    'blog_content': blog_content,
    'analysis': {},
    'platform_posts': {},
    'scheduled_posts': [],
    'analytics': {},
    'retry_count': 0
}

result = graph.invoke(initial_state)
print(f"Distributed to {len(result['scheduled_posts'])} platforms")
print(f"Analytics: {result['analytics']}")
When to Level Up
Start: Simple API Calls
0-100 posts/day
- Sequential API calls to OpenAI/Claude
- Manual scheduling via Buffer/Hootsuite
- Basic error logging (print statements)
- Direct API calls to LinkedIn/Twitter
Scale: Add Reliability
100-1,000 posts/day
- Automatic retries with exponential backoff
- Proper error handling and logging (Winston/Sentry)
- Rate limiting per platform (LinkedIn 100/day, Twitter 300/day)
- Timeouts and circuit breakers (60s max per operation)
- Parallel content generation (ThreadPoolExecutor)
Production: Queue & Analytics
1,000-5,000 posts/day
- SQS/RabbitMQ for async posting (decoupled from generation)
- CloudWatch/Datadog for real-time monitoring
- Redis caching for content analysis (avoid re-analyzing)
- LangGraph for complex workflows (conditional platform selection)
- Retry queues with dead-letter handling
Enterprise: Multi-Tenant System
5,000+ posts/day
- Multi-tenant architecture (separate accounts per client)
- Load balancing across LLM providers (OpenAI → Anthropic → Cohere fallback)
- A/B testing for content variations (track engagement per version)
- Real-time engagement tracking (webhook listeners for likes/comments)
- Custom ML models for platform optimization (learn best posting times per account)
Marketing-Specific Gotchas
The code examples above work. But marketing automation has unique challenges you need to handle.
Platform Rate Limits & API Quotas
Each platform has different rate limits. LinkedIn allows 100 posts/day per user, Twitter 300 tweets/day, Facebook varies by page. Exceed these and you get blocked. Track usage per platform and implement queuing.
from datetime import datetime, timedelta
from collections import defaultdict


class RateLimiter:
    """In-memory sliding-window post counter, one window per platform.

    Platform names are case-insensitive: 'LinkedIn' and 'linkedin' share one
    counter. State lives in this process only and is lost on restart.
    """

    def __init__(self):
        self.limits = {
            'linkedin': {'max': 100, 'window': 86400},  # 100 per day
            'twitter': {'max': 300, 'window': 86400},   # 300 per day
            'facebook': {'max': 200, 'window': 86400},  # 200 per day
        }
        self.usage = defaultdict(list)

    def can_post(self, platform: str) -> bool:
        """Return True if another post fits in the platform's window.

        Platforms without a configured limit are always allowed.
        """
        key = platform.lower()
        limit = self.limits.get(key)

        if not limit:
            return True

        # Remove old entries outside window
        cutoff = datetime.now() - timedelta(seconds=limit['window'])
        self.usage[key] = [ts for ts in self.usage[key] if ts > cutoff]

        # Check if under limit
        return len(self.usage[key]) < limit['max']

    def record_post(self, platform: str):
        """Record that a post was just made on the platform."""
        # Store under the same lower-cased key that can_post() reads; otherwise
        # posts recorded as 'LinkedIn' never count against the 'linkedin' limit.
        self.usage[platform.lower()].append(datetime.now())


# Usage
# post_to_linkedin, queue_for_tomorrow and content stand in for your own code.
if __name__ == "__main__":
    limiter = RateLimiter()

    if limiter.can_post('linkedin'):
        post_to_linkedin(content)
        limiter.record_post('linkedin')
    else:
        # Queue for later
        queue_for_tomorrow(content, 'linkedin')
Character Limits & Formatting Per Platform
LinkedIn allows 3000 chars but best engagement is 1500-2000. Twitter is 280 chars per tweet. Instagram captions max at 2200 chars. Each platform also has different formatting rules (hashtags, mentions, links). Validate before posting.
interface PlatformRules {
  maxChars: number;
  maxHashtags: number;
  supportsFormatting: boolean;
  linkHandling: 'inline' | 'separate' | 'shortened';
}

const PLATFORM_RULES: Record<string, PlatformRules> = {
  linkedin: {
    maxChars: 3000,
    maxHashtags: 5,
    supportsFormatting: true,
    linkHandling: 'inline',
  },
  twitter: {
    maxChars: 280,
    maxHashtags: 3,
    supportsFormatting: false,
    linkHandling: 'shortened',
  },
  instagram: {
    maxChars: 2200,
    maxHashtags: 30,
    supportsFormatting: false,
    linkHandling: 'separate',
  },
};

/**
 * Checks content length and hashtag count against a platform's limits.
 *
 * @param platform - Platform name, case-insensitive ('linkedin', 'twitter', 'instagram').
 * @param content - Post text; length is measured with `String.length` (UTF-16 code units).
 * @param hashtags - Hashtags that will accompany the post.
 * @returns `valid` plus one message per violated rule; an unrecognised
 *   platform yields `['Unknown platform']`.
 */
function validateContent(
  platform: string,
  content: string,
  hashtags: string[]
): { valid: boolean; errors: string[] } {
  const key = platform.toLowerCase();

  // Own-property check: a bare PLATFORM_RULES[key] would find inherited
  // members such as "constructor" or "toString" and accept them as platforms.
  if (!Object.prototype.hasOwnProperty.call(PLATFORM_RULES, key)) {
    return { valid: false, errors: ['Unknown platform'] };
  }
  const rules = PLATFORM_RULES[key] as PlatformRules;

  const errors: string[] = [];

  if (content.length > rules.maxChars) {
    errors.push(
      `Content too long: ${content.length}/${rules.maxChars} chars`
    );
  }

  if (hashtags.length > rules.maxHashtags) {
    errors.push(
      `Too many hashtags: ${hashtags.length}/${rules.maxHashtags}`
    );
  }

  return { valid: errors.length === 0, errors };
}

// Usage
const tweetContent = 'Your tweet text here...';

const validation = validateContent('twitter', tweetContent, ['AI', 'Marketing']);
if (!validation.valid) {
  console.error('Validation failed:', validation.errors);
  // Regenerate with stricter limits
}
Optimal Posting Times Vary by Industry & Audience
B2B content performs best on LinkedIn Tuesday-Thursday 8-10 AM. B2C does better on Facebook/Instagram evenings and weekends. Your audience timezone matters. Don't just schedule randomly - use analytics to find YOUR best times.
from datetime import datetime, timedelta
import pytz


def get_optimal_post_time(
    platform: str,
    audience_type: str,
    timezone: str = 'America/New_York'
) -> datetime:
    """Calculate optimal posting time based on platform and audience.

    Args:
        platform: Platform name, case-insensitive.
        audience_type: 'b2b' selects the B2B table; any other value selects B2C.
        timezone: IANA timezone name of the audience.

    Returns:
        Timezone-aware datetime on the next optimal weekday, strictly after
        today, at the first optimal hour (minutes, seconds and microseconds zero).

    Raises:
        pytz.UnknownTimeZoneError: if `timezone` is not a known zone name.
    """
    tz = pytz.timezone(timezone)
    now = datetime.now(tz)

    # B2B optimal times
    b2b_times = {
        'linkedin': {
            'days': [1, 2, 3],  # Tue-Thu (0=Mon)
            'hours': [8, 9, 10]
        },
        'twitter': {
            'days': [1, 2, 3, 4],  # Tue-Fri
            'hours': [12, 13, 17, 18]
        }
    }

    # B2C optimal times
    b2c_times = {
        'facebook': {
            'days': [2, 3, 5, 6],  # Wed, Thu, Sat, Sun
            'hours': [13, 14, 19, 20]
        },
        'instagram': {
            'days': [0, 2, 4],  # Mon, Wed, Fri
            'hours': [11, 12, 19, 20]
        }
    }

    times = b2b_times if audience_type == 'b2b' else b2c_times
    platform_times = times.get(platform.lower(), {'days': [1, 2, 3], 'hours': [9, 10]})

    # Find next optimal day. Today is never chosen; if today is the only
    # optimal weekday, the same weekday next week is used.
    current_day = now.weekday()
    days_ahead = [(d - current_day) % 7 for d in platform_times['days']]
    next_day = min([d for d in days_ahead if d > 0] or [min(days_ahead) + 7])

    # Pick first optimal hour
    target_hour = platform_times['hours'][0]
    target_date = now.date() + timedelta(days=next_day)

    # Build the wall-clock time first, then let pytz attach the offset that is
    # valid on that date. Adding a timedelta to an aware pytz datetime and
    # calling replace() keeps today's UTC offset, which is off by an hour when
    # a DST change falls in between.
    naive_target = datetime(
        target_date.year, target_date.month, target_date.day, target_hour
    )
    return tz.localize(naive_target)


# Usage
post_time = get_optimal_post_time('linkedin', 'b2b', 'America/Los_Angeles')
print(f"Schedule for: {post_time}")
Image/Video Requirements Per Platform
LinkedIn images: 1200x627px. Twitter: 1200x675px. Instagram: 1080x1080px (square) or 1080x1350px (portrait). Facebook: 1200x630px. Wrong aspect ratios get cropped badly. Generate or resize images programmatically.
from PIL import Image
import io

PLATFORM_IMAGE_SPECS = {
    'linkedin': {'width': 1200, 'height': 627, 'format': 'JPEG'},
    'twitter': {'width': 1200, 'height': 675, 'format': 'JPEG'},
    'instagram': {'width': 1080, 'height': 1080, 'format': 'JPEG'},
    'facebook': {'width': 1200, 'height': 630, 'format': 'JPEG'},
}


def resize_image_for_platform(
    image_path: str,
    platform: str
) -> bytes:
    """Center-crop and resize an image to a platform's dimensions.

    Args:
        image_path: Path to the source image.
        platform: Key of PLATFORM_IMAGE_SPECS, case-insensitive.

    Returns:
        The encoded image bytes in the platform's format (quality 90).

    Raises:
        ValueError: if the platform has no entry in PLATFORM_IMAGE_SPECS.
    """
    specs = PLATFORM_IMAGE_SPECS.get(platform.lower())
    if not specs:
        raise ValueError(f"Unknown platform: {platform}")

    # The context manager closes the underlying file handle when done.
    with Image.open(image_path) as source:
        img = source

        # JPEG cannot store alpha or palette images; saving an RGBA/P/LA
        # source (typical for PNG or GIF input) would raise. Transparency is dropped.
        if specs['format'] == 'JPEG' and img.mode != 'RGB':
            img = img.convert('RGB')

        # Calculate aspect ratios
        target_ratio = specs['width'] / specs['height']
        img_ratio = img.width / img.height

        # Crop to target aspect ratio, keeping the center of the image.
        # max(1, ...) avoids a zero-size crop on extremely thin inputs.
        if img_ratio > target_ratio:
            # Image is wider - crop width
            new_width = max(1, int(img.height * target_ratio))
            left = (img.width - new_width) // 2
            img = img.crop((left, 0, left + new_width, img.height))
        else:
            # Image is taller - crop height
            new_height = max(1, int(img.width / target_ratio))
            top = (img.height - new_height) // 2
            img = img.crop((0, top, img.width, top + new_height))

        # Resize to exact dimensions
        img = img.resize((specs['width'], specs['height']), Image.LANCZOS)

        # Convert to bytes
        buffer = io.BytesIO()
        img.save(buffer, format=specs['format'], quality=90)

    return buffer.getvalue()


# Usage
linkedin_image = resize_image_for_platform('original.jpg', 'linkedin')
twitter_image = resize_image_for_platform('original.jpg', 'twitter')
Tracking UTM Parameters & Campaign Attribution
You need to track which platform drives traffic and conversions. Add UTM parameters to all links: utm_source (linkedin/twitter), utm_medium (social), utm_campaign (blog_post_title). Use URL shorteners that preserve tracking.
interface UTMParams {
  source: string;
  medium: string;
  campaign: string;
  content?: string;
  term?: string;
}

/**
 * Returns `url` with utm_source, utm_medium and utm_campaign set (plus
 * utm_content / utm_term when provided). Existing query parameters are kept;
 * existing utm_* values are overwritten.
 *
 * @throws TypeError if `url` is not an absolute URL.
 */
function addUTMParams(url: string, params: UTMParams): string {
  const urlObj = new URL(url);

  urlObj.searchParams.set('utm_source', params.source);
  urlObj.searchParams.set('utm_medium', params.medium);
  urlObj.searchParams.set('utm_campaign', params.campaign);

  if (params.content) {
    urlObj.searchParams.set('utm_content', params.content);
  }

  if (params.term) {
    urlObj.searchParams.set('utm_term', params.term);
  }

  return urlObj.toString();
}

/**
 * Shortens a URL through the Bitly v4 API using BITLY_ACCESS_TOKEN.
 *
 * @returns The short link reported by Bitly.
 * @throws Error if the token is missing, Bitly answers with a non-2xx status,
 *   or the response has no `link` string.
 */
async function shortenURL(longUrl: string): Promise<string> {
  const token = process.env.BITLY_ACCESS_TOKEN;
  if (!token) {
    // Fail before sending "Bearer undefined" to the API.
    throw new Error('BITLY_ACCESS_TOKEN is not set');
  }

  // Use Bitly API to shorten while preserving UTM params
  const response = await fetch('https://api-ssl.bitly.com/v4/shorten', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${token}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ long_url: longUrl }),
  });

  if (!response.ok) {
    throw new Error(`Bitly request failed: ${response.status} ${response.statusText}`);
  }

  const data: unknown = await response.json();
  const link =
    typeof data === 'object' && data !== null
      ? (data as { link?: unknown }).link
      : undefined;
  if (typeof link !== 'string') {
    throw new Error('Bitly response did not include a link');
  }

  return link;
}

// Usage
async function main(): Promise<void> {
  const blogUrl = 'https://example.com/blog/ai-customer-service';

  const linkedinUrl = addUTMParams(blogUrl, {
    source: 'linkedin',
    medium: 'social',
    campaign: 'ai_customer_service_blog',
    content: 'organic_post',
  });

  const shortUrl = await shortenURL(linkedinUrl);
  console.log('Trackable short URL:', shortUrl);
  // Result: https://bit.ly/3xYz (with UTM params preserved)
}

main().catch((error: unknown) => {
  console.error('Could not create trackable short URL:', error);
});
Cost Calculator
Manual Cross-Posting Process
Limitations:
- Can't scale beyond 3 posts/week
- Inconsistent posting times
- Copy-paste formatting errors
- No A/B testing
- Manual UTM tracking
Automated Distribution Pipeline
Benefits:
- ✓ Scale to 100+ posts/week with no added cost
- ✓ Consistent optimal posting times
- ✓ Zero formatting errors
- ✓ Built-in A/B testing
- ✓ Automatic UTM tracking
- ✓ Real-time analytics dashboard