Pipelines & Ingestion
Document Purpose: This document describes the data ingestion pipelines, message queues, and processing workflows that bring clinical data into the Entheory.AI platform.
Executive Summary
Entheory.AI ingests data from multiple heterogeneous hospital systems through standardized pipelines. All data flows through message queues for reliability, traceability, and error handling.
Related Documentation:
- High-Level Architecture – System overview
- Ingestion Use Cases – Detailed use cases
- Processing Use Cases – OCR/ASR processing
- Operations Use Cases – Job management
1. Data Sources Overview
1.1 Source Types
flowchart LR
subgraph Sources["External Sources"]
EMR["EMR/HIMS"]
LIS["Lab Information System"]
PACS["PACS/Imaging"]
PATH["Pathology Lab"]
GENOMICS["Genomics Lab"]
DOCS["Scanned Documents"]
AUDIO["Audio Recordings"]
end
subgraph Adapters["Ingestion Adapters"]
HL7A["HL7 v2 Adapter"]
FHIR["FHIR Adapter"]
DICOM["DICOM Adapter"]
JSON["JSON/CSV Adapter"]
FILE["File Upload"]
end
subgraph Queue["Message Queue"]
IQ["Ingestion Queue"]
end
EMR --> HL7A --> IQ
LIS --> HL7A --> IQ
PACS --> DICOM --> IQ
PATH --> FILE --> IQ
GENOMICS --> JSON --> IQ
DOCS --> FILE --> IQ
AUDIO --> FILE --> IQ
style Sources fill:#ff9
style Adapters fill:#9cf
style Queue fill:#f9c
1.2 Source Configuration
| Source | Protocol | Format | Trigger | Use Cases |
|---|---|---|---|---|
| EMR/HIMS | HL7 MLLP (TCP:2575) | HL7 v2.5 ADT | Real-time push | ING-001 |
| LIS (Labs) | HL7 MLLP | HL7 v2.5 ORU | Real-time push | ING-004 |
| PACS | DICOM / REST | DICOM JSON, SR | Event webhook | ING-006 |
| Pathology | File drop / API | PDF, JSON | File watcher / API | ING-011 |
| Genomics | REST API | JSON / VCF | Polling / webhook | ING-013 |
| Documents | File upload | PDF, images | User upload | PROC-001 |
| Audio | File upload | MP3, WAV | User upload | PROC-005 |
2. Pipeline Architecture
2.1 End-to-End Flow
sequenceDiagram
participant Src as Source System
participant Adp as Adapter
participant Q as Message Queue
participant Wrk as Worker
participant Bnd as Patient Bundle
participant Cache as Cache
participant UI as Frontend
Src->>Adp: Send data (HL7/FHIR/File)
Adp->>Adp: Validate & normalize
Adp->>Q: Enqueue job
Adp-->>Src: ACK/NACK
Q->>Wrk: Dequeue job
Wrk->>Wrk: Process (OCR/ASR/Parse)
Wrk->>Bnd: Update patient bundle
Bnd->>Cache: Invalidate/refresh
UI->>Cache: Request patient data
Cache-->>UI: Return updated data
2.2 Pipeline Stages
| Stage | Description | SLA | Error Handling |
|---|---|---|---|
| 1. Receive | Accept data from source | < 1s | NACK if malformed |
| 2. Validate | Schema/format validation | < 500ms | Reject to DLQ |
| 3. Normalize | Map to canonical format | < 1s | Log + continue |
| 4. Enqueue | Add to processing queue | < 100ms | Retry 3x |
| 5. Process | Execute transforms (OCR/ASR/NLP) | Varies | Retry + DLQ |
| 6. Persist | Update patient bundle | < 500ms | Retry + alert |
| 7. Notify | Invalidate cache, emit events | < 200ms | Best-effort |
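The per-stage error policies above can be sketched as a small dispatcher. This is a minimal illustration, not the platform's implementation; the policy names (`nack`, `dlq`, `log_continue`) are assumptions chosen to mirror the table.

```python
def run_stage(name, fn, payload, on_error):
    """Run one pipeline stage and apply its error policy (illustrative).

    on_error: "nack" re-raises so the adapter can NACK the source,
    "dlq" returns a dead-letter marker, "log_continue" logs and
    passes the payload through unchanged.
    """
    try:
        return fn(payload)
    except Exception as exc:
        if on_error == "nack":          # 1. Receive: reject back to source
            raise
        if on_error == "dlq":           # 2. Validate: park in dead-letter queue
            return {"dlq": True, "error": str(exc)}
        if on_error == "log_continue":  # 3. Normalize: log and continue
            print(f"[{name}] non-fatal: {exc}")
            return payload
        raise
```

A real worker would attach retry counts and metrics around this wrapper; the sketch only shows how the three policies diverge.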
3. Message Queues
3.1 Queue Architecture
flowchart TB
subgraph Ingestion["Ingestion Queues"]
HL7Q["HL7 Queue"]
FHIRQ["FHIR Queue"]
FILEQ["File Queue"]
end
subgraph Processing["Processing Queues"]
OCRQ["OCR Queue"]
ASRQ["ASR Queue"]
NLPQ["NLP Queue"]
end
subgraph DLQ["Dead Letter Queues"]
HL7DLQ["HL7 DLQ"]
OCRDLQ["OCR DLQ"]
ASRDLQ["ASR DLQ"]
end
HL7Q --> HL7DLQ
OCRQ --> OCRDLQ
ASRQ --> ASRDLQ
style Ingestion fill:#9cf
style Processing fill:#9fc
style DLQ fill:#f99
3.2 Queue Configuration
Technology Choice: The messaging layer is configurable per deployment. Options include:
- Kafka – High throughput, log-based, best for event streaming
- NATS – Lightweight, low latency, ideal for microservices
- RabbitMQ – Feature-rich, traditional message broker
Selection depends on hospital IT capabilities and scale requirements.
| Queue | Technology | Workers | Visibility Timeout | Max Retries | DLQ |
|---|---|---|---|---|---|
| hl7-ingestion | Kafka/NATS/RabbitMQ | 4 | 30s | 3 | hl7-dlq |
| fhir-ingestion | Kafka/NATS/RabbitMQ | 2 | 60s | 3 | fhir-dlq |
| ocr-processing | Kafka/NATS/RabbitMQ | 10 | 5 min | 2 | ocr-dlq |
| asr-processing | Kafka/NATS/RabbitMQ | 3 (GPU) | 15 min | 2 | asr-dlq |
| nlp-processing | Kafka/NATS/RabbitMQ | 4 | 3 min | 2 | nlp-dlq |
Message Envelope Example:
{
"messageId": "msg-2024-001234",
"jobId": "job-2024-567890",
"timestamp": "2024-12-09T10:30:00Z",
"source": "hospital_xyz_lis",
"type": "hl7_oru",
"priority": "normal",
"payload": {
"patientId": "ABHA-12345678901",
"messageType": "ORU^R01",
"rawMessage": "MSH|^~\\&|LIS|HOSP|...",
"parsedData": { ... }
},
"metadata": {
"hospital": "hospital_xyz",
"department": "oncology",
"correlationId": "corr-abc123"
},
"retryCount": 0
}
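A helper for building that envelope might look like the sketch below. The field names follow the example above; the ID formats and `make_envelope` itself are illustrative assumptions, not a platform API.

```python
import uuid
from datetime import datetime, timezone

def make_envelope(source, msg_type, payload, priority="normal", metadata=None):
    """Build a queue message envelope matching the documented schema.

    messageId/jobId formats are illustrative; retryCount always starts at 0
    so the retry policy in section 5.2 can increment it on redelivery.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "messageId": f"msg-{uuid.uuid4().hex[:12]}",
        "jobId": f"job-{uuid.uuid4().hex[:12]}",
        "timestamp": now,
        "source": source,
        "type": msg_type,
        "priority": priority,
        "payload": payload,
        "metadata": metadata or {},
        "retryCount": 0,
    }
```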
4. Ingestion Pipelines
4.1 HL7 Pipeline
flowchart LR
subgraph Receive["1. Receive"]
MLLP["MLLP Listener<br/>TCP:2575"]
end
subgraph Parse["2. Parse"]
PARSE["HL7 Parser"]
VAL["Validation"]
end
subgraph Map["3. Map"]
PID["Patient ID<br/>Mapping"]
NORM["Data<br/>Normalization"]
end
subgraph Store["4. Store"]
QUEUE["Queue"]
WORKER["Worker"]
BUNDLE["Bundle<br/>Update"]
end
MLLP --> PARSE --> VAL --> PID --> NORM --> QUEUE --> WORKER --> BUNDLE
Supported Message Types:
| Message Type | Description | Processing |
|---|---|---|
| ADT^A01 | Patient admission | Create/update demographics |
| ADT^A03 | Patient discharge | Update status |
| ADT^A08 | Patient update | Update demographics |
| ORU^R01 | Lab results | Append to labs array |
| ORM^O01 | Order request | Create order record |
| RDE^O11 | Pharmacy dispense | Update medications |
Use Cases: ING-001 - ING-005
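Routing by message type can be sketched with plain string handling: read MSH-9 from the MSH segment and look up a handler. A production adapter would use a full HL7 v2 parser; the handler names here are illustrative labels for the processing column above.

```python
# Map of supported HL7 message types to (illustrative) handler names.
ROUTES = {
    "ADT^A01": "create_or_update_demographics",
    "ADT^A03": "update_status",
    "ADT^A08": "update_demographics",
    "ORU^R01": "append_labs",
    "ORM^O01": "create_order",
    "RDE^O11": "update_medications",
}

def route_hl7(raw):
    """Return the handler name for an HL7 v2 message, or None if unsupported.

    MSH-9 is the 9th pipe-delimited field of the MSH segment and may carry
    three components (e.g. ORU^R01^ORU_R01); only the first two identify
    the message type.
    """
    msh = raw.split("\r")[0].split("|")          # MSH segment fields
    msg_type = "^".join(msh[8].split("^")[:2])   # MSH-9 -> "ORU^R01"
    return ROUTES.get(msg_type)
```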
4.2 PACS/Imaging Pipeline
flowchart LR
subgraph Receive["1. Receive"]
WEBHOOK["Webhook<br/>Notification"]
QUERY["DICOM Query"]
end
subgraph Fetch["2. Fetch"]
META["Fetch<br/>Metadata"]
SERIES["Fetch<br/>Series List"]
end
subgraph Process["3. Process"]
LINK["Link to<br/>Patient"]
PARSE["Parse<br/>Report SR"]
end
subgraph Store["4. Store"]
BUNDLE["Bundle<br/>Update"]
CACHE["Update<br/>Cache"]
end
WEBHOOK --> META --> LINK --> BUNDLE
QUERY --> SERIES --> PARSE --> BUNDLE --> CACHE
Use Cases: ING-006 - ING-010
4.3 Document (OCR) Pipeline
flowchart LR
subgraph Upload["1. Upload"]
API["Upload API"]
S3["Store in S3"]
end
subgraph Queue["2. Queue"]
JOB["Create Job"]
QUEUE["OCR Queue"]
end
subgraph Process["3. Process"]
WORKER["OCR Worker"]
LANG["Language<br/>Detection"]
TESS["Tesseract/<br/>PaddleOCR"]
end
subgraph Extract["4. Extract"]
NER["Entity<br/>Extraction"]
CLASS["Document<br/>Classification"]
end
subgraph Store["5. Store"]
BUNDLE["Bundle<br/>Update"]
end
API --> S3 --> JOB --> QUEUE --> WORKER --> LANG --> TESS --> NER --> CLASS --> BUNDLE
Use Cases: PROC-001 - PROC-010
4.4 Audio (ASR) Pipeline
flowchart LR
subgraph Upload["1. Upload"]
REC["Recording API"]
S3["Store in S3"]
end
subgraph Queue["2. Queue"]
JOB["Create Job"]
QUEUE["ASR Queue"]
end
subgraph Process["3. Process"]
WORKER["ASR Worker<br/>(GPU)"]
WHISPER["Whisper<br/>Transcription"]
DIAR["Speaker<br/>Diarization"]
end
subgraph Generate["4. Generate"]
NLP["NLP Pipeline"]
SOAP["SOAP Note<br/>Generation"]
end
subgraph Store["5. Store"]
BUNDLE["Bundle<br/>Update"]
EMR["Push to EMR"]
end
REC --> S3 --> JOB --> QUEUE --> WORKER --> WHISPER --> DIAR --> NLP --> SOAP --> BUNDLE --> EMR
Use Cases: PROC-005, PROC-006, NLP-101
5. Error Handling
5.1 Error Categories
| Category | Examples | Handling |
|---|---|---|
| Transient | Network timeout, service unavailable | Retry with exponential backoff |
| Validation | Invalid HL7, malformed JSON | Reject to DLQ, log error |
| Processing | OCR failure, ASR timeout | Retry 2x, then DLQ |
| Data | Patient not found, duplicate | Manual resolution queue |
| System | Out of memory, disk full | Alert, pause processing |
5.2 Retry Strategy
- Attempt 1: immediate
- Attempt 2: wait 30 seconds
- Attempt 3: wait 2 minutes
- After 3 failures: move to DLQ
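The schedule above can be sketched as a small retry loop. `process`, `send_to_dlq`, and the injectable `sleep` are illustrative parameters, not platform APIs; a real worker would also increment the envelope's `retryCount`.

```python
import time

# Delays before each attempt: immediate, 30 s, 2 min, then dead-letter.
BACKOFF_SECONDS = [0, 30, 120]

def process_with_retries(process, message, send_to_dlq, sleep=time.sleep):
    """Try `process` up to three times, then hand the message to the DLQ."""
    last_error = None
    for delay in BACKOFF_SECONDS:
        if delay:
            sleep(delay)
        try:
            return process(message)
        except Exception as exc:
            last_error = exc
    send_to_dlq(message, reason=str(last_error))
    return None
```

Injecting `sleep` keeps the loop testable; production code would typically add jitter to the delays to avoid thundering-herd retries.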
5.3 Dead Letter Queue (DLQ) Handling
| Action | Trigger | Owner |
|---|---|---|
| Auto-retry | Transient errors after 1 hour | System |
| Manual Review | Validation errors | Data team |
| Bulk Replay | After fix deployed | Engineering |
| Discard | Duplicate or obsolete | Data team |
Use Cases: OPS-002, OPS-004
6. Job Management
6.1 Job Lifecycle
stateDiagram-v2
[*] --> Queued: Job created
Queued --> Processing: Worker picks up
Processing --> Completed: Success
Processing --> Failed: Error
Failed --> Queued: Retry
Failed --> DLQ: Max retries exceeded
DLQ --> Queued: Manual replay
Completed --> [*]
DLQ --> Discarded: Manual discard
Discarded --> [*]
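The state diagram above is equivalent to an allowed-transition table, which workers can use to reject illegal moves. A minimal sketch, with states lowercased for use as keys:

```python
# Allowed transitions, mirroring the job lifecycle state diagram.
TRANSITIONS = {
    "queued": {"processing"},
    "processing": {"completed", "failed"},
    "failed": {"queued", "dlq"},          # retry vs. max retries exceeded
    "dlq": {"queued", "discarded"},       # manual replay vs. manual discard
    "completed": set(),                   # terminal
    "discarded": set(),                   # terminal
}

def transition(state, new_state):
    """Return the new state, or raise if the lifecycle forbids the move."""
    if new_state not in TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state
```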
6.2 Job Status API
GET /api/jobs/{jobId}
Response:
{
"jobId": "job-2024-567890",
"status": "processing",
"progress": 45,
"type": "ocr",
"createdAt": "2024-12-09T10:30:00Z",
"updatedAt": "2024-12-09T10:31:30Z",
"metadata": {
"patientId": "ABHA-12345",
"documentType": "pathology_report",
"pageCount": 3
},
"error": null
}
Use Cases: API-003
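A client typically polls this endpoint until the job settles. The sketch below injects the transport as a `fetch` callable so no HTTP library is assumed; the terminal states follow the lifecycle in section 6.1.

```python
# States after which a job will not change again (see section 6.1).
TERMINAL = {"completed", "failed", "dlq", "discarded"}

def wait_for_job(job_id, fetch, max_polls=60):
    """Poll GET /api/jobs/{jobId} until terminal; return the last response.

    `fetch(url)` must return the decoded JSON body as a dict. A real client
    would sleep between polls or use the `progress` field for backoff.
    """
    for _ in range(max_polls):
        job = fetch(f"/api/jobs/{job_id}")
        if job["status"] in TERMINAL:
            return job
    raise TimeoutError(f"job {job_id} still running after {max_polls} polls")
```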
7. Monitoring & Metrics
7.1 Pipeline Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| queue.depth | Messages waiting | > 1000 |
| queue.age | Oldest message age | > 15 min |
| job.processing_time | Job duration | > SLA |
| job.success_rate | Completed / Total | < 95% |
| dlq.depth | DLQ message count | > 10 |
| worker.utilization | Worker busy % | > 80% |
7.2 Dashboard Alerts
| Alert | Severity | Action |
|---|---|---|
| Queue Backup | P1 | Scale workers |
| DLQ Growing | P2 | Investigate failures |
| Worker Crash | P1 | Restart, investigate |
| Processing Timeout | P2 | Check source data |
8. Data Normalization
8.1 Normalization Rules
| Source Field | Target Field | Transformation |
|---|---|---|
| Patient MRN | patientId | Map via ID registry |
| Lab Code (local) | loincCode | Map via code table |
| Drug Name (local) | rxnormCode | Fuzzy match + map |
| Date formats | ISO 8601 | Parse and normalize |
| Units | UCUM | Convert and standardize |
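Two of these rules can be sketched directly: date parsing to ISO 8601 and unit conversion toward a UCUM-style target. The accepted date formats and the conversion table below are illustrative assumptions, not the platform's full mapping tables.

```python
from datetime import datetime

# Source date formats we attempt, in order (illustrative subset).
DATE_FORMATS = ["%d/%m/%Y", "%Y%m%d", "%Y-%m-%d"]

def normalize_date(raw):
    """Parse a source date string and return it in ISO 8601 (YYYY-MM-DD)."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw, fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"unrecognized date format: {raw}")

# (source unit, target unit) -> multiplicative factor (illustrative).
UNIT_CONVERSIONS = {("mg/dl", "g/dL"): 0.001, ("g/l", "g/dL"): 0.1}

def normalize_unit(value, unit, target="g/dL"):
    """Convert a lab value to the target unit; raise KeyError if unmapped."""
    if unit == target:
        return value, target
    factor = UNIT_CONVERSIONS[(unit.lower(), target)]
    return round(value * factor, 4), target
```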
8.2 Canonical Bundle Structure
{
"patientId": "ABHA-12345678901",
"demographics": { ... },
"labs": [
{
"id": "lab-001",
"date": "2024-12-09",
"loincCode": "26464-8",
"value": 12.5,
"unit": "g/dL",
"source": "hospital_xyz_lis",
"ingestedAt": "2024-12-09T10:31:00Z"
}
],
"imaging": [ ... ],
"documents": [ ... ],
"transcripts": [ ... ],
"provenance": {
"lastUpdated": "2024-12-09T10:31:00Z",
"sources": ["hospital_xyz_lis", "hospital_xyz_pacs"]
}
}
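Persisting a normalized result means appending it to this bundle and refreshing provenance. The sketch below assumes de-duplication by result `id` and an append-only `labs` array; both are assumptions about platform behavior, not confirmed semantics.

```python
from datetime import datetime, timezone

def append_lab(bundle, lab, source):
    """Append one lab result to a patient bundle and update provenance.

    Skips entries whose id is already present (duplicate deliveries),
    records the source, and bumps provenance.lastUpdated.
    """
    existing = {entry["id"] for entry in bundle.setdefault("labs", [])}
    if lab["id"] not in existing:
        bundle["labs"].append({**lab, "source": source})
    prov = bundle.setdefault("provenance", {"sources": []})
    if source not in prov["sources"]:
        prov["sources"].append(source)
    prov["lastUpdated"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return bundle
```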
9. Performance Optimization
9.1 Throughput Optimization
| Technique | Description | Impact |
|---|---|---|
| Batch Processing | Process multiple HL7 messages together | 3x throughput |
| Connection Pooling | Reuse database connections | Reduce latency |
| Async I/O | Non-blocking S3 operations | 2x throughput |
| Worker Scaling | Auto-scale on queue depth | Handle peaks |
9.2 Latency Optimization
| Technique | Description | Impact |
|---|---|---|
| Priority Queues | VIP patients processed first | Faster critical data |
| Cache Warming | Pre-load recently accessed patients | Faster reads |
| Parallel Processing | OCR pages in parallel | 2-3x faster |
Document Owner: Data Engineering Team
Last Updated: 2024-12-09
Next Review: Quarterly