The Problem
On Monday you tested the 3 prompts in ChatGPT. Awesome! You saw how extraction → validation → questions works. But here's the thing: you can't ask your staff to copy-paste prompts 500 times a day. One nurse spending 2 hours a day running prompts manually costs roughly $60/day in labor (about $30/hour), which works out to around $18,000/year over 300 working days, just on intake admin. A busy clinic has more than one person doing this. Plus the copy-paste errors that lead to incomplete charts.
See It Work
The 3 prompts chain together automatically: extract, then validate, then ask follow-up questions. That's what you'll build below.
The Code
Three levels: start simple, add reliability, then scale to production. Pick where you are.
Level 1: Simple API Calls
Good for: 0-100 patients/day | Setup time: 15 minutes
# Simple API Calls (0-100 patients/day)
import openai
import json
import os
from datetime import datetime
from typing import Dict, List, Optional

# Set your API key
openai.api_key = os.getenv('OPENAI_API_KEY')


def automate_patient_intake(patient_text: str) -> Dict:
    """Chain the 3 prompts: extract → validate → question"""

    # Step 1: Extract patient data
    extraction_prompt = f"""Extract patient symptoms and demographics from this intake text and format as JSON.
Include: patient_name, age, gender, chief_complaint, symptoms (list), duration, severity (1-10), location,
medications (list of dicts with name/dose/indication), allergies (list), family_history, confidence_score (0-1).

Intake text: {patient_text}

Output as valid JSON only, no markdown formatting."""

    extraction_response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": extraction_prompt}],
        temperature=0.3
    )
    extracted_data = json.loads(extraction_response.choices[0].message.content)

    # Step 2: Validate completeness
    validation_prompt = f"""Review this extracted patient data and identify critical missing information.
For each missing field, explain why it's critical for diagnosis/treatment.
Return as JSON with: missing_fields (array of objects with field, why_critical, priority high/medium/low),
is_complete (boolean), completeness_score (0-1).

Patient data: {json.dumps(extracted_data, indent=2)}

Output as valid JSON only."""

    validation_response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": validation_prompt}],
        temperature=0.3
    )
    validation_result = json.loads(validation_response.choices[0].message.content)
    missing_fields = validation_result.get('missing_fields', [])
    is_complete = validation_result.get('is_complete', False)

    # Step 3: Generate clarifying questions (if needed)
    questions = []
    if not is_complete and missing_fields:
        question_prompt = f"""Generate 3-5 clarifying questions based on missing information.
Make questions conversational and patient-friendly.

Patient data: {json.dumps(extracted_data, indent=2)}
Missing info: {json.dumps(missing_fields, indent=2)}

Output as JSON with: questions (array of objects with question, help_text, field, priority) and estimated_completion_time."""

        question_response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": question_prompt}],
            temperature=0.7
        )
        question_result = json.loads(question_response.choices[0].message.content)
        questions = question_result.get('questions', [])

    return {
        "extracted": extracted_data,
        "validation": validation_result,
        "questions": questions,
        "is_complete": is_complete,
        "timestamp": datetime.now().isoformat()
    }


# Usage example
if __name__ == "__main__":
    patient_text = """My name is Sarah Johnson, I'm 54. I've been having really bad headaches for about 2 weeks now,
mostly on my right side. It's like a pounding feeling, maybe 7 out of 10 pain. I'm taking blood pressure meds
(lisinopril 10mg) and I'm allergic to penicillin. My mom had migraines too."""

    result = automate_patient_intake(patient_text)

    print(f"Complete: {result['is_complete']}")
    print(f"Questions needed: {len(result['questions'])}")
    print("\nExtracted data:")
    print(json.dumps(result['extracted'], indent=2))

    if result['questions']:
        print("\nFollow-up questions:")
        for q in result['questions']:
            print(f"- {q['question']}")
Level 2: With Error Handling & Retries
Good for: 100-1,000 patients/day | Setup time: 1 hour
// With Error Handling & Retries (100-1000 patients/day)
import Anthropic from '@anthropic-ai/sdk';
import winston from 'winston';

// Setup logging
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' }),
  ],
});

interface ExtractedData {
  patient_name: string;
  age: number;
  gender: string;
  chief_complaint: string;
  symptoms: string[];
  duration: string;
  severity: number;
  location?: string;
  medications: Array<{ name: string; dose: string; indication: string }>;
  allergies: string[];
  family_history?: string;
  confidence_score: number;
}

interface ValidationResult {
  missing_fields: Array<{
    field: string;
    why_critical: string;
    priority: 'high' | 'medium' | 'low';
  }>;
  is_complete: boolean;
  completeness_score: number;
}

interface Question {
  question: string;
  help_text: string;
  field: string;
  priority: 'high' | 'medium' | 'low';
}

interface IntakeResult {
  extracted: ExtractedData;
  validation: ValidationResult;
  questions: Question[];
  is_complete: boolean;
  timestamp: string;
  processing_time_ms: number;
}

class IntakeAutomation {
  private anthropic: Anthropic;
  private maxRetries: number;
  private timeoutMs: number;

  constructor(apiKey: string, maxRetries: number = 3, timeoutMs: number = 30000) {
    this.anthropic = new Anthropic({ apiKey });
    this.maxRetries = maxRetries;
    this.timeoutMs = timeoutMs;
  }

  async processIntake(patientText: string): Promise<IntakeResult> {
    const startTime = Date.now();

    try {
      // Step 1: Extract with retries
      logger.info('Starting extraction', { textLength: patientText.length });
      const extracted = await this.retryWithBackoff<ExtractedData>(async () => {
        return await this.extractPatientData(patientText);
      });
      logger.info('Extraction complete', { patientName: extracted.patient_name });

      // Step 2: Validate
      logger.info('Starting validation');
      const validation = await this.retryWithBackoff<ValidationResult>(async () => {
        return await this.validateCompleteness(extracted);
      });
      logger.info('Validation complete', {
        isComplete: validation.is_complete,
        missingFields: validation.missing_fields.length,
      });

      // Step 3: Generate questions if needed
      let questions: Question[] = [];
      if (!validation.is_complete && validation.missing_fields.length > 0) {
        logger.info('Generating questions');
        questions = await this.retryWithBackoff<Question[]>(async () => {
          return await this.generateQuestions(extracted, validation.missing_fields);
        });
        logger.info('Questions generated', { count: questions.length });
      }

      const processingTime = Date.now() - startTime;

      return {
        extracted,
        validation,
        questions,
        is_complete: validation.is_complete,
        timestamp: new Date().toISOString(),
        processing_time_ms: processingTime,
      };
    } catch (error) {
      logger.error('Intake processing failed', { error: (error as Error).message });
      throw new Error(`Failed to process intake after ${this.maxRetries} retries: ${(error as Error).message}`);
    }
  }

  private async extractPatientData(patientText: string): Promise<ExtractedData> {
    const prompt = `Extract patient symptoms and demographics from this intake text and format as JSON.
Include: patient_name, age, gender, chief_complaint, symptoms (array), duration, severity (1-10), location,
medications (array of objects with name/dose/indication), allergies (array), family_history, confidence_score (0-1).

Intake text: ${patientText}

Output as valid JSON only, no markdown.`;

    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 1024,
      temperature: 0.3,
      messages: [{ role: 'user', content: prompt }],
    });

    const content = response.content[0];
    if (content.type !== 'text') {
      throw new Error('Invalid response type from Claude');
    }
    return JSON.parse(content.text);
  }

  private async validateCompleteness(extracted: ExtractedData): Promise<ValidationResult> {
    const prompt = `Review this extracted patient data and identify critical missing information.
For each missing field, explain why it's critical for diagnosis/treatment.
Return as JSON with: missing_fields (array of {field, why_critical, priority}), is_complete (boolean), completeness_score (0-1).

Patient data: ${JSON.stringify(extracted, null, 2)}

Output as valid JSON only.`;

    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 1024,
      temperature: 0.3,
      messages: [{ role: 'user', content: prompt }],
    });

    const content = response.content[0];
    if (content.type !== 'text') {
      throw new Error('Invalid response type from Claude');
    }
    return JSON.parse(content.text);
  }

  private async generateQuestions(
    extracted: ExtractedData,
    missingFields: ValidationResult['missing_fields']
  ): Promise<Question[]> {
    const prompt = `Generate 3-5 clarifying questions based on missing information.
Make questions conversational and patient-friendly.

Patient data: ${JSON.stringify(extracted, null, 2)}
Missing info: ${JSON.stringify(missingFields, null, 2)}

Output as JSON array with objects: {question, help_text, field, priority}.`;

    const response = await this.anthropic.messages.create({
      model: 'claude-3-5-sonnet-20241022',
      max_tokens: 1024,
      temperature: 0.7,
      messages: [{ role: 'user', content: prompt }],
    });

    const content = response.content[0];
    if (content.type !== 'text') {
      throw new Error('Invalid response type from Claude');
    }
    const result = JSON.parse(content.text);
    return result.questions || result;
  }

  private async retryWithBackoff<T>(fn: () => Promise<T>): Promise<T> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        return await Promise.race([
          fn(),
          new Promise<never>((_, reject) =>
            setTimeout(() => reject(new Error('Request timeout')), this.timeoutMs)
          ),
        ]);
      } catch (error) {
        lastError = error as Error;
        logger.warn(`Attempt ${attempt + 1} failed`, { error: lastError.message });

        if (attempt < this.maxRetries - 1) {
          const delayMs = Math.pow(2, attempt) * 1000; // exponential backoff: 1s, 2s, 4s...
          logger.info(`Retrying in ${delayMs}ms`);
          await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
      }
    }

    throw lastError;
  }
}

// Usage example
async function main() {
  const automation = new IntakeAutomation(
    process.env.ANTHROPIC_API_KEY!,
    3,     // max retries
    30000  // 30 second timeout
  );

  const patientText = `My name is Sarah Johnson, I'm 54. I've been having really bad headaches for about 2 weeks now,
mostly on my right side. It's like a pounding feeling, maybe 7 out of 10 pain. I'm taking blood pressure meds
(lisinopril 10mg) and I'm allergic to penicillin. My mom had migraines too.`;

  try {
    const result = await automation.processIntake(patientText);
    console.log(`Processed in ${result.processing_time_ms}ms`);
    console.log(`Complete: ${result.is_complete}`);
    console.log(`Questions: ${result.questions.length}`);
    console.log(JSON.stringify(result, null, 2));
  } catch (error) {
    console.error('Processing failed:', (error as Error).message);
  }
}

main();
Level 3: Production Pattern with LangGraph
Good for: 1,000+ patients/day | Setup time: 1 day
# Production Pattern with LangGraph (1000+ patients/day)
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Annotated
import operator
import openai
import json
import logging
from datetime import datetime
import os

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

openai.api_key = os.getenv('OPENAI_API_KEY')


class IntakeState(TypedDict):
    """State that flows through the graph"""
    patient_text: str
    extracted_data: Dict
    missing_fields: List[Dict]
    questions: List[Dict]
    is_complete: bool
    retry_count: int
    errors: Annotated[List[str], operator.add]  # Accumulate errors
    timestamp: str
    processing_steps: Annotated[List[str], operator.add]  # Track workflow


def extract_node(state: IntakeState) -> IntakeState:
    """Extract patient data from text"""
    logger.info("Starting extraction")
    state['processing_steps'].append(f"extract_started_{datetime.now().isoformat()}")

    try:
        prompt = f"""Extract patient symptoms and demographics from this intake text and format as JSON.
Include: patient_name, age, gender, chief_complaint, symptoms (list), duration, severity (1-10), location,
medications (list of dicts), allergies (list), family_history, confidence_score (0-1).

Intake text: {state['patient_text']}

Output as valid JSON only."""

        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            timeout=30
        )
        state['extracted_data'] = json.loads(response.choices[0].message.content)
        state['processing_steps'].append(f"extract_completed_{datetime.now().isoformat()}")
        logger.info(f"Extracted data for patient: {state['extracted_data'].get('patient_name')}")
    except Exception as e:
        error_msg = f"Extraction failed: {str(e)}"
        logger.error(error_msg)
        state['errors'].append(error_msg)
        state['retry_count'] += 1

    return state


def validate_node(state: IntakeState) -> IntakeState:
    """Validate completeness of extracted data"""
    logger.info("Starting validation")
    state['processing_steps'].append(f"validate_started_{datetime.now().isoformat()}")

    try:
        prompt = f"""Review this extracted patient data and identify critical missing information.
For each missing field, explain why it's critical.
Return as JSON with: missing_fields (array of {{field, why_critical, priority}}), is_complete (boolean), completeness_score (0-1).

Patient data: {json.dumps(state['extracted_data'], indent=2)}

Output as valid JSON only."""

        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            timeout=30
        )
        validation_result = json.loads(response.choices[0].message.content)
        state['missing_fields'] = validation_result.get('missing_fields', [])
        state['is_complete'] = validation_result.get('is_complete', False)
        state['processing_steps'].append(f"validate_completed_{datetime.now().isoformat()}")
        logger.info(f"Validation complete. Missing fields: {len(state['missing_fields'])}")
    except Exception as e:
        error_msg = f"Validation failed: {str(e)}"
        logger.error(error_msg)
        state['errors'].append(error_msg)
        state['retry_count'] += 1

    return state


def question_node(state: IntakeState) -> IntakeState:
    """Generate clarifying questions for missing information"""
    logger.info("Generating questions")
    state['processing_steps'].append(f"questions_started_{datetime.now().isoformat()}")

    try:
        prompt = f"""Generate 3-5 clarifying questions based on missing information.
Make questions conversational and patient-friendly.

Patient data: {json.dumps(state['extracted_data'], indent=2)}
Missing info: {json.dumps(state['missing_fields'], indent=2)}

Output as JSON array with objects: {{question, help_text, field, priority}}."""

        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            timeout=30
        )
        result = json.loads(response.choices[0].message.content)
        # The model may return either a bare array or an object with a "questions" key
        state['questions'] = result.get('questions', []) if isinstance(result, dict) else result
        state['processing_steps'].append(f"questions_completed_{datetime.now().isoformat()}")
        logger.info(f"Generated {len(state['questions'])} questions")
    except Exception as e:
        error_msg = f"Question generation failed: {str(e)}"
        logger.error(error_msg)
        state['errors'].append(error_msg)
        state['retry_count'] += 1

    return state


def check_completeness(state: IntakeState) -> str:
    """Route based on completeness and retry count"""
    if state['is_complete']:
        logger.info("Intake is complete, ending workflow")
        return "complete"
    elif state['retry_count'] >= 3:
        logger.warning(f"Max retries reached ({state['retry_count']}), ending workflow")
        return "max_retries"
    elif len(state['errors']) > 0:
        logger.warning(f"Errors detected: {len(state['errors'])}, retrying extraction")
        return "retry"
    else:
        logger.info("Intake incomplete, generating questions")
        return "incomplete"


def human_review_node(state: IntakeState) -> IntakeState:
    """Flag for human review when max retries reached"""
    logger.warning("Flagging for human review")
    state['processing_steps'].append(f"flagged_for_review_{datetime.now().isoformat()}")
    state['questions'].append({
        "question": "This intake requires human review due to processing errors.",
        "help_text": f"Errors: {', '.join(state['errors'])}",
        "field": "manual_review",
        "priority": "high"
    })
    return state


def build_intake_graph():
    """Build the LangGraph workflow"""
    graph = StateGraph(IntakeState)

    # Add nodes
    graph.add_node("extract", extract_node)
    graph.add_node("validate", validate_node)
    graph.add_node("generate_questions", question_node)
    graph.add_node("human_review", human_review_node)

    # Set entry point
    graph.set_entry_point("extract")

    # Add edges
    graph.add_edge("extract", "validate")

    # Conditional routing from validate
    graph.add_conditional_edges(
        "validate",
        check_completeness,
        {
            "complete": END,
            "incomplete": "generate_questions",
            "retry": "extract",  # Retry extraction on errors
            "max_retries": "human_review"
        }
    )
    graph.add_edge("generate_questions", END)
    graph.add_edge("human_review", END)

    return graph.compile()


class ProductionIntakeAutomation:
    """Production-ready intake automation with monitoring"""

    def __init__(self):
        self.graph = build_intake_graph()
        self.stats = {
            "total_processed": 0,
            "completed": 0,
            "incomplete": 0,
            "errors": 0
        }

    def process_intake(self, patient_text: str) -> Dict:
        """Process a single patient intake"""
        initial_state = {
            "patient_text": patient_text,
            "extracted_data": {},
            "missing_fields": [],
            "questions": [],
            "is_complete": False,
            "retry_count": 0,
            "errors": [],
            "timestamp": datetime.now().isoformat(),
            "processing_steps": []
        }

        try:
            result = self.graph.invoke(initial_state)

            self.stats["total_processed"] += 1
            if result['is_complete']:
                self.stats["completed"] += 1
            elif len(result['errors']) > 0:
                self.stats["errors"] += 1
            else:
                self.stats["incomplete"] += 1

            return result
        except Exception as e:
            logger.error(f"Graph execution failed: {str(e)}")
            self.stats["errors"] += 1
            raise

    def get_stats(self) -> Dict:
        """Get processing statistics"""
        return self.stats


# Usage example
if __name__ == "__main__":
    automation = ProductionIntakeAutomation()

    patient_text = """My name is Sarah Johnson, I'm 54. I've been having really bad headaches for about 2 weeks now,
mostly on my right side. It's like a pounding feeling, maybe 7 out of 10 pain. I'm taking blood pressure meds
(lisinopril 10mg) and I'm allergic to penicillin. My mom had migraines too."""

    result = automation.process_intake(patient_text)

    print(f"\nIntake complete: {result['is_complete']}")
    print(f"Questions: {len(result['questions'])}")
    print(f"Processing steps: {len(result['processing_steps'])}")
    print(f"Errors: {len(result['errors'])}")
    print("\nExtracted data:")
    print(json.dumps(result['extracted_data'], indent=2))

    if result['questions']:
        print("\nFollow-up questions:")
        for q in result['questions']:
            print(f"- [{q['priority']}] {q['question']}")

    print(f"\nStats: {automation.get_stats()}")
When to Level Up
Start: Simple API Calls
0-100 patients/day
- Sequential API calls, no framework needed
- Basic error logging with print statements
- Manual retry on failures
- Good for: Testing the concept, small clinics
Scale: Add Reliability
100-1,000 patients/day
- Automatic retries with exponential backoff
- Proper error handling and logging (Winston/Sentry)
- Rate limiting and queue management
- Timeouts and circuit breakers (a minimal rate-limiter and circuit-breaker sketch follows this list)
- Good for: Growing practices, multi-provider clinics
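The Level 2 code above covers retries and timeouts, but rate limiting and circuit breaking aren't shown there. Here is a minimal Python sketch of both ideas; the thresholds (60 requests/minute, 5 failures, 30-second cooldown) and the guarded_call helper are illustrative assumptions, not part of the earlier examples.

# Minimal sketch of a sliding-window rate limiter and a circuit breaker.
# Thresholds are illustrative; tune them to your provider's rate limits.
import time
from collections import deque


class RateLimiter:
    """Allow at most max_calls per rolling period."""

    def __init__(self, max_calls: int = 60, period_s: float = 60.0):
        self.max_calls = max_calls
        self.period_s = period_s
        self.calls = deque()

    def wait_for_slot(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        while self.calls and now - self.calls[0] > self.period_s:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest call leaves the window
            time.sleep(self.period_s - (now - self.calls[0]))
        self.calls.append(time.monotonic())


class CircuitBreaker:
    """Stop calling a failing dependency until a cooldown passes."""

    def __init__(self, failure_threshold: int = 5, cooldown_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_s:
                raise RuntimeError("Circuit open: skipping LLM call")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
            self.failures = 0  # success closes the circuit
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise


# Usage: wrap each of the three LLM calls
limiter = RateLimiter(max_calls=60, period_s=60.0)
breaker = CircuitBreaker(failure_threshold=5, cooldown_s=30.0)


def guarded_call(fn, *args, **kwargs):
    limiter.wait_for_slot()
    return breaker.call(fn, *args, **kwargs)

Wrapping each extraction, validation, and question call in guarded_call means a provider outage trips the breaker quickly instead of piling up retries behind it.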
Production: Framework & Orchestration
1,000-5,000 patients/day
- LangGraph for complex workflows with cycles
- Conditional routing (complete → END, incomplete → questions, errors → retry)
- State management across steps (no data loss on failures)
- Human-in-the-loop for edge cases (flags for manual review)
- Monitoring and statistics tracking
- Good for: Hospital systems, urgent care networks
Enterprise: Multi-Agent System
5,000+ patients/day
- Multiple specialized agents (extraction, validation, PHI detection, EHR integration)
- Load balancing across multiple LLM providers (fallback from Claude → GPT-4 → Gemini; see the sketch after this list)
- Real-time monitoring dashboard (Grafana + Prometheus)
- Distributed processing (Kubernetes + message queues)
- A/B testing of prompts and models
- Cost optimization with model routing
- Good for: Large hospital networks, insurance companies
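Here is a minimal sketch of the provider-fallback idea, assuming the official OpenAI and Anthropic Python SDKs and API keys in the environment. The model names, the order of the chain, and the complete_with_fallback helper are illustrative; a Gemini client would simply be appended to the list the same way, and routing cheap models first is how the cost-optimization item above works in practice.

# Provider fallback sketch: try providers in order, return the first success.
import os
from typing import Callable, List

from openai import OpenAI
from anthropic import Anthropic

openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
anthropic_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))


def call_claude(prompt: str) -> str:
    response = anthropic_client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text


def call_gpt4(prompt: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content


def complete_with_fallback(prompt: str, providers: List[Callable[[str], str]]) -> str:
    """Try each provider in order; return the first successful response."""
    errors = []
    for provider in providers:
        try:
            return provider(prompt)
        except Exception as e:  # rate limits, outages, timeouts
            errors.append(f"{provider.__name__}: {e}")
    raise RuntimeError(f"All providers failed: {errors}")


# Usage: primary model first, fallback after
text = complete_with_fallback("Summarize this intake note: ...", [call_claude, call_gpt4])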
Healthcare-Specific Gotchas
The code examples above work. But healthcare has unique challenges you need to handle.
PHI Redaction Before Sending to LLMs
HIPAA requires you to keep Protected Health Information (PHI) out of third-party LLMs unless the vendor has signed a Business Associate Agreement (BAA). You can't just send "My name is Sarah Johnson" to a public API endpoint and stay compliant. Use AWS Comprehend Medical to detect and redact names, SSNs, addresses, phone numbers, dates, and medical record numbers before the text ever reaches the model. This adds 200-300ms per request, but it's non-negotiable for compliance.
import boto3
from typing import Any, Dict


class PHIRedactor:
    def __init__(self):
        self.comprehend = boto3.client('comprehendmedical')

    def redact_phi(self, text: str) -> Dict[str, Any]:
        """Redact PHI using AWS Comprehend Medical"""
        # Detect PHI entities
        result = self.comprehend.detect_phi(Text=text)

        # Sort entities by position (reverse order to maintain indices)
        entities = sorted(result['Entities'], key=lambda x: x['BeginOffset'], reverse=True)

        redacted_text = text
        phi_map = {}  # Store original values for re-identification later

        for entity in entities:
            original = entity['Text']
            entity_type = entity['Type']
            begin = entity['BeginOffset']
            end = entity['EndOffset']

            # Generate replacement token
            replacement = f"[{entity_type}_{len(phi_map)}]"
            phi_map[replacement] = original

            # Replace in text
            redacted_text = redacted_text[:begin] + replacement + redacted_text[end:]

        return {
            'redacted_text': redacted_text,
            'phi_map': phi_map,
            'entities_found': len(entities)
        }

    def restore_phi(self, text: str, phi_map: Dict[str, str]) -> str:
        """Restore original PHI after processing"""
        restored = text
        for token, original in phi_map.items():
            restored = restored.replace(token, original)
        return restored


# Usage in intake automation
redactor = PHIRedactor()


def safe_automate_intake(patient_text: str) -> Dict:
    # Step 1: Redact PHI
    redaction = redactor.redact_phi(patient_text)
    safe_text = redaction['redacted_text']

    # Step 2: Process with LLM (now HIPAA compliant)
    result = automate_patient_intake(safe_text)

    # Step 3: Restore PHI in final output
    result['extracted']['patient_name'] = redactor.restore_phi(
        result['extracted']['patient_name'],
        redaction['phi_map']
    )

    return result
HIPAA Audit Logging Requirements
Every access to patient data must be logged with: who accessed it, what they did, when, where (IP address), and why. Keep logs for 7 years. This isn't optional - it's federal law. Use CloudWatch Logs with encryption at rest, or a HIPAA-compliant logging service like Splunk. Log before extraction, after validation, and when questions are generated. Include patient ID (not name), user ID, action type, and timestamp.
import json
from datetime import datetime
import boto3
from typing import Dict


class HIPAALogger:
    def __init__(self, log_group: str, log_stream: str):
        self.cloudwatch = boto3.client('logs')
        self.log_group = log_group
        self.log_stream = log_stream

        # Create log stream if it doesn't exist
        try:
            self.cloudwatch.create_log_stream(
                logGroupName=log_group,
                logStreamName=log_stream
            )
        except self.cloudwatch.exceptions.ResourceAlreadyExistsException:
            pass

    def log_phi_access(self, user_id: str, patient_id: str, action: str,
                       reason: str, ip_address: str,
                       additional_data: Dict = None) -> None:
        """Log PHI access event"""
        event = {
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id,
            'patient_id': patient_id,  # Use patient_id, NOT patient_name
            'action': action,          # 'extract', 'validate', 'generate_questions'
            'reason': reason,          # 'automated_intake', 'manual_review'
            'ip_address': ip_address,
            'system': 'intake_automation',
            'version': '1.0',
            'additional_data': additional_data or {}
        }

        # Send to CloudWatch
        self.cloudwatch.put_log_events(
            logGroupName=self.log_group,
            logStreamName=self.log_stream,
            logEvents=[{
                'timestamp': int(datetime.now().timestamp() * 1000),
                'message': json.dumps(event)
            }]
        )


# Usage in intake automation
hipaa_logger = HIPAALogger(
    log_group='/healthcare/phi-access',
    log_stream='intake-automation'
)


def logged_automate_intake(patient_text: str, user_id: str,
                           patient_id: str, ip_address: str) -> Dict:
    # Log extraction
    hipaa_logger.log_phi_access(
        user_id=user_id,
        patient_id=patient_id,
        action='extract',
        reason='automated_intake',
        ip_address=ip_address
    )

    result = automate_patient_intake(patient_text)

    # Log validation
    hipaa_logger.log_phi_access(
        user_id=user_id,
        patient_id=patient_id,
        action='validate',
        reason='automated_intake',
        ip_address=ip_address,
        additional_data={'is_complete': result['is_complete']}
    )

    # Log question generation if needed
    if not result['is_complete']:
        hipaa_logger.log_phi_access(
            user_id=user_id,
            patient_id=patient_id,
            action='generate_questions',
            reason='automated_intake',
            ip_address=ip_address,
            additional_data={'question_count': len(result['questions'])}
        )

    return result
Medical Abbreviation Handling
Patients and staff use medical abbreviations that LLMs might misinterpret. 'SOB' means shortness of breath, not what you think. 'CP' is chest pain. 'N/V' is nausea and vomiting. If you don't expand these before processing, the LLM might miss critical symptoms. Build an abbreviation dictionary and expand them pre-processing. Update it regularly - medical slang evolves.
import re
from typing import Any, Dict


class MedicalAbbreviationExpander:
    # Common medical abbreviations
    ABBREVS = {
        'SOB': 'shortness of breath',
        'CP': 'chest pain',
        'N/V': 'nausea and vomiting',
        'HA': 'headache',
        'HTN': 'hypertension',
        'DM': 'diabetes mellitus',
        'COPD': 'chronic obstructive pulmonary disease',
        'CHF': 'congestive heart failure',
        'MI': 'myocardial infarction',
        'CVA': 'cerebrovascular accident',
        'Pt': 'patient',
        'Hx': 'history',
        'Sx': 'symptoms',
        'Tx': 'treatment',
        'Dx': 'diagnosis',
        'Rx': 'prescription',
        'c/o': 'complains of',
        'w/': 'with',
        'w/o': 'without',
        'prn': 'as needed',
        'bid': 'twice daily',
        'tid': 'three times daily',
        'qid': 'four times daily',
        'qd': 'once daily',
        'qhs': 'at bedtime',
        'NPO': 'nothing by mouth',
        'STAT': 'immediately'
    }

    def expand(self, text: str) -> Dict[str, Any]:
        """Expand medical abbreviations in text"""
        expanded_text = text
        expansions_made = []

        for abbrev, full in self.ABBREVS.items():
            # Use word boundaries to avoid partial matches
            pattern = r'\b' + re.escape(abbrev) + r'\b'
            if re.search(pattern, expanded_text, re.IGNORECASE):
                expansions_made.append({
                    'abbreviation': abbrev,
                    'expanded_to': full
                })
                expanded_text = re.sub(
                    pattern,
                    full,
                    expanded_text,
                    flags=re.IGNORECASE
                )

        return {
            'original_text': text,
            'expanded_text': expanded_text,
            'expansions': expansions_made
        }


# Usage in intake automation
expander = MedicalAbbreviationExpander()


def preprocess_patient_text(patient_text: str) -> str:
    """Preprocess text before sending to LLM"""
    # Expand abbreviations
    expansion = expander.expand(patient_text)

    # Log what was expanded for audit trail
    if expansion['expansions']:
        print(f"Expanded {len(expansion['expansions'])} abbreviations:")
        for exp in expansion['expansions']:
            print(f"  {exp['abbreviation']} → {exp['expanded_to']}")

    return expansion['expanded_text']


# Example
patient_text = "Pt is 54yo F c/o severe HA x 2 weeks. CP 7/10. Hx of HTN, on lisinopril 10mg qd."
expanded = preprocess_patient_text(patient_text)
print(f"\nExpanded: {expanded}")
# Output: "patient is 54yo F complains of severe headache x 2 weeks. chest pain 7/10.
#          history of hypertension, on lisinopril 10mg once daily."
EHR Integration (Epic/Cerner FHIR APIs)
After extraction, you need to write structured data back to the EHR as HL7 FHIR resources. Each EHR has its own quirks: different OAuth2 app registration and token flows, different FHIR version support (R4 vs. DSTU2), and different required fields. Epic requires you to create a Patient resource, then link Condition, MedicationStatement, and AllergyIntolerance resources to it. Test in the sandbox first - production EHR writes are scary.
from fhirclient import client
from fhirclient.models import patient, humanname, condition, codeableconcept, reference
from fhirclient.models import medicationstatement, allergyintolerance
from typing import Dict
import os


class EHRIntegration:
    def __init__(self, ehr_type: str = 'epic'):
        if ehr_type == 'epic':
            settings = {
                'app_id': os.getenv('EPIC_APP_ID'),
                'api_base': os.getenv('EPIC_FHIR_BASE'),  # https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/
                'redirect_uri': os.getenv('EPIC_REDIRECT_URI')
            }
        elif ehr_type == 'cerner':
            settings = {
                'app_id': os.getenv('CERNER_APP_ID'),
                'api_base': os.getenv('CERNER_FHIR_BASE'),
                'redirect_uri': os.getenv('CERNER_REDIRECT_URI')
            }
        else:
            raise ValueError(f"Unsupported EHR type: {ehr_type}")

        self.smart = client.FHIRClient(settings=settings)

    def write_intake_to_ehr(self, extracted_data: Dict, patient_id: str = None) -> Dict:
        """Write extracted intake data to EHR as FHIR resources"""
        results = {}

        # 1. Create or update Patient resource
        if not patient_id:
            pat = patient.Patient()
            name = humanname.HumanName()
            name.given = [extracted_data.get('patient_name', '').split()[0]]
            name.family = ' '.join(extracted_data.get('patient_name', '').split()[1:])
            pat.name = [name]
            pat.birthDate = self._calculate_birth_date(extracted_data.get('age'))
            pat.gender = extracted_data.get('gender', 'unknown')
            pat.create(self.smart.server)
            patient_id = pat.id
        results['patient_id'] = patient_id

        # 2. Create Condition resource for chief complaint
        cond = condition.Condition()
        cond.code = codeableconcept.CodeableConcept()
        cond.code.text = extracted_data.get('chief_complaint')
        cond.subject = reference.Reference()
        cond.subject.reference = f'Patient/{patient_id}'
        cond.severity = self._map_severity(extracted_data.get('severity', 5))
        cond.create(self.smart.server)
        results['condition_id'] = cond.id

        # 3. Create MedicationStatement resources
        medication_ids = []
        for med in extracted_data.get('medications', []):
            med_stmt = medicationstatement.MedicationStatement()
            med_stmt.medicationCodeableConcept = codeableconcept.CodeableConcept()
            med_stmt.medicationCodeableConcept.text = f"{med.get('name')} {med.get('dose')}"
            med_stmt.subject = reference.Reference()
            med_stmt.subject.reference = f'Patient/{patient_id}'
            med_stmt.status = 'active'
            med_stmt.create(self.smart.server)
            medication_ids.append(med_stmt.id)
        results['medication_ids'] = medication_ids

        # 4. Create AllergyIntolerance resources
        allergy_ids = []
        for allergy in extracted_data.get('allergies', []):
            allergy_int = allergyintolerance.AllergyIntolerance()
            allergy_int.code = codeableconcept.CodeableConcept()
            allergy_int.code.text = allergy
            allergy_int.patient = reference.Reference()
            allergy_int.patient.reference = f'Patient/{patient_id}'
            allergy_int.clinicalStatus = codeableconcept.CodeableConcept()
            allergy_int.clinicalStatus.text = 'active'
            allergy_int.create(self.smart.server)
            allergy_ids.append(allergy_int.id)
        results['allergy_ids'] = allergy_ids

        return results

    def _calculate_birth_date(self, age: int) -> str:
        """Calculate approximate birth date from age"""
        from datetime import datetime
        birth_year = datetime.now().year - age
        return f"{birth_year}-01-01"

    def _map_severity(self, severity: int) -> codeableconcept.CodeableConcept:
        """Map 1-10 severity to FHIR severity code"""
        severity_code = codeableconcept.CodeableConcept()
        if severity <= 3:
            severity_code.text = 'mild'
        elif severity <= 6:
            severity_code.text = 'moderate'
        else:
            severity_code.text = 'severe'
        return severity_code


# Usage after extraction
ehr = EHRIntegration(ehr_type='epic')


def automate_with_ehr_write(patient_text: str) -> Dict:
    # Extract data
    result = automate_patient_intake(patient_text)

    # Write to EHR if extraction was successful
    if result['extracted']:
        try:
            ehr_result = ehr.write_intake_to_ehr(
                extracted_data=result['extracted']
            )
            result['ehr_write'] = {
                'success': True,
                'patient_id': ehr_result['patient_id'],
                'resources_created': len(ehr_result)
            }
        except Exception as e:
            result['ehr_write'] = {
                'success': False,
                'error': str(e)
            }

    return result
Patient Consent Management
Before processing any patient data with AI, verify consent is on file. Some patients opt out of AI processing for religious, privacy, or trust reasons. You need a consent database that tracks: patient ID, consent type (ai_processing), granted (true/false), date granted, expiration date. Check consent before each extraction. If no consent, route to manual intake workflow. Document the consent check in HIPAA logs.
import { createClient } from '@supabase/supabase-js';

interface ConsentRecord {
  patient_id: string;
  consent_type: 'ai_processing' | 'data_sharing' | 'research';
  granted: boolean;
  granted_date: string;
  expiration_date?: string;
  revoked_date?: string;
}

class ConsentManager {
  private db: any;

  constructor() {
    this.db = createClient(
      process.env.SUPABASE_URL!,
      process.env.SUPABASE_KEY!
    );
  }

  async checkAIConsent(patientId: string): Promise<{
    hasConsent: boolean;
    reason?: string;
    consentDate?: string;
  }> {
    try {
      const { data, error } = await this.db
        .from('patient_consent')
        .select('*')
        .eq('patient_id', patientId)
        .eq('consent_type', 'ai_processing')
        .single();

      if (error) {
        return { hasConsent: false, reason: 'no_consent_record' };
      }

      const consent = data as ConsentRecord;

      // Check if consent was revoked
      if (consent.revoked_date) {
        return { hasConsent: false, reason: 'consent_revoked' };
      }

      // Check if consent expired
      if (consent.expiration_date) {
        const expirationDate = new Date(consent.expiration_date);
        if (expirationDate < new Date()) {
          return { hasConsent: false, reason: 'consent_expired' };
        }
      }

      // Check if consent was granted
      if (!consent.granted) {
        return { hasConsent: false, reason: 'consent_denied' };
      }

      return { hasConsent: true, consentDate: consent.granted_date };
    } catch (error) {
      console.error('Consent check failed:', error);
      // Fail closed - if we can't verify consent, don't process
      return { hasConsent: false, reason: 'consent_check_error' };
    }
  }

  async logConsentCheck(
    patientId: string,
    hasConsent: boolean,
    reason: string
  ): Promise<void> {
    await this.db.from('consent_audit_log').insert({
      patient_id: patientId,
      check_timestamp: new Date().toISOString(),
      has_consent: hasConsent,
      reason: reason,
      checked_by: 'intake_automation',
    });
  }
}

const consentManager = new ConsentManager();

async function automateWithConsentCheck(
  patientText: string,
  patientId: string
): Promise<any> {
  // Check consent before processing
  const consentCheck = await consentManager.checkAIConsent(patientId);

  // Log the consent check
  await consentManager.logConsentCheck(
    patientId,
    consentCheck.hasConsent,
    consentCheck.reason || 'consent_granted'
  );

  if (!consentCheck.hasConsent) {
    // Route to manual intake workflow
    return {
      status: 'manual_review_required',
      reason: consentCheck.reason,
      message: 'Patient has not consented to AI processing',
      manual_workflow_url: `/manual-intake/${patientId}`,
    };
  }

  // Proceed with automated intake
  const result = await automateIntake(patientText);

  return {
    status: 'automated_processing_complete',
    consent_verified: true,
    consent_date: consentCheck.consentDate,
    ...result,
  };
}

// Usage
const result = await automateWithConsentCheck(patientText, 'patient-12345');
if (result.status === 'manual_review_required') {
  console.log(`Routing to manual workflow: ${result.reason}`);
} else {
  console.log(`Automated processing complete: ${result.is_complete}`);
}
Cost Calculator
Manual Process (Current State)
Limitations:
- Can't scale beyond 10-20 patients/day
- High error rate (40%)
- Staff burnout from repetitive tasks
- No standardization across staff
Automated Process (With This System)
Benefits:
- ✓ Scales to 5000+ patients/day
- ✓ Error rate reduced to <5%
- ✓ Consistent quality across all intakes
- ✓ Staff focus on complex cases only
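To put rough numbers on the comparison above, here's a back-of-the-envelope calculator. Only the 2 hours/day of manual prompt work and the resulting ~$18,000/year come from the Problem section; the hourly rate, working days, per-intake API cost, and review time are assumptions to replace with your own figures.

# Back-of-the-envelope cost comparison. All inputs are assumptions you should
# replace with your own figures.

def manual_cost_per_year(hours_per_day: float = 2.0,
                         hourly_rate: float = 30.0,
                         working_days: int = 300) -> float:
    """Staff time spent copy-pasting prompts by hand."""
    return hours_per_day * hourly_rate * working_days


def automated_cost_per_year(patients_per_day: int = 100,
                            cost_per_intake: float = 0.25,
                            working_days: int = 300,
                            review_minutes_per_day: float = 15.0,
                            hourly_rate: float = 30.0) -> float:
    """LLM API spend plus a small amount of human spot-check time."""
    api_cost = patients_per_day * cost_per_intake * working_days
    review_cost = (review_minutes_per_day / 60.0) * hourly_rate * working_days
    return api_cost + review_cost


if __name__ == "__main__":
    manual = manual_cost_per_year()        # 2h * $30 * 300 days = $18,000
    automated = automated_cost_per_year()  # API spend + spot-check review
    print(f"Manual:    ${manual:,.0f}/year")
    print(f"Automated: ${automated:,.0f}/year")
    print(f"Savings:   ${manual - automated:,.0f}/year")

Swap in your actual labor rate, daily volume, and measured per-intake token cost to get a number you can defend.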