CMS Claims Data Comparison Pipeline — 6-step design
The pipeline processes CMS Medicare claims data in six sequential steps.
Each step receives a PipelineContext object containing the DuckDB connection,
configuration, and accumulated results from prior steps. Each step returns a
StepResult with status, metrics, and warnings.
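The step contract described above can be sketched as two small dataclasses. This is an illustrative shape, not the actual source: the field names beyond those mentioned in the text (status, metrics, warnings, connection, config, accumulated results) and the `record` helper are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepResult:
    status: str                        # e.g. "success", "warning", or "failed"
    metrics: dict[str, Any] = field(default_factory=dict)
    warnings: list[str] = field(default_factory=list)


@dataclass
class PipelineContext:
    con: Any                           # duckdb.DuckDBPyConnection in the real code
    config: dict[str, Any]
    results: dict[str, StepResult] = field(default_factory=dict)

    def record(self, step_name: str, result: StepResult) -> None:
        """Accumulate each step's output so later steps can read it."""
        self.results[step_name] = result
```

Passing one context object through every step keeps the steps decoupled: each reads only the prior results it needs and records its own.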
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart LR
subgraph INPUT["📁 Input"]
OLD["Old System CSVs
(3 Beneficiary + 2 Carrier)"]
NEW["New System CSVs
(3 Beneficiary + 2 Carrier)"]
end
S1["1
Receive
& Verify"]
S2["2
Schema
Validation"]
S3["3
Ingest
& Profile"]
S4["4
Record
Matching"]
S5["5
Compare
& Analyze"]
S6["6
Report"]
subgraph OUTPUT["📊 Output"]
JSON["report_data.json"]
HTML["comparison_report.html"]
CSV["CSV Exports"]
end
INPUT --> S1 --> S2 --> S3 --> S4 --> S5 --> S6 --> OUTPUT
style S1 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style S2 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style S3 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style S4 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style S5 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style S6 fill:#1e293b,stroke:#38bdf8,stroke-width:2px,color:#e2e8f0
style INPUT fill:#0f172a,stroke:#334155,color:#94a3b8
style OUTPUT fill:#0f172a,stroke:#4ade80,color:#94a3b8
Discovers CSV files on disk (or from a StorageAdapter for cloud sources),
verifies file integrity via checksums, handles zip extraction, and builds a complete
file inventory with sizes and metadata. Separate discovery paths for old and new systems.
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
subgraph SOURCES["Data Sources"]
LOCAL["Local Filesystem
data/"]
CLOUD["Cloud Storage
(S3 via StorageAdapter)"]
NEWDIR["New System Dir
--new-data path/"]
end
DISCOVER["File Discovery
Glob: *Beneficiary*, *Carrier*"]
ZIP["Zip Extraction
(if .zip detected)"]
CHECKSUM["SHA-256 Checksum
Integrity Verification"]
INVENTORY["Build Inventory
{name, path, size_mb, checksum}"]
SOURCES --> DISCOVER
DISCOVER --> ZIP
ZIP --> CHECKSUM
CHECKSUM --> INVENTORY
INVENTORY --> RES["StepResult
• old_system.inventory
• new_system.inventory
• file_count"]
style DISCOVER fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style ZIP fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style CHECKSUM fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style INVENTORY fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style RES fill:#0f172a,stroke:#4ade80,color:#4ade80
src/pipeline/step1_receive.py
src/storage.py — StorageAdapter protocol

Reads CSV headers without loading full data and validates them against expected schemas. Classifies each file as beneficiary or carrier claims based on header fingerprints. Detects missing columns, extra columns, and type mismatches early — before the expensive ingestion step.
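Header-only validation can be sketched like this. The marker columns are taken from the diagram below; the full 32- and 141-column schemas are elided, and the function names are illustrative.

```python
import csv
from pathlib import Path


def read_header(path: Path) -> list[str]:
    """Read only the first row of the CSV; the data rows are never loaded."""
    with path.open(newline="") as fh:
        return next(csv.reader(fh))


def classify(header: list[str]) -> str:
    """Fingerprint the file type from distinctive header columns."""
    cols = set(header)
    if "CLM_ID" in cols:
        return "carrier"
    if "BENE_BIRTH_DT" in cols:
        return "beneficiary"
    return "unknown"


def validate_columns(header: list[str], expected: list[str]) -> dict:
    """Report columns the schema expects but the file lacks, and vice versa."""
    actual, exp = set(header), set(expected)
    return {"missing": sorted(exp - actual), "extra": sorted(actual - exp)}
```

Because only the first row is read, this step stays cheap even against the multi-million-row carrier files.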
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart LR
INV["File Inventory
from Step 1"] --> READ["Read CSV Headers
(first row only)"]
READ --> CLASS["Classify File Type
Beneficiary vs Carrier"]
CLASS --> BENE["Beneficiary Schema
32 expected columns
DESYNPUF_ID, BENE_BIRTH_DT, ..."]
CLASS --> CARR["Carrier Schema
141 expected columns
DESYNPUF_ID, CLM_ID, ..."]
BENE --> CHECK["Column Validation
• Missing columns?
• Extra columns?
• Name mismatches?"]
CARR --> CHECK
CHECK --> RES["StepResult
• validated_count
• beneficiary_count
• carrier_count
• schema_errors[]"]
style READ fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style CLASS fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style BENE fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style CARR fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style CHECK fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style RES fill:#0f172a,stroke:#4ade80,color:#4ade80
src/pipeline/step2_schema.py
The heaviest step. Loads all CSVs into DuckDB in-memory tables using read_csv_auto,
adds derived columns (e.g., summary_year from filename), then profiles every
column in every table — computing null rates, distinct counts, min/max/mean, and detecting
statistical anomalies.
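A minimal sketch of the ingestion and per-column profiling queries, assuming DuckDB's `read_csv_auto`. The functions below only build the SQL strings (the real step executes them on the pipeline's connection), and the function names are illustrative.

```python
def ingest_sql(table: str, csv_path: str) -> str:
    # read_csv_auto infers delimiters and column types from the file itself
    return (f"CREATE OR REPLACE TABLE {table} AS "
            f"SELECT * FROM read_csv_auto('{csv_path}')")


def profile_sql(table: str, column: str) -> str:
    """Profiling stats for one column; run once per column per table."""
    return f"""
        SELECT
            COUNT(*) - COUNT("{column}")  AS null_count,
            COUNT(DISTINCT "{column}")    AS distinct_count,
            MIN("{column}")               AS min_value,
            MAX("{column}")               AS max_value
        FROM {table}
    """
```

Anomaly flags (high-null, zero-variance) then fall out of the profile rows: `null_count / row_count > 0.5` or `distinct_count == 1`.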
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
subgraph INGEST["Ingestion"]
CSV_OLD["Old System CSVs"] --> DUCK_OLD["DuckDB Tables
• beneficiary_summary
• carrier_claims"]
CSV_NEW["New System CSVs"] --> DUCK_NEW["DuckDB Tables
• new_beneficiary_summary
• new_carrier_claims"]
end
subgraph ENRICH["Enrichment"]
DUCK_OLD --> YEAR["Add summary_year
(extracted from filename)"]
DUCK_NEW --> YEAR_N["Add summary_year
(extracted from filename)"]
end
subgraph PROFILE["Profiling (per table)"]
COLS["For each column:
• null_count, null_pct
• distinct_count
• min, max, mean
• dtype detection"]
ANOMALY["Anomaly Detection
• High null columns (>50%)
• Zero-variance columns
• Outlier distributions"]
end
ENRICH --> PROFILE
PROFILE --> RES["StepResult
• profiles{table → TableProfile}
• anomalies[]
• row counts"]
style DUCK_OLD fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style DUCK_NEW fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style COLS fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style ANOMALY fill:#1e293b,stroke:#f87171,color:#e2e8f0
style RES fill:#0f172a,stroke:#4ade80,color:#4ade80
src/pipeline/step3_ingest.py
src/ingest.py — DuckDB loading logic
src/profile.py — column-level profiling
Joins old and new system tables on primary keys using FULL OUTER JOIN.
Classifies every record as matched, old-only, or
new-only. Also runs internal consistency validations (key integrity,
temporal checks, demographic checks, financial reconciliation) on the old system data.
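The join-and-classify query can be sketched as a SQL builder, following the VARCHAR-cast and COALESCE logic in the diagram below. The builder function itself is hypothetical; only the join shape comes from the source.

```python
def match_sql(old_table: str, new_table: str, keys: list[str]) -> str:
    """FULL OUTER JOIN on VARCHAR-cast keys, classifying every record."""
    on_clause = " AND ".join(
        f"CAST(o.{k} AS VARCHAR) = CAST(n.{k} AS VARCHAR)" for k in keys
    )
    k0 = keys[0]
    return f"""
        SELECT
            COALESCE(CAST(o.{k0} AS VARCHAR), CAST(n.{k0} AS VARCHAR)) AS key_value,
            CASE
                WHEN o.{k0} IS NULL THEN 'new_only'
                WHEN n.{k0} IS NULL THEN 'old_only'
                ELSE 'matched'
            END AS match_status
        FROM {old_table} o
        FULL OUTER JOIN {new_table} n ON {on_clause}
    """
```

Casting both sides to VARCHAR sidesteps type drift between the two systems (e.g. an INT key in one export and a padded string in the other).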
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
subgraph TABLES["DuckDB Tables"]
OLD_B["beneficiary_summary
343,644 rows"]
NEW_B["new_beneficiary_summary
343,644 rows"]
OLD_C["carrier_claims
4,741,335 rows"]
NEW_C["new_carrier_claims
4,746,112 rows"]
end
subgraph KEYS["Key Identification"]
K1["Beneficiary Key:
DESYNPUF_ID + summary_year"]
K2["Claims Key:
CLM_ID"]
end
subgraph JOIN["FULL OUTER JOIN"]
J1["CAST keys to VARCHAR
(handle type mismatches)"]
J1 --> CLASSIFY["Classify Records
• COALESCE(old, new) not null → matched
• new IS NULL → old_only
• old IS NULL → new_only"]
end
subgraph VALIDATE["Internal Validation"]
V1["Key Integrity (3 checks)"]
V2["Temporal Consistency"]
V3["Demographic Checks"]
V4["Financial Reconciliation"]
end
TABLES --> KEYS --> JOIN
TABLES --> VALIDATE
JOIN --> MATCH_T["_match_beneficiary
_match_claims"]
CLASSIFY --> RATES["Match Rates
Bene: 99.91%
Claims: 99.9%"]
VALIDATE --> VRES["ValidationResults
7 passed, 4 failed"]
style OLD_B fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style NEW_B fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style OLD_C fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style NEW_C fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style CLASSIFY fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style MATCH_T fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style RATES fill:#0f172a,stroke:#4ade80,color:#4ade80
style VRES fill:#0f172a,stroke:#f87171,color:#f87171
src/pipeline/step4_match.py — join + classify
src/validate.py — 11 validation checks

For every matched record pair, compares each field value between old and new systems. Aggregates discrepancies by column and category. Computes financial divergence totals and builds a detailed discrepancy table with per-record diff information.
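The field-by-field comparison reduces to a small core, sketched here over plain dicts rather than the real DuckDB rows; both function names are illustrative.

```python
from collections import Counter


def diff_record(old: dict, new: dict, ignore: frozenset = frozenset()) -> dict:
    """Return {column: (old_value, new_value)} for every field that differs."""
    return {
        col: (old[col], new[col])
        for col in old.keys() & new.keys()
        if col not in ignore and old[col] != new[col]
    }


def discrepancies_by_column(matched_pairs) -> Counter:
    """Count how many matched pairs disagree on each column."""
    counts = Counter()
    for old, new in matched_pairs:
        counts.update(diff_record(old, new).keys())
    return counts
```

The per-pair diffs feed the `_discrepancy_detail` table; the column counts feed the aggregated ComparisonResults.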
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
MATCHED["Matched Records
from Step 4"] --> COMPARE["Field-by-Field Comparison
For each column:
old_value != new_value?"]
COMPARE --> AGG["Aggregate Discrepancies
• By column (which fields differ?)
• By category (demographic, financial, etc.)
• By severity (count + percentage)"]
COMPARE --> DETAIL["Discrepancy Detail Table
_discrepancy_detail
343,485 rows × 37 cols"]
COMPARE --> FINREC["Financial Reconciliation
_financial_recon
262,017 rows × 11 cols"]
AGG --> CHECKS["46 ComparisonResults
• check_name
• category
• table_pair
• metric_value"]
subgraph METRICS["Key Metrics"]
M1["446 beneficiaries mismatched (0.13%)"]
M2["$35,624.71 total financial divergence"]
M3["4,777 phantom claims in new system"]
end
CHECKS --> METRICS
style MATCHED fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style COMPARE fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style AGG fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style DETAIL fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style FINREC fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style CHECKS fill:#0f172a,stroke:#4ade80,color:#4ade80
src/pipeline/step5_compare.py
src/compare.py — comparison logic
_discrepancy_detail — row-level diffs
_financial_recon — dollar-level reconciliation
ComparisonResult objects
Generates the final deliverables. First builds a canonical report_data.json
(the data layer), then renders the HTML report with client-side Plotly charts from the
embedded JSON. Also exports raw diff tables as CSVs for external analysis.
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
subgraph INPUTS["Inputs from Steps 1-5"]
PROF["TableProfiles
(8 tables)"]
VAL["ValidationResults
(11 checks)"]
COMP["ComparisonResults
(46 checks)"]
CTX["PipelineContext
(file inventories, match rates)"]
CHART_Q["Chart Data Queries
(DuckDB → aggregations)"]
end
BUILD["build_report_data()
Assemble canonical dict"]
INPUTS --> BUILD
BUILD --> JSON["report_data.json
292 KB
(presentation-agnostic)"]
BUILD --> RENDER["Jinja2 Template
+ Embedded chart_data JSON"]
RENDER --> HTML["comparison_report.html
• Data Context section
• Executive Summary
• 6 Plotly charts (client-side)
• Sortable tables
• Collapsible profiles"]
BUILD --> EXPORT["CSV Exports"]
EXPORT --> CSV1["_discrepancy_detail.csv"]
EXPORT --> CSV2["_financial_recon.csv"]
EXPORT --> CSV3["_match_beneficiary.csv"]
EXPORT --> CSV4["_match_claims.csv"]
JSON -.->|"fetch()"|REACT["React Diff Viewer
(separate app)"]
JSON -.->|"Any consumer"|OTHER["CLI / PDF / API
(future)"]
style BUILD fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style JSON fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style HTML fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style RENDER fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style REACT fill:#0f172a,stroke:#a78bfa,color:#a78bfa,stroke-dasharray: 5 5
style OTHER fill:#0f172a,stroke:#94a3b8,color:#94a3b8,stroke-dasharray: 5 5
src/pipeline/step6_report.py
src/report.py — template + rendering
report_data.json is the canonical artifact

All intermediate data lives in DuckDB in-memory tables. The pipeline context accumulates results from each step. Only the final report step writes to disk (JSON, HTML, CSVs).
%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#1e293b', 'primaryBorderColor': '#38bdf8', 'primaryTextColor': '#e2e8f0', 'lineColor': '#38bdf8', 'secondaryColor': '#334155', 'tertiaryColor': '#0f172a'}}}%%
flowchart TB
subgraph DISK["Disk (Input)"]
F1["data/old_system/*.csv
(old system)"]
F2["data/new_system/*.csv
(new system)"]
end
subgraph DUCKDB["DuckDB In-Memory"]
T1["beneficiary_summary"]
T2["carrier_claims"]
T3["new_beneficiary_summary"]
T4["new_carrier_claims"]
T5["_match_beneficiary"]
T6["_match_claims"]
T7["_discrepancy_detail"]
T8["_financial_recon"]
end
subgraph PYTHON["Python Objects (PipelineContext)"]
P1["profiles: dict[str, TableProfile]"]
P2["validations: list[ValidationResult]"]
P3["comparisons: list[ComparisonResult]"]
P4["results: dict (step outputs)"]
end
subgraph OUTPUT_DISK["Disk (Output)"]
O1["reports/report_data.json"]
O2["reports/comparison_report.html"]
O3["reports/exports/*.csv"]
end
F1 --> T1 & T2
F2 --> T3 & T4
T1 & T2 & T3 & T4 --> T5 & T6
T5 & T6 --> T7 & T8
DUCKDB --> PYTHON
PYTHON --> OUTPUT_DISK
style T1 fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style T2 fill:#1e293b,stroke:#38bdf8,color:#e2e8f0
style T3 fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style T4 fill:#1e293b,stroke:#4ade80,color:#e2e8f0
style T5 fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style T6 fill:#1e293b,stroke:#a78bfa,color:#e2e8f0
style T7 fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style T8 fill:#1e293b,stroke:#fbbf24,color:#e2e8f0
style O1 fill:#0f172a,stroke:#4ade80,color:#4ade80
style O2 fill:#0f172a,stroke:#fbbf24,color:#fbbf24
style O3 fill:#0f172a,stroke:#94a3b8,color:#94a3b8
Serverless deployment using Lambda container images, Step Functions orchestration, and S3 for storage + static site hosting. Status: scaffolded, not production-tested.
Each pipeline step runs as a Lambda function (container image). Step Functions chains them with gate logic — if a step sets halted=true, execution stops immediately.
After the report step, docs/ and reports/ (including Parquet exports) are synced to an S3 static website bucket. The Parquet Viewer and SQL Explorer work identically — hyparquet and Squirreling run in the browser, loading .parquet files via HTTP GET from S3.
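The gate logic can be sketched as a common Lambda handler wrapper; this is a hypothetical shape, not the actual `cloud/handlers.py`, and `run_step` stands in for the real per-step work (S3 reads, DuckDB restore/snapshot).

```python
def step_handler(event: dict, context=None) -> dict:
    """Shared wrapper: Step Functions passes the prior step's output as `event`
    and a Choice state checks `halted` before invoking the next Lambda."""
    if event.get("halted"):
        return event  # propagate the halt unchanged; nothing else runs

    result = run_step(event)

    return {
        **event,
        "status": result["status"],
        "halted": result["status"] == "failed",
    }


def run_step(event: dict) -> dict:
    # Placeholder for the real step body (read from S3, update DuckDB,
    # snapshot the database back to the data bucket).
    return {"status": "success"}
```

Setting `halted=true` in the output is what lets the state machine stop the chain immediately rather than running later steps against bad data.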
flowchart TB
USER["Browser"] -->|"GET"| S3SITE["S3 Static Website
docs + reports + exports"]
USER -->|"POST /pipeline/start"| APIGW["API Gateway"]
APIGW --> START["Lambda: Start
(trigger)"]
START -->|"StartExecution"| SFN["Step Functions
State Machine"]
subgraph PIPELINE["Pipeline Steps (Lambda Containers)"]
L1["Step 1: Receive
S3 file listing"]
L2["Step 2: Schema Validate
CSV header checks"]
L3["Step 3: Ingest
DuckDB + httpfs"]
L4["Step 4: Match
FULL OUTER JOIN"]
L5["Step 5: Compare
field-level diffs"]
L6["Step 6: Report
HTML + Parquet"]
end
SFN --> L1 --> L2 --> L3 --> L4 --> L5 --> L6
S3DATA["S3 Data Bucket
CSVs, DuckDB snapshots,
reports, exports"]
L1 <-->|"read files"| S3DATA
L3 <-->|"httpfs + snapshot"| S3DATA
L4 <-->|"restore + snapshot"| S3DATA
L5 <-->|"restore + snapshot"| S3DATA
L6 -->|"write reports"| S3DATA
L6 -->|"sync docs/"| S3SITE
style USER fill:#0f172a,stroke:#38bdf8,color:#38bdf8
style APIGW fill:#1e293b,stroke:#f59e0b,color:#f59e0b
style START fill:#1e293b,stroke:#f59e0b,color:#e2e8f0
style SFN fill:#1e293b,stroke:#a78bfa,color:#a78bfa
style S3DATA fill:#1e293b,stroke:#4ade80,color:#4ade80
style S3SITE fill:#1e293b,stroke:#38bdf8,color:#38bdf8
style L1 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
style L2 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
style L3 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
style L4 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
style L5 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
style L6 fill:#0f172a,stroke:#fb923c,color:#e2e8f0
| Component | Status | Notes |
|---|---|---|
| src/pipeline/step1–6 | ✅ Implemented | Shared with local — same code |
| src/adapters/aws.py | ✅ Implemented | S3 read/write/list via boto3 |
| cloud/handlers.py | ✅ Implemented | Lambda entry points + API trigger |
| infra/template.yaml | ✅ Implemented | SAM: S3, Lambda ×7, Step Functions, API GW, static site |
| S3 static site for docs | ⚠️ Scaffolded | SAM resource defined; sync not yet wired |
| DuckDB S3 snapshotting | ⚠️ Scaffolded | Handler code exists; not tested on real AWS |
| End-to-end cloud test | ❌ Not tested | Not deployed or run on real AWS |
| CloudFront CDN | ❌ Not implemented | Recommended for production |
| CI/CD + Monitoring | ❌ Not implemented | No GitHub Actions or CloudWatch alarms |