Pipelines & Ingestion

Document Purpose: This document describes the data ingestion pipelines, message queues, and processing workflows that bring clinical data into the Entheory.AI platform.


Executive Summary

Entheory.AI ingests data from multiple heterogeneous hospital systems through standardized pipelines. All data flows through message queues for reliability, traceability, and error handling.

Related Documentation:
- High-Level Architecture – System overview
- Ingestion Use Cases – Detailed use cases
- Processing Use Cases – OCR/ASR processing
- Operations Use Cases – Job management


1. Data Sources Overview

1.1 Source Types

```mermaid
flowchart LR
    subgraph Sources["External Sources"]
        EMR["EMR/HIMS"]
        LIS["Lab Information System"]
        PACS["PACS/Imaging"]
        PATH["Pathology Lab"]
        GENOMICS["Genomics Lab"]
        DOCS["Scanned Documents"]
        AUDIO["Audio Recordings"]
    end

    subgraph Adapters["Ingestion Adapters"]
        HL7A["HL7 v2 Adapter"]
        FHIR["FHIR Adapter"]
        DICOM["DICOM Adapter"]
        JSON["JSON/CSV Adapter"]
        FILE["File Upload"]
    end

    subgraph Queue["Message Queue"]
        IQ["Ingestion Queue"]
    end

    EMR --> HL7A --> IQ
    LIS --> HL7A --> IQ
    PACS --> DICOM --> IQ
    PATH --> FILE --> IQ
    GENOMICS --> JSON --> IQ
    DOCS --> FILE --> IQ
    AUDIO --> FILE --> IQ

    style Sources fill:#ff9
    style Adapters fill:#9cf
    style Queue fill:#f9c
```

1.2 Source Configuration

| Source | Protocol | Format | Trigger | Use Cases |
|---|---|---|---|---|
| EMR/HIMS | HL7 MLLP (TCP:2575) | HL7 v2.5 ADT | Real-time push | ING-001 |
| LIS (Labs) | HL7 MLLP | HL7 v2.5 ORU | Real-time push | ING-004 |
| PACS | DICOM / REST | DICOM JSON, SR | Event webhook | ING-006 |
| Pathology | File drop / API | PDF, JSON | File watcher / API | ING-011 |
| Genomics | REST API | JSON / VCF | Polling / webhook | ING-013 |
| Documents | File upload | PDF, images | User upload | PROC-001 |
| Audio | File upload | MP3, WAV | User upload | PROC-005 |

2. Pipeline Architecture

2.1 End-to-End Flow

```mermaid
sequenceDiagram
    participant Src as Source System
    participant Adp as Adapter
    participant Q as Message Queue
    participant Wrk as Worker
    participant Bnd as Patient Bundle
    participant Cache as Cache
    participant UI as Frontend

    Src->>Adp: Send data (HL7/FHIR/File)
    Adp->>Adp: Validate & normalize
    Adp->>Q: Enqueue job
    Adp-->>Src: ACK/NACK

    Q->>Wrk: Dequeue job
    Wrk->>Wrk: Process (OCR/ASR/Parse)
    Wrk->>Bnd: Update patient bundle
    Bnd->>Cache: Invalidate/refresh

    UI->>Cache: Request patient data
    Cache-->>UI: Return updated data
```

2.2 Pipeline Stages

| Stage | Description | SLA | Error Handling |
|---|---|---|---|
| 1. Receive | Accept data from source | < 1s | NACK if malformed |
| 2. Validate | Schema/format validation | < 500ms | Reject to DLQ |
| 3. Normalize | Map to canonical format | < 1s | Log + continue |
| 4. Enqueue | Add to processing queue | < 100ms | Retry 3x |
| 5. Process | Execute transforms (OCR/ASR/NLP) | Varies | Retry + DLQ |
| 6. Persist | Update patient bundle | < 500ms | Retry + alert |
| 7. Notify | Invalidate cache, emit events | < 200ms | Best-effort |
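The middle stages above (validate, normalize, enqueue, with rejection to a DLQ) can be sketched as a small Python pipeline. All names here are illustrative, not the platform's actual API; the HL7 handling is reduced to a single field extraction.

```python
from dataclasses import dataclass, field

@dataclass
class Message:
    raw: str
    normalized: dict = field(default_factory=dict)

def validate(msg: Message) -> Message:
    """Stage 2: schema/format validation (HL7 v2 messages start with MSH)."""
    if not msg.raw.startswith("MSH|"):
        raise ValueError("malformed message")
    return msg

def normalize(msg: Message) -> Message:
    """Stage 3: map to a canonical shape (here: pull out MSH-9, the message type)."""
    fields = msg.raw.split("|")
    msg.normalized = {"messageType": fields[8] if len(fields) > 8 else None}
    return msg

def run_stages(msg: Message, queue: list, dlq: list) -> None:
    """Stage 4: enqueue on success; validation failures are rejected to the DLQ."""
    try:
        queue.append(normalize(validate(msg)))
    except ValueError:
        dlq.append(msg)
```

Keeping each stage a pure function makes the per-stage SLAs and error-handling rules in the table individually measurable and testable.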

3. Message Queues

3.1 Queue Architecture

```mermaid
flowchart TB
    subgraph Ingestion["Ingestion Queues"]
        HL7Q["HL7 Queue"]
        FHIRQ["FHIR Queue"]
        FILEQ["File Queue"]
    end

    subgraph Processing["Processing Queues"]
        OCRQ["OCR Queue"]
        ASRQ["ASR Queue"]
        NLPQ["NLP Queue"]
    end

    subgraph DLQ["Dead Letter Queues"]
        HL7DLQ["HL7 DLQ"]
        OCRDLQ["OCR DLQ"]
        ASRDLQ["ASR DLQ"]
    end

    HL7Q --> HL7DLQ
    OCRQ --> OCRDLQ
    ASRQ --> ASRDLQ

    style Ingestion fill:#9cf
    style Processing fill:#9fc
    style DLQ fill:#f99
```

3.2 Queue Configuration

Technology Choice: The messaging layer is configurable per deployment. Options include:

- Kafka – High throughput, log-based, best for event streaming
- NATS – Lightweight, low latency, ideal for microservices
- RabbitMQ – Feature-rich, traditional message broker

Selection depends on hospital IT capabilities and scale requirements.

| Queue | Technology | Workers | Visibility Timeout | Max Retries | DLQ |
|---|---|---|---|---|---|
| hl7-ingestion | Kafka/NATS/RabbitMQ | 4 | 30s | 3 | hl7-dlq |
| fhir-ingestion | Kafka/NATS/RabbitMQ | 2 | 60s | 3 | fhir-dlq |
| ocr-processing | Kafka/NATS/RabbitMQ | 10 | 5 min | 2 | ocr-dlq |
| asr-processing | Kafka/NATS/RabbitMQ | 3 (GPU) | 15 min | 2 | asr-dlq |
| nlp-processing | Kafka/NATS/RabbitMQ | 4 | 3 min | 2 | nlp-dlq |

3.3 Message Format

```json
{
  "messageId": "msg-2024-001234",
  "jobId": "job-2024-567890",
  "timestamp": "2024-12-09T10:30:00Z",
  "source": "hospital_xyz_lis",
  "type": "hl7_oru",
  "priority": "normal",
  "payload": {
    "patientId": "ABHA-12345678901",
    "messageType": "ORU^R01",
    "rawMessage": "MSH|^~\\&|LIS|HOSP|...",
    "parsedData": { ... }
  },
  "metadata": {
    "hospital": "hospital_xyz",
    "department": "oncology",
    "correlationId": "corr-abc123"
  },
  "retryCount": 0
}
```
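Before a message is enqueued, an adapter can verify this envelope. A minimal sketch, assuming the required-field list above; the function is illustrative, not the platform's real validator.

```python
import json

# Envelope fields every queue message must carry, per the example above.
REQUIRED_FIELDS = {"messageId", "jobId", "timestamp", "source", "type", "payload"}

def validate_envelope(raw: str) -> dict:
    """Parse a queue message and verify the required envelope fields."""
    msg = json.loads(raw)
    missing = REQUIRED_FIELDS - msg.keys()
    if missing:
        raise ValueError(f"missing envelope fields: {sorted(missing)}")
    msg.setdefault("retryCount", 0)  # an absent counter means first attempt
    return msg
```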

4. Ingestion Pipelines

4.1 HL7 Pipeline

```mermaid
flowchart LR
    subgraph Receive["1. Receive"]
        MLLP["MLLP Listener<br/>TCP:2575"]
    end

    subgraph Parse["2. Parse"]
        PARSE["HL7 Parser"]
        VAL["Validation"]
    end

    subgraph Map["3. Map"]
        PID["Patient ID<br/>Mapping"]
        NORM["Data<br/>Normalization"]
    end

    subgraph Store["4. Store"]
        QUEUE["Queue"]
        WORKER["Worker"]
        BUNDLE["Bundle<br/>Update"]
    end

    MLLP --> PARSE --> VAL --> PID --> NORM --> QUEUE --> WORKER --> BUNDLE
```

Supported Message Types:

| Message Type | Description | Processing |
|---|---|---|
| ADT^A01 | Patient admission | Create/update demographics |
| ADT^A03 | Patient discharge | Update status |
| ADT^A08 | Patient update | Update demographics |
| ORU^R01 | Lab results | Append to labs array |
| ORM^O01 | Order request | Create order record |
| RDE^O11 | Pharmacy dispense | Update medications |

Use Cases: ING-001 - ING-005
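The message-type routing above can be sketched as a dispatch map. Handler names are hypothetical placeholders, not real platform functions.

```python
# Message type (MSH-9) -> handler, mirroring the supported-types table.
HL7_HANDLERS = {
    "ADT^A01": "create_or_update_demographics",
    "ADT^A03": "update_status",
    "ADT^A08": "update_demographics",
    "ORU^R01": "append_labs",
    "ORM^O01": "create_order",
    "RDE^O11": "update_medications",
}

def route_hl7(message_type: str) -> str:
    """Pick a handler, or fail loudly so the adapter can NACK the sender."""
    try:
        return HL7_HANDLERS[message_type]
    except KeyError:
        raise ValueError(f"unsupported HL7 message type: {message_type}")
```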

4.2 PACS/Imaging Pipeline

```mermaid
flowchart LR
    subgraph Receive["1. Receive"]
        WEBHOOK["Webhook<br/>Notification"]
        QUERY["DICOM Query"]
    end

    subgraph Fetch["2. Fetch"]
        META["Fetch<br/>Metadata"]
        SERIES["Fetch<br/>Series List"]
    end

    subgraph Process["3. Process"]
        LINK["Link to<br/>Patient"]
        PARSE["Parse<br/>Report SR"]
    end

    subgraph Store["4. Store"]
        BUNDLE["Bundle<br/>Update"]
        CACHE["Update<br/>Cache"]
    end

    WEBHOOK --> META --> LINK --> BUNDLE
    QUERY --> SERIES --> PARSE --> BUNDLE --> CACHE
```

Use Cases: ING-006 - ING-010

4.3 Document (OCR) Pipeline

```mermaid
flowchart LR
    subgraph Upload["1. Upload"]
        API["Upload API"]
        S3["Store in S3"]
    end

    subgraph Queue["2. Queue"]
        JOB["Create Job"]
        QUEUE["OCR Queue"]
    end

    subgraph Process["3. Process"]
        WORKER["OCR Worker"]
        LANG["Language<br/>Detection"]
        TESS["Tesseract/<br/>PaddleOCR"]
    end

    subgraph Extract["4. Extract"]
        NER["Entity<br/>Extraction"]
        CLASS["Document<br/>Classification"]
    end

    subgraph Store["5. Store"]
        BUNDLE["Bundle<br/>Update"]
    end

    API --> S3 --> JOB --> QUEUE --> WORKER --> LANG --> TESS --> NER --> CLASS --> BUNDLE
```

Use Cases: PROC-001 - PROC-010

4.4 Audio (ASR) Pipeline

```mermaid
flowchart LR
    subgraph Upload["1. Upload"]
        REC["Recording API"]
        S3["Store in S3"]
    end

    subgraph Queue["2. Queue"]
        JOB["Create Job"]
        QUEUE["ASR Queue"]
    end

    subgraph Process["3. Process"]
        WORKER["ASR Worker<br/>(GPU)"]
        WHISPER["Whisper<br/>Transcription"]
        DIAR["Speaker<br/>Diarization"]
    end

    subgraph Generate["4. Generate"]
        NLP["NLP Pipeline"]
        SOAP["SOAP Note<br/>Generation"]
    end

    subgraph Store["5. Store"]
        BUNDLE["Bundle<br/>Update"]
        EMR["Push to EMR"]
    end

    REC --> S3 --> JOB --> QUEUE --> WORKER --> WHISPER --> DIAR --> NLP --> SOAP --> BUNDLE --> EMR
```

Use Cases: PROC-005, PROC-006, NLP-101


5. Error Handling

5.1 Error Categories

| Category | Examples | Handling |
|---|---|---|
| Transient | Network timeout, service unavailable | Retry with exponential backoff |
| Validation | Invalid HL7, malformed JSON | Reject to DLQ, log error |
| Processing | OCR failure, ASR timeout | Retry 2x, then DLQ |
| Data | Patient not found, duplicate | Manual resolution queue |
| System | Out of memory, disk full | Alert, pause processing |

5.2 Retry Strategy

- Attempt 1: Immediate
- Attempt 2: Wait 30 seconds
- Attempt 3: Wait 2 minutes
- After 3 failures: Move to DLQ
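The schedule above can be expressed as a small decision function. The delay values come straight from the schedule; the function name and the `('retry'/'dlq')` convention are illustrative.

```python
# Backoff delays (seconds) before attempts 1-3; attempt 1 runs immediately.
RETRY_DELAYS_SECONDS = [0, 30, 120]
MAX_ATTEMPTS = len(RETRY_DELAYS_SECONDS)

def next_action(attempts_failed: int) -> tuple:
    """Given how many attempts have failed, return ('retry', delay) or ('dlq', None)."""
    if attempts_failed >= MAX_ATTEMPTS:
        return ("dlq", None)  # after 3 failures, move to DLQ
    return ("retry", RETRY_DELAYS_SECONDS[attempts_failed])
```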

5.3 Dead Letter Queue (DLQ) Handling

| Action | Trigger | Owner |
|---|---|---|
| Auto-retry | Transient errors after 1 hour | System |
| Manual Review | Validation errors | Data team |
| Bulk Replay | After fix deployed | Engineering |
| Discard | Duplicate or obsolete | Data team |

Use Cases: OPS-002, OPS-004


6. Job Management

6.1 Job Lifecycle

```mermaid
stateDiagram-v2
    [*] --> Queued: Job created
    Queued --> Processing: Worker picks up
    Processing --> Completed: Success
    Processing --> Failed: Error
    Failed --> Queued: Retry
    Failed --> DLQ: Max retries exceeded
    DLQ --> Queued: Manual replay
    Completed --> [*]
    DLQ --> Discarded: Manual discard
    Discarded --> [*]
```
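The lifecycle above can be written as an explicit transition table, which makes illegal transitions detectable. State and event names mirror the diagram; the code itself is an illustrative sketch.

```python
# (current state, event) -> next state, per the lifecycle diagram.
TRANSITIONS = {
    ("queued", "picked_up"):    "processing",
    ("processing", "success"):  "completed",
    ("processing", "error"):    "failed",
    ("failed", "retry"):        "queued",
    ("failed", "max_retries"):  "dlq",
    ("dlq", "manual_replay"):   "queued",
    ("dlq", "manual_discard"):  "discarded",
}

def advance(state: str, event: str) -> str:
    """Apply an event to a job's state; reject transitions the diagram forbids."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {state} + {event}")
```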

6.2 Job Status API

GET /api/jobs/{jobId}

Response:

```json
{
  "jobId": "job-2024-567890",
  "status": "processing",
  "progress": 45,
  "type": "ocr",
  "createdAt": "2024-12-09T10:30:00Z",
  "updatedAt": "2024-12-09T10:31:30Z",
  "metadata": {
    "patientId": "ABHA-12345",
    "documentType": "pathology_report",
    "pageCount": 3
  },
  "error": null
}
```

Use Cases: API-003
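A client can poll this endpoint until the job settles. In this hypothetical sketch, `fetch_job` stands in for an HTTP GET of `/api/jobs/{jobId}` and is injected so the loop is testable; the terminal-state set is an assumption based on the job lifecycle in section 6.1.

```python
import time

# Assumed client-facing terminal statuses; "processing"/"queued" keep polling.
TERMINAL_STATES = {"completed", "failed", "discarded"}

def wait_for_job(job_id: str, fetch_job, poll_interval: float = 2.0,
                 max_polls: int = 100) -> dict:
    """Poll the job status until it reaches a terminal state, or give up."""
    for _ in range(max_polls):
        job = fetch_job(job_id)  # e.g. a wrapper around the HTTP GET
        if job["status"] in TERMINAL_STATES:
            return job
        time.sleep(poll_interval)
    raise TimeoutError(f"job {job_id} still running after {max_polls} polls")
```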


7. Monitoring & Metrics

7.1 Pipeline Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| queue.depth | Messages waiting | > 1000 |
| queue.age | Oldest message age | > 15 min |
| job.processing_time | Job duration | > SLA |
| job.success_rate | Completed / Total | < 95% |
| dlq.depth | DLQ message count | > 10 |
| worker.utilization | Worker busy % | > 80% |
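An illustrative evaluation of these thresholds against a metrics snapshot; metric names follow the table (with `queue.age` expressed in minutes), but the checker itself is a sketch, not the monitoring stack's configuration.

```python
# Metric name -> predicate that is True when the alert threshold is crossed.
ALERT_THRESHOLDS = {
    "queue.depth":        lambda v: v > 1000,
    "queue.age_minutes":  lambda v: v > 15,
    "job.success_rate":   lambda v: v < 0.95,
    "dlq.depth":          lambda v: v > 10,
    "worker.utilization": lambda v: v > 0.80,
}

def breached_alerts(metrics: dict) -> list:
    """Return the names of all reported metrics that cross their threshold."""
    return [name for name, crossed in ALERT_THRESHOLDS.items()
            if name in metrics and crossed(metrics[name])]
```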

7.2 Dashboard Alerts

| Alert | Severity | Action |
|---|---|---|
| Queue Backup | P1 | Scale workers |
| DLQ Growing | P2 | Investigate failures |
| Worker Crash | P1 | Restart, investigate |
| Processing Timeout | P2 | Check source data |

8. Data Transformations

8.1 Normalization Rules

| Source Field | Target Field | Transformation |
|---|---|---|
| Patient MRN | patientId | Map via ID registry |
| Lab Code (local) | loincCode | Map via code table |
| Drug Name (local) | rxnormCode | Fuzzy match + map |
| Date formats | ISO 8601 | Parse and normalize |
| Units | UCUM | Convert and standardize |
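The date-normalization rule above can be sketched with a short format list: try each known source format and emit ISO 8601. The format list here is an illustrative assumption, not an exhaustive inventory of source systems.

```python
from datetime import datetime

# Candidate source formats: HL7 timestamps, HL7 dates, d/m/y strings, ISO dates.
SOURCE_FORMATS = ["%Y%m%d%H%M%S", "%Y%m%d", "%d/%m/%Y", "%Y-%m-%d"]

def normalize_date(value: str) -> str:
    """Return the input date/time as an ISO 8601 string."""
    for fmt in SOURCE_FORMATS:
        try:
            return datetime.strptime(value, fmt).isoformat()
        except ValueError:
            continue  # try the next known format
    raise ValueError(f"unrecognized date format: {value!r}")
```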

8.2 Canonical Bundle Structure

```json
{
  "patientId": "ABHA-12345678901",
  "demographics": { ... },
  "labs": [
    {
      "id": "lab-001",
      "date": "2024-12-09",
      "loincCode": "26464-8",
      "value": 12.5,
      "unit": "g/dL",
      "source": "hospital_xyz_lis",
      "ingestedAt": "2024-12-09T10:31:00Z"
    }
  ],
  "imaging": [ ... ],
  "documents": [ ... ],
  "transcripts": [ ... ],
  "provenance": {
    "lastUpdated": "2024-12-09T10:31:00Z",
    "sources": ["hospital_xyz_lis", "hospital_xyz_pacs"]
  }
}
```
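A hypothetical worker helper for the persist stage shows how a lab result might be appended to this bundle while keeping provenance in sync. Field names mirror the bundle example; the function itself is illustrative.

```python
from datetime import datetime, timezone

def append_lab(bundle: dict, lab: dict, source: str) -> dict:
    """Append a lab result to the bundle and update provenance consistently."""
    now = datetime.now(timezone.utc).isoformat()
    # Stamp the record with its origin and ingestion time before storing it.
    bundle.setdefault("labs", []).append({**lab, "source": source, "ingestedAt": now})
    provenance = bundle.setdefault("provenance", {"sources": []})
    provenance["lastUpdated"] = now
    if source not in provenance["sources"]:
        provenance["sources"].append(source)
    return bundle
```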

9. Performance Optimization

9.1 Throughput Optimization

| Technique | Description | Impact |
|---|---|---|
| Batch Processing | Process multiple HL7 messages together | 3x throughput |
| Connection Pooling | Reuse database connections | Reduce latency |
| Async I/O | Non-blocking S3 operations | 2x throughput |
| Worker Scaling | Auto-scale on queue depth | Handle peaks |

9.2 Latency Optimization

| Technique | Description | Impact |
|---|---|---|
| Priority Queues | VIP patients processed first | Faster critical data |
| Cache Warming | Pre-load recently accessed patients | Faster reads |
| Parallel Processing | OCR pages in parallel | 2-3x faster |
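The "OCR pages in parallel" technique can be sketched with a thread pool. `ocr_page` stands in for the real per-page OCR call (e.g. Tesseract or PaddleOCR) and is injected so the fan-out logic is shown on its own.

```python
from concurrent.futures import ThreadPoolExecutor

def ocr_document(pages: list, ocr_page, max_workers: int = 4) -> list:
    """Run OCR over a document's pages concurrently; map() preserves page order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(ocr_page, pages))
```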


Document Owner: Data Engineering Team
Last Updated: 2024-12-09
Next Review: Quarterly