
Automate Content Distribution 🚀

Turn Monday's 3 prompts into a multi-channel posting machine

July 15, 2025
📱 Marketing | 🐍 Python + TypeScript | ⚡ 10 → 1000 posts/day

The Problem

On Monday you tested the 3 prompts in ChatGPT. You saw how one blog post becomes LinkedIn thought leadership, Twitter threads, and Instagram carousels. But here's the reality: you can't manually rewrite and post to 10 platforms every single day. Your team spends more time reformatting than creating.

  • 3.5 hours per day spent manually posting to all platforms
  • 35% error rate from copy-paste formatting mistakes
  • Can't scale beyond 10 posts/day across channels

See It Work

Watch one blog post transform into platform-specific content automatically. This is what you'll build.

The Code

Three levels: start simple, add reliability, then scale to production. Pick where you are.

Level 1: Simple API Calls

Good for: 0-100 posts/day | Setup time: 30 minutes

# Simple Content Distribution (0-100 posts/day)
import openai
import requests
import json
from datetime import datetime, timedelta

def distribute_content(blog_content: str, target_platforms: list) -> dict:
    """Transform one blog post into platform-specific content"""
    
    # Step 1: Extract key information
    extraction_prompt = f"""Analyze this blog post and extract:
- Main topic
- Key points (list)
- Target audience
- Tone (professional/casual/technical)
- Optimal platforms for distribution
- Relevant hashtags

Blog content:
{blog_content}

Output as valid JSON."""

    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": extraction_prompt}],
        temperature=0.3
    )
    
    analysis = json.loads(response.choices[0].message.content)
    
    # Step 2: Generate platform-specific versions
    posts = {}
    
    if "LinkedIn" in target_platforms:
        linkedin_prompt = f"""Transform this blog into a LinkedIn post:
- Professional tone
- 1500-2000 characters
- Include line breaks for readability
- Add relevant hashtags
- End with CTA

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Output as plain text."""
        
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": linkedin_prompt}],
            temperature=0.7
        )
        posts['linkedin'] = response.choices[0].message.content
    
    if "Twitter" in target_platforms:
        twitter_prompt = f"""Transform this blog into a Twitter thread:
- 6-8 tweets
- Each tweet under 280 characters
- Start with hook tweet
- End with CTA
- Include relevant hashtags

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Output as JSON array of tweets."""
        
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": twitter_prompt}],
            temperature=0.7
        )
        posts['twitter'] = json.loads(response.choices[0].message.content)
    
    if "Facebook" in target_platforms:
        facebook_prompt = f"""Transform this blog into a Facebook post:
- Conversational tone
- 1000-1500 characters
- Engaging opening
- Include emojis
- End with question or CTA

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Output as plain text."""
        
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": facebook_prompt}],
            temperature=0.7
        )
        posts['facebook'] = response.choices[0].message.content
    
    # Step 3: Schedule posts (using Buffer API as example)
    scheduled = []
    
    for platform, content in posts.items():
        # Schedule for optimal time (simplified)
        post_time = datetime.now() + timedelta(hours=2)
        
        # Buffer API call
        buffer_response = requests.post(
            'https://api.bufferapp.com/1/updates/create.json',
            data={
                'access_token': 'YOUR_BUFFER_TOKEN',
                'profile_ids[]': get_profile_id(platform),
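                # NOTE: posts['twitter'] is a list of tweets, not a string;
                # send each tweet as its own update rather than passing the list
                'text': content,
                # Send whole seconds rather than a float timestamp
                'scheduled_at': int(post_time.timestamp())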
            }
        )
        
        scheduled.append({
            'platform': platform,
            'post_time': post_time.isoformat(),
            'status': 'scheduled',
            'buffer_id': buffer_response.json().get('id')
        })
    
    return {
        'analysis': analysis,
        'posts': posts,
        'scheduled': scheduled
    }

def get_profile_id(platform: str) -> str:
    """Map platform to Buffer profile ID"""
    profiles = {
        'linkedin': '5f8a9b2c3d4e5f6g7h8i9j0k',
        'twitter': '6g7h8i9j0k1l2m3n4o5p6q7r',
        'facebook': '7h8i9j0k1l2m3n4o5p6q7r8s'
    }
    return profiles.get(platform.lower(), '')

# Usage
blog_content = """Your blog post content here..."""

result = distribute_content(
    blog_content,
    target_platforms=['LinkedIn', 'Twitter', 'Facebook']
)

print(f"Generated {len(result['posts'])} platform-specific posts")
print(f"Scheduled {len(result['scheduled'])} posts")
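
Every `json.loads` call above assumes the model returns bare JSON. Chat models often wrap JSON in a Markdown code fence or add a sentence around it, and then `json.loads` raises. A minimal sketch of a more tolerant parser follows; `parse_llm_json` is a hypothetical helper name, not part of the OpenAI SDK, and the fallback heuristic (take the outermost bracketed span) is an assumption that will not suit every response.

```python
import json
import re

def parse_llm_json(raw: str):
    """Parse JSON from model output, tolerating Markdown code fences."""
    text = raw.strip()
    # Strip a leading ```json (or bare ```) fence and the trailing fence
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} or [...] span in the text
        starts = [i for i in (text.find("{"), text.find("[")) if i != -1]
        if not starts:
            raise
        start = min(starts)
        end = max(text.rfind("}"), text.rfind("]"))
        return json.loads(text[start:end + 1])
```

In the Level 1 script this would replace the direct calls, for example `analysis = parse_llm_json(response.choices[0].message.content)`.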

Level 2: With Error Handling & Platform APIs

Good for: 100-1,000 posts/day | Setup time: 2 hours

// Content Distribution with Error Handling (100-1000 posts/day)
import Anthropic from '@anthropic-ai/sdk';
import axios from 'axios';

interface ContentAnalysis {
  main_topic: string;
  key_points: string[];
  target_audience: string;
  tone: string;
  optimal_platforms: string[];
  hashtags: string[];
}

interface PlatformPost {
  platform: string;
  content: string;
  scheduled_time: string;
  post_id?: string;
  error?: string;
}

class ContentDistributor {
  private anthropic: Anthropic;
  private maxRetries = 3;

  constructor() {
    this.anthropic = new Anthropic({
      apiKey: process.env.ANTHROPIC_API_KEY!,
    });
  }

  async analyzeContent(blogContent: string): Promise<ContentAnalysis> {
    return this.retryWithBackoff(async () => {
      const response = await this.anthropic.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 2048,
        messages: [
          {
            role: 'user',
            content: `Analyze this blog post and extract key information as JSON:
${blogContent}

Return: main_topic, key_points (array), target_audience, tone, optimal_platforms (array), hashtags (array)`,
          },
        ],
      });

      const content = response.content[0];
      if (content.type !== 'text') throw new Error('Invalid response');
      return JSON.parse(content.text);
    });
  }

  async generateLinkedInPost(
    blogContent: string,
    analysis: ContentAnalysis
  ): Promise<string> {
    return this.retryWithBackoff(async () => {
      const response = await this.anthropic.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 2048,
        messages: [
          {
            role: 'user',
            content: `Transform this blog into a LinkedIn post (1500-2000 chars):

Blog: ${blogContent}

Key points: ${JSON.stringify(analysis.key_points)}

Requirements:
- Professional tone
- Line breaks for readability
- Include hashtags: ${analysis.hashtags.join(', ')}
- End with CTA

Output plain text only.`,
          },
        ],
      });

      const content = response.content[0];
      if (content.type !== 'text') throw new Error('Invalid response');
      return content.text;
    });
  }

  async generateTwitterThread(
    blogContent: string,
    analysis: ContentAnalysis
  ): Promise<string[]> {
    return this.retryWithBackoff(async () => {
      const response = await this.anthropic.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 2048,
        messages: [
          {
            role: 'user',
            content: `Transform this blog into a Twitter thread (6-8 tweets):

Blog: ${blogContent}

Key points: ${JSON.stringify(analysis.key_points)}

Requirements:
- Each tweet under 280 characters
- Start with hook
- End with CTA
- Include hashtags: ${analysis.hashtags.slice(0, 3).join(', ')}

Output as JSON array of tweet strings.`,
          },
        ],
      });

      const content = response.content[0];
      if (content.type !== 'text') throw new Error('Invalid response');
      return JSON.parse(content.text);
    });
  }

  async postToLinkedIn(content: string): Promise<PlatformPost> {
    try {
      const response = await axios.post(
        'https://api.linkedin.com/v2/ugcPosts',
        {
          author: `urn:li:person:${process.env.LINKEDIN_PERSON_ID}`,
          lifecycleState: 'PUBLISHED',
          specificContent: {
            'com.linkedin.ugc.ShareContent': {
              shareCommentary: {
                text: content,
              },
              shareMediaCategory: 'NONE',
            },
          },
          visibility: {
            'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC',
          },
        },
        {
          headers: {
            Authorization: `Bearer ${process.env.LINKEDIN_ACCESS_TOKEN}`,
            'Content-Type': 'application/json',
            'X-Restli-Protocol-Version': '2.0.0',
          },
        }
      );

      return {
        platform: 'LinkedIn',
        content,
        scheduled_time: new Date().toISOString(),
        post_id: response.data.id,
      };
    } catch (error: any) {
      return {
        platform: 'LinkedIn',
        content,
        scheduled_time: new Date().toISOString(),
        error: error.message,
      };
    }
  }

  async postToTwitter(tweets: string[]): Promise<PlatformPost> {
    try {
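      // Post first tweet. POST /2/tweets needs a user-context access token
      // (OAuth 1.0a or OAuth 2.0 with PKCE); an app-only bearer token is rejected.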
      const firstTweet = await axios.post(
        'https://api.twitter.com/2/tweets',
        { text: tweets[0] },
        {
          headers: {
            Authorization: `Bearer ${process.env.TWITTER_BEARER_TOKEN}`,
            'Content-Type': 'application/json',
          },
        }
      );

      let previousId = firstTweet.data.data.id;

      // Post remaining tweets as replies
      for (let i = 1; i < tweets.length; i++) {
        const reply = await axios.post(
          'https://api.twitter.com/2/tweets',
          {
            text: tweets[i],
            reply: { in_reply_to_tweet_id: previousId },
          },
          {
            headers: {
              Authorization: `Bearer ${process.env.TWITTER_BEARER_TOKEN}`,
              'Content-Type': 'application/json',
            },
          }
        );
        previousId = reply.data.data.id;
      }

      return {
        platform: 'Twitter',
        content: tweets.join('\n\n'),
        scheduled_time: new Date().toISOString(),
        post_id: firstTweet.data.data.id,
      };
    } catch (error: any) {
      return {
        platform: 'Twitter',
        content: tweets.join('\n\n'),
        scheduled_time: new Date().toISOString(),
        error: error.message,
      };
    }
  }

  async distributeContent(
    blogContent: string,
    platforms: string[]
  ): Promise<PlatformPost[]> {
    // Step 1: Analyze content
    const analysis = await this.analyzeContent(blogContent);
    console.log('Content analyzed:', analysis.main_topic);

    // Step 2: Generate platform-specific content
    const results: PlatformPost[] = [];

    if (platforms.includes('LinkedIn')) {
      const linkedinContent = await this.generateLinkedInPost(
        blogContent,
        analysis
      );
      const post = await this.postToLinkedIn(linkedinContent);
      results.push(post);
    }

    if (platforms.includes('Twitter')) {
      const twitterThread = await this.generateTwitterThread(
        blogContent,
        analysis
      );
      const post = await this.postToTwitter(twitterThread);
      results.push(post);
    }

    return results;
  }

  private async retryWithBackoff<T>(
    fn: () => Promise<T>,
    retries = this.maxRetries
  ): Promise<T> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt < retries; attempt++) {
      try {
        return await Promise.race([
          fn(),
          new Promise<never>((_, reject) =>
            setTimeout(() => reject(new Error('Timeout')), 60000)
          ),
        ]);
      } catch (error) {
        lastError = error as Error;
        if (attempt < retries - 1) {
          await new Promise((resolve) =>
            setTimeout(resolve, Math.pow(2, attempt) * 1000)
          );
        }
      }
    }

    throw lastError;
  }
}

// Usage
const distributor = new ContentDistributor();

const blogContent = `Your blog post content here...`;

const results = await distributor.distributeContent(blogContent, [
  'LinkedIn',
  'Twitter',
]);

console.log(`Posted to ${results.length} platforms`);
results.forEach((r) => {
  if (r.error) {
    console.error(`${r.platform} failed:`, r.error);
  } else {
    console.log(`${r.platform} posted:`, r.post_id);
  }
});
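
The `retryWithBackoff` method above can also be ported to the Level 1 Python script, which has no retries at all. The following is a sketch under assumptions: `retry_with_backoff` and its parameters are illustrative names, the `sleep` argument exists only so the delays can be observed in tests, and the 60-second timeout from the TypeScript version is left out.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with delays of 1s, 2s, 4s, ..."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(retries):
        try:
            return fn()
        except Exception as error:
            last_error = error
            # Wait before the next attempt, but not after the final one
            if attempt < retries - 1:
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

A call site would wrap each request in a lambda, for example `retry_with_backoff(lambda: openai.chat.completions.create(...))`.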

Level 3: Production Pattern with Queue & Analytics

Good for: 1,000+ posts/day | Setup time: 1 day

# Production Content Distribution (1000+ posts/day)
from langgraph.graph import Graph, END
from typing import TypedDict, List
import openai
import boto3
import json
from datetime import datetime, timedelta
import requests
from redis import Redis

class DistributionState(TypedDict):
    blog_content: str
    analysis: dict
    platform_posts: dict
    scheduled_posts: List[dict]
    analytics: dict
    retry_count: int

class ContentDistributionPipeline:
    def __init__(self):
        self.openai = openai
        self.sqs = boto3.client('sqs')
        self.cloudwatch = boto3.client('cloudwatch')
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.queue_url = 'https://sqs.us-east-1.amazonaws.com/123456/content-queue'
    
    def analyze_node(self, state: DistributionState) -> DistributionState:
        """Analyze blog content for distribution"""
        try:
            response = openai.chat.completions.create(
                model="gpt-4",
                messages=[{
                    "role": "user",
                    "content": f"""Analyze this blog post and extract:
- Main topic
- Key points (list)
- Target audience
- Tone
- Optimal platforms
- Relevant hashtags
- Best posting times per platform

Blog: {state['blog_content']}

Output as JSON."""
                }],
                temperature=0.3
            )
            
            state['analysis'] = json.loads(response.choices[0].message.content)
            
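            # Cache analysis for 24 hours. Key on a content digest: built-in
            # hash() is randomized per process for strings, so it is not a
            # stable cache key across workers or restarts.
            import hashlib
            digest = hashlib.sha256(state['blog_content'].encode('utf-8')).hexdigest()
            cache_key = f"analysis:{digest}"
            self.redis.setex(cache_key, 86400, json.dumps(state['analysis']))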
            
            return state
        except Exception as e:
            self.log_error('analyze_node', str(e))
            raise
    
    def generate_posts_node(self, state: DistributionState) -> DistributionState:
        """Generate platform-specific posts in parallel"""
        platforms = state['analysis'].get('optimal_platforms', [])
        platform_posts = {}
        
        # Use ThreadPoolExecutor for parallel generation
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {}
            
            if 'LinkedIn' in platforms:
                futures['LinkedIn'] = executor.submit(
                    self.generate_linkedin_post,
                    state['blog_content'],
                    state['analysis']
                )
            
            if 'Twitter' in platforms:
                futures['Twitter'] = executor.submit(
                    self.generate_twitter_thread,
                    state['blog_content'],
                    state['analysis']
                )
            
            if 'Facebook' in platforms:
                futures['Facebook'] = executor.submit(
                    self.generate_facebook_post,
                    state['blog_content'],
                    state['analysis']
                )
            
            if 'Instagram' in platforms:
                futures['Instagram'] = executor.submit(
                    self.generate_instagram_carousel,
                    state['blog_content'],
                    state['analysis']
                )
            
            for platform, future in futures.items():
                try:
                    platform_posts[platform] = future.result(timeout=60)
                except Exception as e:
                    self.log_error(f'generate_{platform.lower()}', str(e))
                    platform_posts[platform] = {'error': str(e)}
        
        state['platform_posts'] = platform_posts
        return state
    
    def schedule_posts_node(self, state: DistributionState) -> DistributionState:
        """Schedule posts to SQS queue for async posting"""
        scheduled_posts = []
        
        for platform, content in state['platform_posts'].items():
            if 'error' in content:
                continue
            
            # Get optimal posting time from analysis
            post_time = self.get_optimal_post_time(
                platform,
                state['analysis'].get('best_posting_times', {})
            )
            
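            # Send to SQS for async processing
            import hashlib  # stable ID; built-in hash() differs between processes
            message = {
                'platform': platform,
                'content': content,
                'post_time': post_time.isoformat(),
                'blog_id': hashlib.sha256(state['blog_content'].encode('utf-8')).hexdigest(),
                'retry_count': 0
            }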
            
            try:
                response = self.sqs.send_message(
                    QueueUrl=self.queue_url,
                    MessageBody=json.dumps(message),
                    DelaySeconds=self.calculate_delay(post_time)
                )
                
                scheduled_posts.append({
                    'platform': platform,
                    'post_time': post_time.isoformat(),
                    'message_id': response['MessageId'],
                    'status': 'queued'
                })
            except Exception as e:
                self.log_error('schedule_posts', str(e))
                scheduled_posts.append({
                    'platform': platform,
                    'post_time': post_time.isoformat(),
                    'status': 'failed',
                    'error': str(e)
                })
        
        state['scheduled_posts'] = scheduled_posts
        return state
    
    def track_analytics_node(self, state: DistributionState) -> DistributionState:
        """Send metrics to CloudWatch"""
        try:
            # Send custom metrics
            self.cloudwatch.put_metric_data(
                Namespace='ContentDistribution',
                MetricData=[
                    {
                        'MetricName': 'PostsScheduled',
                        'Value': len(state['scheduled_posts']),
                        'Unit': 'Count',
                        'Timestamp': datetime.utcnow()
                    },
                    {
                        'MetricName': 'PlatformsTargeted',
                        'Value': len(state['platform_posts']),
                        'Unit': 'Count',
                        'Timestamp': datetime.utcnow()
                    }
                ]
            )
            
            state['analytics'] = {
                'total_posts': len(state['scheduled_posts']),
                'platforms': list(state['platform_posts'].keys()),
                'timestamp': datetime.utcnow().isoformat()
            }
        except Exception as e:
            self.log_error('track_analytics', str(e))
        
        return state
    
    def generate_linkedin_post(self, blog_content: str, analysis: dict) -> dict:
        """Generate LinkedIn post"""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"""Transform into LinkedIn post (1500-2000 chars):

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}
Hashtags: {', '.join(analysis['hashtags'])}

Requirements:
- Professional tone
- Line breaks
- CTA at end

Output plain text."""
            }],
            temperature=0.7
        )
        
        return {
            'content': response.choices[0].message.content,
            'format': 'long_form_post',
            'hashtags': analysis['hashtags'][:5]
        }
    
    def generate_twitter_thread(self, blog_content: str, analysis: dict) -> dict:
        """Generate Twitter thread"""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"""Transform into Twitter thread (6-8 tweets):

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Each tweet under 280 chars. Output as JSON array."""
            }],
            temperature=0.7
        )
        
        return {
            'tweets': json.loads(response.choices[0].message.content),
            'format': 'thread',
            'hashtags': analysis['hashtags'][:3]
        }
    
    def generate_facebook_post(self, blog_content: str, analysis: dict) -> dict:
        """Generate Facebook post"""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"""Transform into Facebook post (1000-1500 chars):

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Conversational tone, emojis, CTA. Output plain text."""
            }],
            temperature=0.7
        )
        
        return {
            'content': response.choices[0].message.content,
            'format': 'story_post'
        }
    
    def generate_instagram_carousel(self, blog_content: str, analysis: dict) -> dict:
        """Generate Instagram carousel content"""
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"""Transform into Instagram carousel (6 slides):

Blog: {blog_content}
Key points: {json.dumps(analysis['key_points'])}

Each slide: short text + design suggestion. Output as JSON array."""
            }],
            temperature=0.7
        )
        
        return {
            'slides': json.loads(response.choices[0].message.content),
            'format': 'carousel',
            'hashtags': analysis['hashtags'][:10]
        }
    
    def get_optimal_post_time(self, platform: str, best_times: dict) -> datetime:
        """Calculate optimal posting time"""
        # Use platform-specific best times or defaults
        default_times = {
            'LinkedIn': {'day_offset': 1, 'hour': 9},  # Next day 9 AM
            'Twitter': {'day_offset': 1, 'hour': 13},  # Next day 1 PM
            'Facebook': {'day_offset': 2, 'hour': 14}, # 2 days 2 PM
            'Instagram': {'day_offset': 2, 'hour': 12} # 2 days 12 PM
        }
        
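        # Fall back to defaults unless the analysis supplied the expected shape
        timing = best_times.get(platform)
        if not (isinstance(timing, dict) and {'day_offset', 'hour'} <= timing.keys()):
            timing = default_times.get(platform, {'day_offset': 1, 'hour': 10})
        # Move forward by day_offset days, then pin the wall-clock hour
        target = datetime.now() + timedelta(days=timing['day_offset'])
        return target.replace(hour=timing['hour'], minute=0, second=0, microsecond=0)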
    
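    def calculate_delay(self, post_time: datetime) -> int:
        """Calculate SQS delay in seconds, clamped to the 0-900 range SQS accepts"""
        delay = (post_time - datetime.now()).total_seconds()
        # SQS max delay is 15 minutes, so the worker that consumes the queue
        # must compare post_time with the clock and requeue early messages
        return max(0, min(int(delay), 900))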
    
    def log_error(self, node: str, error: str):
        """Log errors to CloudWatch"""
        self.cloudwatch.put_metric_data(
            Namespace='ContentDistribution',
            MetricData=[{
                'MetricName': 'Errors',
                'Value': 1,
                'Unit': 'Count',
                'Dimensions': [{'Name': 'Node', 'Value': node}]
            }]
        )
        print(f"Error in {node}: {error}")
    
    def build_graph(self) -> Graph:
        """Build LangGraph pipeline"""
        graph = Graph()
        
        graph.add_node("analyze", self.analyze_node)
        graph.add_node("generate_posts", self.generate_posts_node)
        graph.add_node("schedule_posts", self.schedule_posts_node)
        graph.add_node("track_analytics", self.track_analytics_node)
        
        graph.set_entry_point("analyze")
        graph.add_edge("analyze", "generate_posts")
        graph.add_edge("generate_posts", "schedule_posts")
        graph.add_edge("schedule_posts", "track_analytics")
        graph.add_edge("track_analytics", END)
        
        return graph.compile()

# Usage
blog_content = """Your blog post content here..."""
pipeline = ContentDistributionPipeline()
graph = pipeline.build_graph()

initial_state = {
    'blog_content': blog_content,
    'analysis': {},
    'platform_posts': {},
    'scheduled_posts': [],
    'analytics': {},
    'retry_count': 0
}

result = graph.invoke(initial_state)

print(f"Distributed to {len(result['scheduled_posts'])} platforms")
print(f"Analytics: {result['analytics']}")
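
Because SQS caps `DelaySeconds` at 900 seconds, a message scheduled for tomorrow reaches the worker within 15 minutes. The worker that consumes the queue is not shown above, so the following is only a sketch of the decision it would need to make: post now, or put the message back with another delay. The function name `next_action` and the tuple it returns are assumptions for illustration, and both timestamps are taken to be naive local times, as produced by `post_time.isoformat()` in the pipeline.

```python
import math
from datetime import datetime

SQS_MAX_DELAY = 900  # seconds; the SQS per-message delay ceiling

def next_action(post_time_iso: str, now: datetime) -> tuple:
    """Decide whether a dequeued message is due.

    Returns ("post", 0) when the scheduled time has arrived, otherwise
    ("requeue", delay), where delay is the DelaySeconds for the re-send.
    """
    post_time = datetime.fromisoformat(post_time_iso)
    remaining = (post_time - now).total_seconds()
    if remaining <= 0:
        return ("post", 0)
    # Round up so the message does not arrive just before its slot
    return ("requeue", min(math.ceil(remaining), SQS_MAX_DELAY))
```

On `"requeue"` the worker would send the same body again with the returned delay and delete the original message; on `"post"` it would call the platform API.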

When to Level Up

1

Start: Simple API Calls

0-100 posts/day

  • Sequential API calls to OpenAI/Claude
  • Manual scheduling via Buffer/Hootsuite
  • Basic error logging (print statements)
  • Direct API calls to LinkedIn/Twitter
2

Scale: Add Reliability

100-1,000 posts/day

  • Automatic retries with exponential backoff
  • Proper error handling and logging (Winston/Sentry)
  • Rate limiting per platform (LinkedIn 100/day, Twitter 300/day)
  • Timeouts and circuit breakers (60s max per operation)
  • Parallel content generation (ThreadPoolExecutor)
3

Production: Queue & Analytics

1,000-5,000 posts/day

  • SQS/RabbitMQ for async posting (decoupled from generation)
  • CloudWatch/Datadog for real-time monitoring
  • Redis caching for content analysis (avoid re-analyzing)
  • LangGraph for complex workflows (conditional platform selection)
  • Retry queues with dead-letter handling
4

Enterprise: Multi-Tenant System

5,000+ posts/day

  • Multi-tenant architecture (separate accounts per client)
  • Load balancing across LLM providers (OpenAI → Anthropic → Cohere fallback)
  • A/B testing for content variations (track engagement per version)
  • Real-time engagement tracking (webhook listeners for likes/comments)
  • Custom ML models for platform optimization (learn best posting times per account)
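
The provider fallback listed under the enterprise tier can be sketched without committing to any SDK. In this minimal version each provider is a plain callable that takes a prompt and returns text; `generate_with_fallback` and the `(name, callable)` pairing are assumptions for illustration, and the callables would wrap whichever client calls you already use.

```python
from typing import Callable, Sequence, Tuple

def generate_with_fallback(
    prompt: str,
    providers: Sequence[Tuple[str, Callable[[str], str]]],
) -> Tuple[str, str]:
    """Try each (name, call) pair in order; return (name, text) from the first success."""
    errors = []
    for name, call in providers:
        try:
            return name, call(prompt)
        except Exception as error:
            # Remember why this provider failed, then try the next one
            errors.append(f"{name}: {error}")
    raise RuntimeError("All providers failed: " + "; ".join(errors))
```

Returning the provider name alongside the text makes it possible to record which provider served each post, which the A/B testing and monitoring items in the same tier would need.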

Marketing-Specific Gotchas

The examples above cover the mechanics. But marketing automation has unique challenges you need to handle.

Platform Rate Limits & API Quotas

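Each platform enforces its own posting quotas, and they depend on your API tier and change over time. The figures used here (LinkedIn 100 posts/day per user, Twitter 300 tweets/day, Facebook varying by page) are illustrative, so confirm the current limits in each platform's developer documentation. Exceed a quota and your requests are rejected or your app is blocked. Track usage per platform and queue anything over the limit.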

from datetime import datetime, timedelta
from collections import defaultdict

class RateLimiter:
    def __init__(self):
        self.limits = {
            'linkedin': {'max': 100, 'window': 86400},  # 100 per day
            'twitter': {'max': 300, 'window': 86400},   # 300 per day
            'facebook': {'max': 200, 'window': 86400},  # 200 per day
        }
        self.usage = defaultdict(list)
    
    def can_post(self, platform: str) -> bool:
        # Normalize the key so 'LinkedIn' and 'linkedin' share one counter
        platform = platform.lower()
        now = datetime.now()
        limit = self.limits.get(platform)
        if not limit:
            return True
        
        # Remove old entries outside window
        cutoff = now - timedelta(seconds=limit['window'])
        self.usage[platform] = [
            ts for ts in self.usage[platform] if ts > cutoff
        ]
        
        # Check if under limit
        return len(self.usage[platform]) < limit['max']
    
    def record_post(self, platform: str):
        self.usage[platform.lower()].append(datetime.now())

# Usage
limiter = RateLimiter()

if limiter.can_post('linkedin'):
    post_to_linkedin(content)
    limiter.record_post('linkedin')
else:
    # Queue for later
    queue_for_tomorrow(content, 'linkedin')
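
`queue_for_tomorrow` is not defined in the snippet above, and with a sliding window a blocked post rarely has to wait a full day. As a sketch, the helper below computes how long to wait until the window has room again. `seconds_until_slot` is a hypothetical name; it takes the recorded timestamps directly so it can be tested without the `RateLimiter` class.

```python
from datetime import datetime, timedelta
from typing import List

def seconds_until_slot(
    timestamps: List[datetime],
    max_posts: int,
    window_seconds: int,
    now: datetime,
) -> float:
    """Return how long to wait before another post fits in the sliding window."""
    cutoff = now - timedelta(seconds=window_seconds)
    recent = sorted(ts for ts in timestamps if ts > cutoff)
    if len(recent) < max_posts:
        return 0.0
    # Room opens once enough of the oldest posts age out of the window;
    # this is the post whose expiry brings the count below max_posts
    blocking = recent[len(recent) - max_posts]
    return (blocking + timedelta(seconds=window_seconds) - now).total_seconds()
```

The result can feed a scheduler or queue delay directly, so a post blocked at noon goes out as soon as the oldest counted post expires.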

Character Limits & Formatting Per Platform

LinkedIn allows 3000 chars but best engagement is 1500-2000. Twitter is 280 chars per tweet. Instagram captions max at 2200 chars. Each platform also has different formatting rules (hashtags, mentions, links). Validate before posting.

interface PlatformRules {
  maxChars: number;
  maxHashtags: number;
  supportsFormatting: boolean;
  linkHandling: 'inline' | 'separate' | 'shortened';
}

const PLATFORM_RULES: Record<string, PlatformRules> = {
  linkedin: {
    maxChars: 3000,
    maxHashtags: 5,
    supportsFormatting: true,
    linkHandling: 'inline',
  },
  twitter: {
    maxChars: 280,
    maxHashtags: 3,
    supportsFormatting: false,
    linkHandling: 'shortened',
  },
  instagram: {
    maxChars: 2200,
    maxHashtags: 30,
    supportsFormatting: false,
    linkHandling: 'separate',
  },
};

function validateContent(
  platform: string,
  content: string,
  hashtags: string[]
): { valid: boolean; errors: string[] } {
  const rules = PLATFORM_RULES[platform.toLowerCase()];
  const errors: string[] = [];

  if (!rules) {
    return { valid: false, errors: ['Unknown platform'] };
  }

  if (content.length > rules.maxChars) {
    errors.push(
      `Content too long: ${content.length}/${rules.maxChars} chars`
    );
  }

  if (hashtags.length > rules.maxHashtags) {
    errors.push(
      `Too many hashtags: ${hashtags.length}/${rules.maxHashtags}`
    );
  }

  return { valid: errors.length === 0, errors };
}

// Usage
const validation = validateContent('twitter', tweetContent, ['AI', 'Marketing']);
if (!validation.valid) {
  console.error('Validation failed:', validation.errors);
  // Regenerate with stricter limits
}
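
When a generated tweet fails validation, regenerating is one option; splitting the text deterministically is another. The sketch below breaks text on word boundaries and appends a counter to each chunk. `split_into_tweets` is a hypothetical helper, it assumes threads of at most 99 tweets (the counter reserve is 8 characters), and it counts with `len()`, which only approximates the platform's own counting of URLs and some Unicode characters.

```python
def split_into_tweets(text: str, limit: int = 280) -> list:
    """Split text on word boundaries into numbered chunks of at most `limit` chars."""
    suffix_room = 8  # reserve space for a counter such as " (12/34)"
    budget = limit - suffix_room
    chunks, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if len(candidate) <= budget:
            current = candidate
            continue
        if current:
            chunks.append(current)
        # A single word longer than the budget is hard-cut
        while len(word) > budget:
            chunks.append(word[:budget])
            word = word[budget:]
        current = word
    if current:
        chunks.append(current)
    total = len(chunks)
    return [f"{chunk} ({i}/{total})" for i, chunk in enumerate(chunks, start=1)]
```

Splitting this way never loses text, but it ignores the hook and CTA structure the prompts ask for, so it suits a fallback path better than a primary one.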

Optimal Posting Times Vary by Industry & Audience

B2B content performs best on LinkedIn Tuesday-Thursday 8-10 AM. B2C does better on Facebook/Instagram evenings and weekends. Your audience timezone matters. Don't just schedule randomly - use analytics to find YOUR best times.

from datetime import datetime, timedelta
import pytz

def get_optimal_post_time(
    platform: str,
    audience_type: str,
    timezone: str = 'America/New_York'
) -> datetime:
    """Calculate optimal posting time based on platform and audience"""
    
    tz = pytz.timezone(timezone)
    now = datetime.now(tz)
    
    # B2B optimal times
    b2b_times = {
        'linkedin': {
            'days': [1, 2, 3],  # Tue-Thu (0=Mon)
            'hours': [8, 9, 10]
        },
        'twitter': {
            'days': [1, 2, 3, 4],  # Tue-Fri
            'hours': [12, 13, 17, 18]
        }
    }
    
    # B2C optimal times
    b2c_times = {
        'facebook': {
            'days': [2, 3, 5, 6],  # Wed, Thu, Sat, Sun
            'hours': [13, 14, 19, 20]
        },
        'instagram': {
            'days': [0, 2, 4],  # Mon, Wed, Fri
            'hours': [11, 12, 19, 20]
        }
    }
    
    times = b2b_times if audience_type == 'b2b' else b2c_times
    platform_times = times.get(platform.lower(), {'days': [1, 2, 3], 'hours': [9, 10]})
    
    # Find next optimal day
    current_day = now.weekday()
    days_ahead = [
        (d - current_day) % 7 for d in platform_times['days']
    ]
    next_day = min([d for d in days_ahead if d > 0] or [min(days_ahead) + 7])
    
    # Pick first optimal hour
    target_hour = platform_times['hours'][0]
    
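    # Build the wall-clock date first, then localize, so pytz applies the
    # right UTC offset even when a DST change falls before the target day
    target_date = (now + timedelta(days=next_day)).date()
    target_time = tz.localize(
        datetime(target_date.year, target_date.month, target_date.day, target_hour)
    )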
    
    return target_time

# Usage
post_time = get_optimal_post_time('linkedin', 'b2b', 'America/Los_Angeles')
print(f"Schedule for: {post_time}")
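
The function above always skips the current day and always picks the first listed hour, so a post generated at 7:30 on a Tuesday waits until Wednesday even though 8:00 is still ahead. A possible refinement, sketched here under assumptions, searches today and the next seven days for the earliest slot that is still in the future. `next_slot` is a hypothetical name; it keeps whatever `tzinfo` is attached to `now`, so pass a timezone-aware value in real use.

```python
from datetime import datetime, timedelta

def next_slot(now: datetime, days: list, hours: list) -> datetime:
    """Return the earliest future slot whose weekday is in `days` and hour in `hours`."""
    for day_offset in range(8):  # today plus the next 7 days
        date = (now + timedelta(days=day_offset)).date()
        if date.weekday() not in days:
            continue
        for hour in sorted(hours):
            candidate = datetime(
                date.year, date.month, date.day, hour, tzinfo=now.tzinfo
            )
            if candidate > now:
                return candidate
    raise ValueError("days and hours must each contain at least one valid value")
```

The `days` and `hours` lists use the same conventions as `b2b_times` and `b2c_times` above (0 is Monday, hours are on a 24-hour clock).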

Image/Video Requirements Per Platform

LinkedIn images: 1200x627px. Twitter: 1200x675px. Instagram: 1080x1080px (square) or 1080x1350px (portrait). Facebook: 1200x630px. Wrong aspect ratios get cropped badly. Generate or resize images programmatically.

from PIL import Image
import io

PLATFORM_IMAGE_SPECS = {
    'linkedin': {'width': 1200, 'height': 627, 'format': 'JPEG'},
    'twitter': {'width': 1200, 'height': 675, 'format': 'JPEG'},
    'instagram': {'width': 1080, 'height': 1080, 'format': 'JPEG'},
    'facebook': {'width': 1200, 'height': 630, 'format': 'JPEG'},
}

def resize_image_for_platform(
    image_path: str,
    platform: str
) -> bytes:
    """Resize image to platform specs"""
    
    specs = PLATFORM_IMAGE_SPECS.get(platform.lower())
    if not specs:
        raise ValueError(f"Unknown platform: {platform}")
    
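    # Open image; JPEG cannot store alpha or palette modes, so normalize to RGB
    img = Image.open(image_path)
    if specs['format'] == 'JPEG' and img.mode != 'RGB':
        img = img.convert('RGB')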
    
    # Calculate aspect ratios
    target_ratio = specs['width'] / specs['height']
    img_ratio = img.width / img.height
    
    # Crop to target aspect ratio
    if img_ratio > target_ratio:
        # Image is wider - crop width
        new_width = int(img.height * target_ratio)
        left = (img.width - new_width) // 2
        img = img.crop((left, 0, left + new_width, img.height))
    else:
        # Image is taller - crop height
        new_height = int(img.width / target_ratio)
        top = (img.height - new_height) // 2
        img = img.crop((0, top, img.width, top + new_height))
    
    # Resize to exact dimensions
    img = img.resize((specs['width'], specs['height']), Image.LANCZOS)
    
    # Convert to bytes
    buffer = io.BytesIO()
    img.save(buffer, format=specs['format'], quality=90)
    return buffer.getvalue()

# Usage
linkedin_image = resize_image_for_platform('original.jpg', 'linkedin')
twitter_image = resize_image_for_platform('original.jpg', 'twitter')
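The spec table above only covers the square Instagram format, while the text also lists a 1080x1350px portrait option. A minimal sketch of one way to choose between them is below: pick the variant whose aspect ratio is closest to the source, so the center crop removes the least. `INSTAGRAM_VARIANTS` and `pick_instagram_variant` are hypothetical names introduced for illustration.

```python
import math

# Assumed variant table, taken from the dimensions listed in the text above
INSTAGRAM_VARIANTS = {
    'square': (1080, 1080),
    'portrait': (1080, 1350),
}

def pick_instagram_variant(width: int, height: int) -> str:
    """Return the variant whose aspect ratio is closest to the source image."""
    source_ratio = width / height

    def distance(name: str) -> float:
        target_w, target_h = INSTAGRAM_VARIANTS[name]
        # Compare ratios on a log scale so 2:1 and 1:2 are equally far from 1:1
        return abs(math.log(source_ratio / (target_w / target_h)))

    return min(INSTAGRAM_VARIANTS, key=distance)

print(pick_instagram_variant(1200, 1600))  # → portrait
print(pick_instagram_variant(1920, 1080))  # → square
```

The chosen dimensions could then be passed to the same crop-and-resize steps used in `resize_image_for_platform`.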

Tracking UTM Parameters & Campaign Attribution

To see which platform drives traffic and conversions, add UTM parameters to every link: utm_source (e.g. linkedin or twitter), utm_medium (social), and utm_campaign (e.g. the blog post title). If you shorten links, use a shortener that preserves the tracking parameters.

interface UTMParams {
  source: string;
  medium: string;
  campaign: string;
  content?: string;
  term?: string;
}

function addUTMParams(url: string, params: UTMParams): string {
  const urlObj = new URL(url);

  urlObj.searchParams.set('utm_source', params.source);
  urlObj.searchParams.set('utm_medium', params.medium);
  urlObj.searchParams.set('utm_campaign', params.campaign);

  if (params.content) {
    urlObj.searchParams.set('utm_content', params.content);
  }

  if (params.term) {
    urlObj.searchParams.set('utm_term', params.term);
  }

  return urlObj.toString();
}

async function shortenURL(longUrl: string): Promise<string> {
  // Use Bitly API to shorten while preserving UTM params
  const response = await fetch('https://api-ssl.bitly.com/v4/shorten', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.BITLY_ACCESS_TOKEN}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ long_url: longUrl }),
  });

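  // Fail loudly on auth or rate-limit errors instead of returning undefined
  if (!response.ok) {
    throw new Error(`Bitly request failed with status ${response.status}`);
  }

  const data = await response.json();
  return data.link;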
}

// Usage
const blogUrl = 'https://example.com/blog/ai-customer-service';

const linkedinUrl = addUTMParams(blogUrl, {
  source: 'linkedin',
  medium: 'social',
  campaign: 'ai_customer_service_blog',
  content: 'organic_post',
});

const shortUrl = await shortenURL(linkedinUrl);
console.log('Trackable short URL:', shortUrl);
// Example result: https://bit.ly/3xYz (redirects to the long URL, UTM params included)
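If the rest of your pipeline is in Python, the same tagging can be sketched with the standard library. This is an illustrative equivalent, not the original implementation: `campaign_slug` and `add_utm_params` are hypothetical helpers, and the slug rule (lowercase, underscores) is an assumption based on the campaign value used above.

```python
import re
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def campaign_slug(title: str) -> str:
    """Turn a blog post title into a lowercase, underscore-separated campaign value."""
    return re.sub(r'[^a-z0-9]+', '_', title.lower()).strip('_')

def add_utm_params(url: str, source: str, campaign: str, medium: str = 'social') -> str:
    """Set utm_source, utm_medium, and utm_campaign, keeping other query parameters."""
    new_params = [
        ('utm_source', source),
        ('utm_medium', medium),
        ('utm_campaign', campaign),
    ]
    replaced = {key for key, _ in new_params}
    parts = urlsplit(url)
    # Drop any existing values for the keys being set, keep everything else
    kept = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key not in replaced
    ]
    return urlunsplit(parts._replace(query=urlencode(kept + new_params)))

blog_url = 'https://example.com/blog/ai-customer-service'
campaign = campaign_slug('AI Customer Service: 5 Lessons')
for platform in ('linkedin', 'twitter'):
    print(add_utm_params(blog_url, platform, campaign))
```

Generating one tagged URL per platform from a single base link keeps the campaign value identical across channels, which makes the analytics comparison cleaner.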

Cost Calculator

Manual Cross-Posting Process

Content writer reformats for each platform
45 min per post × 10 platforms = 7.5 hours
$50/hour × 7.5 hours = $375
Social media manager schedules posts
30 min per day
$40/hour × 0.5 hours = $20/day
Designer creates platform-specific images
1 hour per post set
$60/hour × 1 hour = $60
Analytics tracking (manual spreadsheet)
1 hour per week
$40/hour × 1 hour = $40/week
Total: $495 per post set + $260/week overhead = $1,535/week
Per week (3 blog posts distributed)

Limitations:

  • Can't scale beyond 3 posts/week
  • Inconsistent posting times
  • Copy-paste formatting errors
  • No A/B testing
  • Manual UTM tracking

Automated Distribution Pipeline

OpenAI API (GPT-4 for content generation)
$0.03/1K tokens × 120K tokens = $3.60/week
Anthropic Claude (backup + validation)
$0.40/week
Buffer/Hootsuite API (scheduling)
$99/month = $23/week
AWS services (SQS + Lambda + CloudWatch)
$2/week
Image generation/resizing (Cloudinary)
$5/week
Developer maintenance
$100/hour × 2 hours = $200/week
Total: $234/week
Per week (unlimited posts)

Benefits:

  • ✓ Scale to 100+ posts/week with no added cost
  • ✓ Consistent optimal posting times
  • ✓ Zero formatting errors
  • ✓ Built-in A/B testing
  • ✓ Automatic UTM tracking
  • ✓ Real-time analytics dashboard
$185/day saved
85% cost reduction | $5,200/month | $62,400/year
💡 Payback in 2 weeks. After that, pure savings.
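The headline savings can be reproduced from the two weekly totals. The sketch below assumes a 7-day week and a 4-week month, which is one reading that lands close to the quoted figures; the variable names are illustrative.

```python
# Weekly line items from the automated pipeline list above (USD)
automated_items = {
    'openai_api': 3.60,
    'anthropic_claude': 0.40,
    'scheduling_api': 23.00,
    'aws_services': 2.00,
    'image_resizing': 5.00,
    'developer_maintenance': 200.00,
}
manual_weekly = 1535.00  # manual weekly total as stated above

automated_weekly = round(sum(automated_items.values()), 2)
weekly_savings = manual_weekly - automated_weekly
daily_savings = weekly_savings / 7        # assumes a 7-day week
reduction_pct = weekly_savings / manual_weekly * 100
monthly_savings = weekly_savings * 4      # assumes 4 weeks per month

print(f"Automated: ${automated_weekly:.2f}/week")
print(f"Saved: ${weekly_savings:.2f}/week, ${daily_savings:.2f}/day")
print(f"Reduction: {reduction_pct:.1f}%")
print(f"Monthly: ${monthly_savings:.2f}")
```

Swapping in your own hourly rates and tool prices gives a quick check on whether the automation pays off at your posting volume.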

Want This Running in Your Marketing Stack?

We build custom content distribution systems that integrate with your existing tools and workflows. From simple Buffer automation to enterprise multi-tenant platforms.