Why Data Never Reaches Bronze: Source-Layer Ingestion Failures in Medallion Architectures
The Bronze layer is the entry point of every medallion architecture. It is also where most failures originate — not from your compute infrastructure, not from your transformation logic, but from the sources themselves. Most of these failures are entirely preventable; recognising the patterns is the work.
Across a large-scale production data platform processing financial data from multiple source systems — relational databases, event streams, file drops, and external APIs — 188 source-layer incidents were logged and resolved. Every single one shared a common characteristic: the data platform was ready. The source was not.
This is the breakdown of all eight failure patterns — what they look like in production, why they recur, and the engineering discipline that stops them from recurring.
Incident distribution — the eight categories
| Category | Incidents | Fix type |
|---|---|---|
| PostgreSQL connectivity failure | 32 | Infra + config |
| Source file not delivered | 31 | Process + SLA |
| Schema drift / column mismatch | 26 | Contract enforcement |
| Source API change / unavailability | 17 | Versioning + governance |
| Source DB connectivity (non-Postgres) | 12 | Network + health checks |
| Empty / absent source data | 11 | Code guard + branching |
| Kafka / IP network whitelisting | 10 | Arch + process |
| DB lock, encryption, format, SLA | 22 | Multiple |
Category one — PostgreSQL connectivity failure
32 incidents · 17% of total · highest single category

PostgreSQL connectivity failures are the largest single failure category, and they are structurally different from most other failures: the source database exists, is healthy, and has data. The ingestion pipeline cannot reach it. Every incident in this category follows the same cascade — the Bronze layer ingest fails, downstream Silver processing has no input, and every dashboard depending on that Silver table surfaces as missing or stale.
The failure presents in three distinct forms. The first is connection pool exhaustion: when multiple Spark executors attempt to open JDBC connections simultaneously to the source database, and the total exceeds the database's max_connections limit, new connections are refused with PSQLException: FATAL: remaining connection slots are reserved.
The second form is network path interruption: a firewall rule change, IP address update on the source side, or DNS resolution failure silently breaks the route between the data lake and the source database. These failures are particularly difficult to diagnose because the error surface is a generic connection timeout rather than a specific network error.
The third form is idle session timeout: long-running ingestion jobs that pause between stages find their JDBC connections terminated by the database's idle connection timeout, causing the subsequent read to fail mid-execution.
Root pattern: the data platform treats source database connectivity as an assumption rather than a contract with observable state. There is no health check before the ingest begins, no connection pool management, and no alerting when the connection path changes. The failure is discovered at runtime, not at the network boundary.
```python
# Cap Spark JDBC parallelism — never exceed max_connections / 4.
# connectTimeout and socketTimeout are PostgreSQL JDBC properties, in seconds.
df = spark.read.format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", "source_schema.target_table") \
    .option("numPartitions", "4") \
    .option("fetchsize", "10000") \
    .option("connectTimeout", "30") \
    .option("socketTimeout", "300") \
    .load()

# Pre-ingestion connectivity health check
import psycopg2

def check_pg_health(host, port, db, user, pw):
    try:
        conn = psycopg2.connect(host=host, port=port, dbname=db,
                                user=user, password=pw, connect_timeout=10)
        conn.close()
        return True
    except Exception as e:
        raise RuntimeError(f"Source DB unreachable before ingest: {e}")

# Deploy PgBouncer in transaction pooling mode between Spark and PostgreSQL
#   max_client_conn=1000, server_pool_size=20
# This decouples Spark executor count from Postgres max_connections entirely
```
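Transient network blips (the second failure form above) are worth distinguishing from hard outages before anyone is paged. A minimal retry wrapper with exponential backoff, sketched here as an illustration (`with_retry` is a hypothetical helper, not part of the incident tooling), can wrap the health check above:

```python
import random
import time

def with_retry(fn, attempts=4, base_delay=2.0):
    """Retry a connectivity-sensitive call with exponential backoff and jitter.

    If every attempt fails, the last exception propagates, so the run
    still fails fast at the network boundary rather than mid-read.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            # Exponential backoff: base, 2x base, 4x base..., plus jitter
            time.sleep(base_delay * (2 ** (attempt - 1))
                       + random.uniform(0, base_delay))

# Usage: with_retry(lambda: check_pg_health(host, port, db, user, pw))
```

A hard failure still surfaces immediately after the final attempt, so genuine outages are not masked, only transient ones absorbed.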
Category two — Source file not delivered
31 incidents · 16.5% of total

The second-largest failure category involves file-based ingestion: the source system is expected to place a file in a designated location by a specific time, and the pipeline triggers on a schedule assuming the file will be there. When the file is absent, the pipeline fails with a zero-file load error and all downstream processing halts.
This pattern is particularly damaging in its recurrence. Across this dataset, the same file delivery failure recurred across consecutive days for the same source, with each occurrence treated as a new incident rather than the symptom of an unresolved upstream delivery problem. In one documented case, files for three consecutive business days were delivered in a single batch on the fourth day — causing three scheduled failures followed by a fourth run that silently ingested stale data with no mechanism for downstream consumers to identify the latency.
A secondary manifestation is delivery timing inconsistency: file arrival varies by several hours from day to day. A pipeline scheduled at 08:30 to consume a file that arrives at 12:00 will fail every day it triggers before delivery, generating spurious incidents resolved only by re-triggering after the file eventually appears.
The design principle being violated: a scheduled ingestion pipeline must never be tightly coupled to an assumption about source file timing. The trigger for Bronze layer file ingestion should be file arrival, not clock time. File-sensor-based orchestration eliminates this entire category.
```python
# File arrival sensor — trigger on arrival, not on schedule
# (Airflow 2 path; the sensor lives in the Amazon provider package)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.exceptions import AirflowSkipException

file_sensor = S3KeySensor(
    task_id='wait_for_source_file',
    bucket_name='source-data-bucket',
    bucket_key='path/to/{{ ds_nodash }}/source_file_*.csv',
    wildcard_match=True,
    timeout=3600 * 8,     # Wait up to 8 hours (business SLA window)
    poke_interval=300,    # Check every 5 minutes
    soft_fail=True,       # Skip downstream on timeout, do not fail the DAG
    mode='reschedule'     # Release worker slot while waiting
)

# Zero-file guard — never proceed with an empty file list
def validate_file_count(file_list, source_name, business_date):
    if len(file_list) == 0:
        send_sla_alert(source=source_name, expected_by="08:00",
                       business_date=business_date)
        raise AirflowSkipException(
            "No source file received within SLA window. Alert raised."
        )
    return file_list
```
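The three-days-in-one-batch case shows why a late file must carry its lateness with it. One way to make the latency visible to downstream consumers is to record delivery lag next to the load itself. A sketch assuming ISO-8601 timestamps (`build_freshness_record` and its field names are illustrative, not an existing audit schema):

```python
from datetime import datetime

def delivery_lag_hours(expected_by_iso: str, arrived_at_iso: str) -> float:
    """Hours between the SLA deadline and actual arrival; positive = late."""
    expected = datetime.fromisoformat(expected_by_iso)
    arrived = datetime.fromisoformat(arrived_at_iso)
    return (arrived - expected).total_seconds() / 3600.0

def build_freshness_record(source_name, business_date, expected_by, arrived_at):
    """Freshness metadata written alongside the Bronze load, so downstream
    consumers can see that a batch is stale instead of assuming it is current."""
    lag = delivery_lag_hours(expected_by, arrived_at)
    return {
        "source_name": source_name,
        "business_date": business_date,
        "delivery_lag_hours": round(lag, 2),
        "status": "LATE" if lag > 0 else "ON_TIME",
    }
```

Writing this record on every load, not only late ones, makes "how fresh is this table" a query rather than a guess.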
Category three — Schema drift and column mismatch
26 incidents · 13.8% of total

Schema drift is the failure category with the most variety in its manifestations and the most consistent cause: source teams change the structure of their output without notifying the ingestion team. In this dataset, schema drift presents in six distinct forms: column names renamed without notice, data types changed incompatibly, delimiter characters switched without warning, extra whitespace in column headers, UDF column ranges shifted, and entirely new columns introduced that break fixed-position parsing.
The most operationally damaging variant is the silent type change — where a source column moves from INTEGER to VARCHAR, or from STRING to INT, without any column name change. The pipeline reads the column successfully but fails at the write stage when it attempts to insert the value into a Bronze table with the original type definition. The error surface is a type cast failure in Spark with no indication at the read stage that the source structure has changed.
A related failure is the precision mismatch: a source column defined as DECIMAL(14,2) mapped to DECIMAL(10,0) in the Bronze table causes silent truncation discovered downstream when Gold-layer aggregations produce incorrect totals.
```python
# Schema contract validation at the Bronze boundary
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
import json

class SchemaContractViolation(Exception):
    pass

def validate_schema(df: DataFrame, contract_path: str) -> bool:
    with open(contract_path) as f:
        expected = StructType.fromJson(json.load(f))
    incoming = {field.name: field.dataType for field in df.schema.fields}
    contract = {field.name: field.dataType for field in expected.fields}
    missing = set(contract) - set(incoming)
    type_mismatch = {k for k in contract
                     if k in incoming and incoming[k] != contract[k]}
    if missing or type_mismatch:
        raise SchemaContractViolation(
            f"Missing columns: {missing} | "
            f"Type mismatches: {type_mismatch}"
        )
    extra = set(incoming) - set(contract)
    if extra:
        # Quarantine — alert source team, do not fail the run
        log.warning(f"New columns from source (not in contract): {extra}")
    return True

# CSV delimiter auto-detection — never assume ','
import csv

def detect_delimiter(sample_path: str) -> str:
    with open(sample_path) as f:
        return csv.Sniffer().sniff(f.read(2048)).delimiter
```
The architecture fix: register a schema contract in a central schema registry at the time of first ingestion. Every subsequent run validates the incoming payload against that contract before writing a single record to Bronze. Non-conforming records go to a quarantine zone. The alert goes to both the ingestion team and the source team simultaneously.
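Registering the contract at first ingestion can be as simple as persisting the first observed schema. A minimal file-based sketch (a real platform would use a central registry service; `register_contract` and the `contracts/` directory are assumptions for illustration):

```python
import json
import os

def register_contract(df, source_name, registry_dir="contracts"):
    """Register a schema contract at first ingestion.

    If no contract exists for the source, the incoming DataFrame's schema
    becomes the contract; every later run validates against it instead of
    re-registering, so drift is caught rather than silently absorbed.
    """
    path = os.path.join(registry_dir, f"{source_name}.json")
    if os.path.exists(path):
        return path  # Contract already registered — validation path applies
    os.makedirs(registry_dir, exist_ok=True)
    with open(path, "w") as f:
        # StructType.jsonValue() round-trips with StructType.fromJson()
        json.dump(df.schema.jsonValue(), f, indent=2)
    return path
```

The stored JSON is exactly what `validate_schema` reads back, so registration and validation share one serialisation format.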
Category four — Source API change and unavailability
17 incidents · 9% of total

API-based ingestion failures cluster into two sub-types. The first is the unnotified breaking change — the source team modifies the API endpoint, response schema, or authentication mechanism without coordinating with the data platform team. The pipeline attempts to call an endpoint that no longer exists, receives a response it does not recognise, or fails authentication with credentials that have been rotated.
The second sub-type is the null response: the API call returns HTTP 200 but the response body contains a null payload for the requested business date. This is operationally legitimate (no transactions on a holiday) but indistinguishable at the pipeline level from an API failure unless the pipeline explicitly handles the null case. In the incidents documented here, null API responses caused pipeline failures identical in error signature to connectivity failures, making root cause diagnosis slower.
A third variant is encryption key rotation without coordination: source files delivered with PGP encryption had their decryption keys rotated multiple times over a short period, with each rotation generating a new incident. In one documented case, three different decryption keys were circulated in succession before the correct key was confirmed and the pipeline stabilised.
```python
# API response validation — never trust HTTP 200 alone
import requests

class APIContractViolation(Exception):
    pass

def fetch_with_validation(endpoint, params, expected_fields):
    resp = requests.get(endpoint, params=params, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    # Null response is a distinct business state — handle cleanly
    if data is None or (isinstance(data, list) and len(data) == 0):
        log.info(f"Null response from {endpoint}. Writing zero-row metadata.")
        write_zero_row_metadata(endpoint, params)
        return None  # Exit cleanly — do NOT raise an exception
    # Contract validation on the response body
    for field in expected_fields:
        if field not in data:
            raise APIContractViolation(
                f"Required field '{field}' missing from API response. "
                f"Source team notified."
            )
    return data

# Decryption keys — always versioned, never hardcoded
import boto3

def get_decryption_key(source_name: str, version: str = 'AWSCURRENT') -> str:
    client = boto3.client('secretsmanager')
    resp = client.get_secret_value(
        SecretId=f"ingestion/{source_name}/pgp_key",
        VersionStage=version
    )
    return resp['SecretString']
```
Category five — Source DB connectivity (non-Postgres)
12 incidents · 6.4% of total

This category covers connectivity failures to MySQL, MariaDB, and MongoDB source systems. While PostgreSQL failures were concentrated on a single high-volume ingestion path, non-Postgres DB failures were spread across multiple source systems with distinct causes: a MariaDB socket refusing connections, a MongoDB partition helper failing with collStats errors, and a MySQL replica losing its connection entirely.
The MongoDB failure warrants specific attention. The MongoSpark connector's partitioner calls the collStats command to split the collection into read partitions. If the collection is temporarily locked, the command times out and the entire Spark read fails before reading a single document — the ingest simply does not run, no records are written, and the downstream Silver process waits indefinitely for input that will not arrive.
```python
# MongoDB partitioned read with safe fallback on collStats failure
try:
    df = spark.read.format("mongodb") \
        .option("database", db_name) \
        .option("collection", collection_name) \
        .load()
except Exception as e:
    if "collStats" in str(e) or "Partitioning failed" in str(e):
        log.warning("MongoSpark partitioner failed — falling back to "
                    "single partition read")
        df = spark.read.format("mongodb") \
            .option("database", db_name) \
            .option("collection", collection_name) \
            .option("partitioner",
                    "com.mongodb.spark.sql.connector.read.partitioner"
                    ".SinglePartitionPartitioner") \
            .load()
    else:
        raise
```
Category six — Empty and absent source data
11 incidents · 5.9% of total

This category exposes a fundamental design gap: the inability to distinguish between "no data today" (a legitimate business state) and "something went wrong" (a failure requiring intervention). In this dataset, empty source data caused pipeline failures because the downstream write operation received an empty DataFrame and raised a division-by-zero equivalent — passing batch_size=0 to a batch write function — or attempted to write zero rows to a target expecting at least one, triggering schema inference failures.
The correct behaviour for an empty source is deterministic and cheap to implement: write a metadata record indicating a zero-row load for the business date, continue the DAG without failure, and alert on a separate monitoring channel if zero-row loads occur consecutively beyond a configurable threshold.
```python
# Zero-row guard — handle empty source as a first-class execution path
from datetime import datetime

def safe_write_bronze(df, target_table, business_date, source_name):
    row_count = df.count()
    if row_count == 0:
        # Write audit record — never fail on empty source
        audit_record = spark.createDataFrame([{
            "business_date": business_date,
            "source_name": source_name,
            "target_table": target_table,
            "rows_loaded": 0,
            "status": "ZERO_ROWS",
            "load_ts": datetime.utcnow().isoformat()
        }])
        audit_record.write.mode("append").saveAsTable("ops.ingestion_audit")
        log.warning(f"Zero rows from {source_name} for {business_date}. "
                    f"Audit record written. Exiting cleanly.")
        return 0
    df.write.mode("append").saveAsTable(target_table)
    log.info(f"Loaded {row_count:,} rows to {target_table}")
    return row_count
```
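The "alert on consecutive zero-row loads" behaviour described above reduces to a streak check over the audit table. A sketch (`check_consecutive_zero_rows` is a hypothetical helper operating on statuses read back from ops.ingestion_audit, newest first):

```python
def check_consecutive_zero_rows(statuses, threshold=3):
    """Alert only when zero-row loads recur beyond a threshold.

    statuses: load statuses for one source, ordered by business_date
    descending. A single ZERO_ROWS day is a legitimate business state;
    `threshold` consecutive days suggests a silent upstream outage.
    """
    streak = 0
    for status in statuses:
        if status == "ZERO_ROWS":
            streak += 1
        else:
            break  # The streak is broken by any successful load
    return streak >= threshold
```

This keeps the daily pipeline green on holidays while still surfacing a source that has quietly stopped producing data.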
Category seven — Kafka connectivity and IP whitelisting
10 incidents · 5.3% of total

Kafka-related failures in this dataset are almost entirely network governance failures rather than Kafka instability. The dominant sub-pattern is IP whitelisting lag: when the data platform's Kafka Connect worker nodes are assigned new IPs — through autoscaling, node replacement, or infrastructure changes — the Kafka broker's IP allowlist is not updated simultaneously. The consumer cannot connect. All topic consumption stalls silently until whitelisting is updated manually.
A secondary failure is topic retention policy misconfiguration: a Kafka topic configured with cleanup.policy=delete on a broker expecting cleanup.policy=compact causes the Kafka Connect setup to error at startup. A third variant is consumer group session timeout under high load: the session timeout is exceeded, the coordinator removes the consumer from the group and triggers a rebalance. Without session timeout tuning, the same failure recurs under the same load conditions.
```properties
# Kafka Connect consumer tuning for production load — consumer.properties

# Default 10s session timeout is too short under load
session.timeout.ms=45000
# Keep well under session.timeout.ms (rule of thumb: at most one third)
heartbeat.interval.ms=3000
# Allow 5 minutes for slow-processing batches
max.poll.interval.ms=300000
# Cap records per poll so processing fits inside max.poll.interval.ms
max.poll.records=500
# Applies only when no committed offset exists for the group
auto.offset.reset=earliest
# Commit offsets explicitly after successful processing
enable.auto.commit=false
```

```shell
# Align topic retention policy — always check before connecting
kafka-configs.sh --bootstrap-server broker:9092 \
  --entity-type topics --entity-name your-topic \
  --alter \
  --add-config cleanup.policy=compact,retention.ms=604800000

# IP whitelisting — automate via ASG lifecycle hook.
# Never rely on a manual process that fires after infrastructure changes.
```
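Automating the allowlist update from the lifecycle hook, assuming the broker allowlist is an AWS security group (the function names and the Lambda wiring are illustrative, not the platform's actual tooling), might look like:

```python
def ingress_rule_for(ip: str, instance_id: str, port: int = 9092) -> dict:
    """Single /32 ingress rule admitting one worker IP to the broker port."""
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": f"{ip}/32",
                      "Description": f"kafka-connect worker {instance_id}"}],
    }

def whitelist_new_worker(instance_id: str, broker_sg_id: str) -> None:
    """Intended to run in a Lambda fired by the ASG EC2_INSTANCE_LAUNCHING
    lifecycle hook, so the allowlist is updated before the new worker's
    first connection attempt."""
    import boto3  # Imported here so the pure helper above stays testable
    ec2 = boto3.client("ec2")
    res = ec2.describe_instances(InstanceIds=[instance_id])
    ip = res["Reservations"][0]["Instances"][0]["PrivateIpAddress"]
    ec2.authorize_security_group_ingress(
        GroupId=broker_sg_id,
        IpPermissions=[ingress_rule_for(ip, instance_id)],
    )
```

A matching `EC2_INSTANCE_TERMINATING` hook should revoke the rule, otherwise the allowlist accumulates stale entries.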
Category eight — Database locks, encryption changes, format failures, SLA breaches
22 incidents · 11.7% of total · four distinct sub-patterns

Database lock contention
When a pipeline runs in OVERWRITE mode against a PostgreSQL table, it issues a TRUNCATE before writing — which requires an ACCESS EXCLUSIVE lock. If any concurrent query holds an open transaction against the same table, the truncate blocks indefinitely and the pipeline sits in a running state until the blocking transaction is terminated or the pipeline times out. The resolution is always manual termination of blocking queries. The preventable variant is to replace truncate-and-reload with an upsert, eliminating the exclusive lock requirement entirely.
Encryption key rotation without coordination
Source systems rotated PGP decryption keys multiple times in short succession without coordinating with the data platform team. Each rotation generated a new ingestion failure. The fix is centralised secrets management: store all keys in AWS Secrets Manager or equivalent, with a rotation workflow that updates both the source system and the ingestion platform simultaneously, never sequentially.
Timestamp and timezone format mismatch
Parquet files delivered with nanosecond timestamp precision failed to write to Iceberg tables configured for microsecond precision. Athena, which stores query results in UTC, produced mismatched results when compared against Iceberg tables storing timestamps in IST — causing reconciliation failures diagnosed as data discrepancies rather than timezone handling issues.
Source upload SLA breach
File delivery schedules were not enforced by any contract. Files expected by 08:00 arrived at 11:00 or 12:00. Three days of files were delivered in a single batch on the fourth day. The pipeline schedule and the source delivery schedule were never formally aligned, producing a category of failures that is operationally invisible until it hits production.
```sql
-- Replace OVERWRITE / TRUNCATE with upsert to eliminate lock contention
INSERT INTO bronze.target_table (id, col1, col2, updated_at)
SELECT id, col1, col2, now() FROM staging.source_extract
ON CONFLICT (id) DO UPDATE SET
    col1 = EXCLUDED.col1, col2 = EXCLUDED.col2,
    updated_at = EXCLUDED.updated_at;

-- Timezone-safe timestamp handling
-- Always cast at query time — never rely on storage timezone assumptions
SELECT
    CAST(event_ts AT TIME ZONE 'UTC' AS TIMESTAMP) AS event_ts_utc
FROM bronze.events_table
WHERE DATE(event_ts AT TIME ZONE 'Asia/Kolkata') = DATE '2026-03-15';
```
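The SLA-breach sub-pattern becomes checkable once deadline and arrival are compared per business date. A sketch (`sla_breaches` is a hypothetical helper; the 08:00 deadline is the one cited above):

```python
from datetime import datetime, time

def sla_breaches(deliveries, deadline=time(8, 0)):
    """Flag deliveries that arrived after the agreed upload deadline.

    deliveries: (business_date, arrival_ts) ISO-string pairs. Returns the
    breaching business dates with lateness in hours. Computing the cutoff
    from the business date, not the arrival date, also catches the
    multi-day batch case (a file for day N arriving on day N+3).
    """
    breaches = []
    for business_date, arrival_iso in deliveries:
        arrival = datetime.fromisoformat(arrival_iso)
        cutoff = datetime.combine(
            datetime.fromisoformat(business_date).date(), deadline)
        if arrival > cutoff:
            late_hours = (arrival - cutoff).total_seconds() / 3600.0
            breaches.append((business_date, round(late_hours, 1)))
    return breaches
```

Run daily against the ingestion audit trail, this turns the informal "files by 08:00" expectation into an enforced, measurable contract.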
The complete prevention framework — 18 engineering controls
Applied as a standard operating baseline, these controls would have prevented approximately 85% of the incidents documented here.
Representative controls from the framework:

- Cap JDBC numPartitions so total concurrent connections never approach the database's max_connections limit.
- Set session.timeout.ms to at least 45 seconds for high-volume Kafka consumers, and set max.poll.interval.ms to accommodate the slowest expected processing batch. Default values are sized for development, not production.
- Quarantine files whose delimiter does not match the contract: | delimiters delivered to a pipeline expecting , delimiters should be quarantined — not silently mis-parsed into incorrect records.
- Enforce numeric precision in the schema contract: a DECIMAL(14,2) truncated to DECIMAL(10,0) produces incorrect totals in Gold-layer reporting.

The source reliability tier model
A source that fails consistently should be managed differently from one that has failed once. The following classification gives operations teams a structured framework for proportionate monitoring investment.
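One way to operationalise such a tiering is a simple threshold function over trailing incident counts. The tier names and thresholds below are illustrative assumptions, not drawn from the incident record:

```python
def reliability_tier(incidents_last_90_days: int) -> str:
    """Illustrative source tiering — thresholds are assumptions.

    The tier drives proportionate controls: Tier 3 sources get mandatory
    pre-ingestion health checks and daily SLA review; Tier 1 sources get
    standard monitoring only.
    """
    if incidents_last_90_days == 0:
        return "TIER_1_STABLE"
    if incidents_last_90_days <= 3:
        return "TIER_2_WATCH"
    return "TIER_3_UNRELIABLE"
```

The point is not the specific cutoffs but that the classification is computed from the incident record, so monitoring investment follows demonstrated unreliability rather than intuition.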
What this incident record tells us about medallion architecture
Medallion architecture is a sound design. Bronze, Silver, Gold as a progressive quality model works. The architecture is not what fails. What fails is the assumption that the Bronze layer can be designed in isolation from the reliability characteristics of its sources.
Every failure in this dataset originated outside the data platform. The platform code was correct. The orchestration was correct. The Spark jobs executed exactly as designed. What was absent was a systematic model for managing source unreliability — a model that treats connectivity failures, schema drift, missing files, and API breaks not as exceptional events but as routine operational inputs the platform must absorb without cascading.
The Bronze layer is not just a storage zone. It is a resilience boundary. Designing it as one changes every engineering decision made about what sits in front of it.
The eighteen controls listed above are not aspirational. They are the specific interventions that, applied systematically, would have prevented the large majority of the 188 incidents documented here. Most are configuration and code changes that take days to implement. None requires a platform rebuild. All require treating source reliability as a first-class engineering concern — not an operational afterthought managed through incident tickets.
What causes PostgreSQL connection failures in data ingestion pipelines?
The most common cause is connection pool exhaustion — too many Spark executors opening JDBC connections simultaneously, exceeding the database's max_connections limit. Secondary causes include network instability, IP or firewall rule changes, and idle session timeouts. The fix is PgBouncer in transaction pooling mode, capping numPartitions on JDBC reads, and a pre-ingestion connectivity health check before any read is attempted.
How should a medallion architecture handle missing source files?
Replace schedule-based triggers with file arrival sensors. A sensor polls the source location at a configurable interval and waits up to the SLA window before raising a structured alert. The pipeline exits gracefully on no-file. A zero-file guard prevents any write operation from proceeding with an empty file list, eliminating the downstream cascade entirely.
What is schema drift and how does it break medallion pipelines?
Schema drift occurs when a source changes column names, data types, delimiters, or structure without notifying the ingestion team. Prevention requires a schema contract registry that validates every incoming payload before writing to Bronze. Non-conforming records go to a quarantine zone. The Bronze table is never written with mismatched schema.
How do you handle empty source data in a Bronze layer pipeline?
Empty source data is a legitimate business state. Count rows before any write operation. On zero rows, write a metadata record to the ingestion audit table for that business date and exit cleanly without failing. Downstream Silver processes must be designed to tolerate zero-row Bronze loads without failing themselves.
How do you prevent API contract changes from breaking ingestion pipelines?
Source teams must commit to a versioned API contract with a mandatory advance notice period for any breaking change. Every API call must validate the response schema before processing. Null responses are handled as zero-row loads. Credentials are stored in a secrets manager and updated automatically on rotation — never manually propagated.
Raj Thilak is Head of Technology for Data & Analytics with 24 years of experience in BFSI technology leadership across Citi, Standard Chartered, and Accenture. He directs large-scale engineering programmes for financial services data platforms on AWS. Based in Pune, India. rajthilak.dev