Rewriting Our ETL: From Table Swaps to Doctrine-Compatible Architecture

Nicola Bettini
Backend Developer
How we eliminated ORM compatibility issues and lifted ETL reliability to 99.9%
Introduction
In July 2025, we embarked on a comprehensive rewrite of our ETL pipeline. The old implementation, while functional, had a critical flaw: it used a DROP/CREATE pattern that broke Doctrine ORM compatibility, caused foreign key violations, and led to application crashes during data loads. This article chronicles our journey from a fragile 614-line script to a robust 805-line production-ready system that handles 15+ tables, 10M+ rows daily, and ~50GB of data without breaking our PHP applications.
What you'll learn in this article:
- Why the DROP/CREATE pattern breaks ORM compatibility and how to fix it
- How database schema introspection eliminates schema drift
- Techniques for handling 60+ edge cases in production data
- Performance optimizations that delivered 200-300% improvements
- Practical lessons from a real-world ETL rewrite
The Problem: When ETL Breaks Your Application
The Old Approach
Our original ETL implementation followed what seemed like a logical pattern for full data loads:
- Create a temporary table (facility_tmp) with the same schema as the target
- Load all data into the temporary table
- Rename the original table to a backup (facility_bkp)
- Rename the temporary table to the production name (facility)
- Drop the backup table
Here's what that looked like in code:
BEGIN;
DROP TABLE IF EXISTS public.facility_bkp;
ALTER TABLE IF EXISTS public.facility RENAME TO facility_bkp;
ALTER TABLE public.facility_tmp RENAME TO facility;
DROP TABLE IF EXISTS public.facility_bkp;
COMMIT;
The Hidden Costs
This approach seemed elegant at first—atomic swaps, minimal downtime, easy rollback. But it came with severe hidden costs:
1. Doctrine ORM Incompatibility
Our PHP applications used Doctrine ORM to interact with these tables. When we dropped and recreated tables, Doctrine's metadata cache became stale. Entity managers would hold references to table structures that no longer existed, causing cryptic errors and application crashes.
2. Foreign Key Violations
Other tables held foreign keys into our ETL tables, and in PostgreSQL those constraints are bound to the physical table rather than its name. During the swap they stayed attached to the renamed backup copy, so dropping it either failed or tore the constraints away, causing cascading failures even though the renames ran inside a single transaction.
3. Service Disruptions
Monitoring services, reporting dashboards, and dependent applications would all fail during the table swap. What should have been a transparent data refresh became a known source of system instability.
Statistics from our incident logs:
- ~5 Doctrine compatibility issues per week
- ~2 foreign key violations per week
- Overall ETL success rate: ~85%
Clearly, something had to change.
A Note on Legacy Systems: If these problems sound familiar, you're not alone. At Claranet, we've helped several teams navigate similar ETL challenges without disrupting their production environments. The approach we took here—preserving table structures while modernizing the data flow—has proven effective across different tech stacks and data volumes.
The Solution: Preserve, Don't Replace
Core Architectural Principle
The breakthrough came from a simple realization: we don't need to replace the table, we just need to replace the data. This led us to our new core principle:
Preserve the table structure, indexes, sequences, and foreign keys. Only touch the data.
TRUNCATE Instead of DROP/CREATE
The centerpiece of our new approach is using TRUNCATE instead of DROP TABLE:
def truncate_table_if_full_load(engine, schema, table, is_incremental):
    if not is_incremental:
        with engine.begin() as conn:
            logging.info(f"FULL_LOAD_OVERRIDE active: truncating {schema}.{table}")
            conn.execute(sa_text(f"TRUNCATE TABLE {schema}.{table}"))
This single change solved our Doctrine problems. The table never disappears—its structure, constraints, and relationships stay intact. We just remove all rows and load new ones.
Why This Works
Table Metadata Preserved:
- Doctrine's entity metadata remains valid
- No cache invalidation needed
- No application restarts required
Foreign Keys Intact:
- Referencing tables never lose their relationships
- ON DELETE CASCADE behaviors work correctly
- No constraint validation errors
Sequences Maintained:
- AUTO_INCREMENT and SERIAL columns keep their sequences
- No sequence ownership issues
- No duplicate key errors on subsequent inserts
Technology Stack Evolution
The rewrite involved more than just changing our approach to table management. We made several strategic technology decisions:
From Polars to Pandas
Old: Used Polars for DataFrame operations
New: Switched to Pandas
Reasoning: While Polars is faster, Pandas has a more mature ecosystem, better BigQuery integration, and more reliable date/time handling. For our use case, the slightly slower performance was worth the stability and compatibility gains.
From JSON Validation to Schema Introspection
Old: Maintained a separate JSON schema for validation
New: Query the database's information_schema at runtime
This was a game-changer for maintainability. Instead of keeping two sources of truth in sync, we now have one: the database itself.
def get_table_column_meta(engine, schema: str, table: str):
    """
    Returns:
        - expected_columns: List[str] in DB order
        - lengths: dict of varchar/text max lengths
        - date_columns: list of date/timestamp columns
    """
    sql = """
        SELECT column_name, data_type, character_maximum_length
        FROM information_schema.columns
        WHERE table_schema = :schema AND table_name = :table
        ORDER BY ordinal_position
    """
    with engine.connect() as conn:
        res = conn.execute(sa_text(sql), {'schema': schema, 'table': table}).fetchall()
    expected_columns = []
    lengths = {}
    date_columns = []
    for col, typ, maxlen in res:
        expected_columns.append(col)
        if maxlen is not None:
            lengths[col] = maxlen
        if typ in ('date', 'timestamp without time zone', 'timestamp with time zone'):
            date_columns.append(col)
    return expected_columns, lengths, date_columns
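To make the payoff concrete, here is a hedged sketch of how this metadata can drive the rest of the load (the facility table and the df variable are illustrative; the truncation and date validation it feeds are covered later in this article):

# Hypothetical wiring: let the live schema drive column order, lengths, and dates.
expected_columns, lengths, date_columns = get_table_column_meta(engine, "public", "facility")
df = df[expected_columns]  # enforce the exact column order the table expects
for col, maxlen in lengths.items():
    if col in df.columns:
        df[col] = df[col].astype(str).str.slice(0, maxlen)  # respect VARCHAR(n) limits
df = validate_date_columns(df, date_columns)  # coerce invalid dates to NULL (shown later)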
From Multiple Engines to Connection Pooling
Old: Created new SQLAlchemy engines for each operation
New: Single cached engine with QueuePool
_engine_cache: Optional[Engine] = None

def get_engine() -> Engine:
    global _engine_cache
    if _engine_cache is None:
        _engine_cache = create_engine(
            DATABASE_CONFIG['url'],
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_recycle=3600,
            pool_pre_ping=True,
            connect_args={
                "connect_timeout": POSTGRES_TIMEOUT_SECONDS,
                "application_name": "eservices-etl"
            }
        )
        logging.info("✅ Database engine created with pool_size=10")
    return _engine_cache
Result: 40-60% reduction in connection overhead.
Intelligent Index Management
One of the challenges with bulk loading data is that indexes slow down inserts dramatically. But we can't just drop all indexes—we need them for query performance once the load completes.
The Strategy
Our solution: drop non-primary-key indexes before the load, then recreate them afterward.
def drop_indexes(engine, schema: str, table: str):
    """Drop all indexes except primary keys; definitions are captured separately for recreation."""
    logging.info(f"Dropping non-primary indexes on {schema}.{table}...")
    sql = f"""
        DO $$
        DECLARE r RECORD;
        BEGIN
            FOR r IN (
                SELECT indexname
                FROM pg_indexes
                WHERE schemaname = '{schema}'
                  AND tablename = '{table}'
                  AND indexname NOT LIKE '%_pkey'
            )
            LOOP
                EXECUTE 'DROP INDEX IF EXISTS {schema}.' || quote_ident(r.indexname);
            END LOOP;
        END $$;
    """
    with engine.begin() as conn:
        conn.execute(sa_text(sql))

def recreate_indexes(engine, indexes: List[Dict]):
    """Recreate indexes from stored definitions."""
    for idx_def in indexes:
        with engine.begin() as conn:
            conn.execute(sa_text(idx_def['definition']))
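The recreate step above assumes the index definitions were captured before the drop. A minimal sketch of that capture, reading indexdef from pg_indexes (the helper name is ours, mirroring the style of drop_indexes):

def get_index_definitions(engine, schema: str, table: str) -> list[dict]:
    """Capture non-primary-key index definitions so they can be recreated later."""
    sql = f"""
        SELECT indexname, indexdef
        FROM pg_indexes
        WHERE schemaname = '{schema}'
          AND tablename = '{table}'
          AND indexname NOT LIKE '%_pkey'
    """
    with engine.connect() as conn:
        rows = conn.execute(sa_text(sql)).fetchall()
    return [{"name": name, "definition": definition} for name, definition in rows]

Calling this before drop_indexes() and passing the result to recreate_indexes() closes the loop.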
Performance Impact
This optimization alone gave us:
- 3-5x faster bulk loads for large tables
- No risk of index corruption
- Automatic restoration of query performance
Handling Auto-Increment Columns
One subtle but critical issue we discovered: when doing bulk loads, we need to exclude SERIAL and IDENTITY columns from our insert data. The database generates these values automatically, and including them causes constraint violations.
Detection Logic
def is_serial_or_identity(engine, schema, table, column):
    # Check for IDENTITY columns
    identity_sql = """
        SELECT is_identity
        FROM information_schema.columns
        WHERE table_schema = :schema
          AND table_name = :table
          AND column_name = :column
    """
    with engine.connect() as conn:
        identity_res = conn.execute(sa_text(identity_sql), {
            "schema": schema, "table": table, "column": column
        }).fetchone()
        if identity_res and identity_res[0] == 'YES':
            return True
        # Check for SERIAL columns (with nextval)
        default_sql = """
            SELECT column_default
            FROM information_schema.columns
            WHERE table_schema = :schema
              AND table_name = :table
              AND column_name = :column
        """
        default_res = conn.execute(sa_text(default_sql), {
            "schema": schema, "table": table, "column": column
        }).fetchone()
        if default_res and default_res[0]:
            default_value = str(default_res[0]).lower()
            return any([
                "nextval" in default_value,
                "serial" in default_value,
                "_seq'" in default_value
            ])
    return False
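In practice this check is what lets us strip auto-generated columns from the DataFrame before building the insert payload; a short, hedged usage sketch (variable names are illustrative):

# Hypothetical usage: never ship values for columns the database generates itself.
auto_generated = [
    col for col in expected_columns
    if is_serial_or_identity(engine, schema, table, col)
]
df_to_load = df.drop(columns=[c for c in auto_generated if c in df.columns])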
Sequence Management
After a full load, we need to reset sequences to prevent duplicate key errors:
def update_sequences_after_full_load(engine, table_name):
    """Set each owning sequence to MAX(column) + 1 after a bulk load."""
    sql = f"""
        SELECT column_name, column_default
        FROM information_schema.columns
        WHERE table_name = '{table_name}'
          AND column_default LIKE 'nextval%'
    """
    # Execute, then reset each sequence accordingly (see the sketch below)
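The reset itself is elided above; a minimal sketch of the per-column step, using PostgreSQL's pg_get_serial_sequence and setval (the helper name is ours):

def reset_sequence(engine, schema: str, table: str, column: str):
    """Point the owning sequence just past the current MAX(column)."""
    sql = f"""
        SELECT setval(
            pg_get_serial_sequence('{schema}.{table}', '{column}'),
            COALESCE((SELECT MAX({column}) FROM {schema}.{table}), 0) + 1,
            false
        )
    """
    with engine.begin() as conn:
        conn.execute(sa_text(sql))

With is_called set to false, the next nextval() returns exactly MAX + 1, so subsequent inserts start cleanly after the loaded data.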
The Bug Fix Marathon: 60+ Edge Cases
The commit history tells the story: from late July through September 2025, we made dozens of commits with the message "no more validation by json". Each one represented a bug fix or edge case discovered in production.
1. Invalid Date Handling
The Bug: BigQuery allowed dates like "9999-12-31" and malformed strings that PostgreSQL rejected.
The Fix: Centralized date validation with explicit coercion:
def validate_date_columns(df: pd.DataFrame, date_columns: list):
    """Convert and validate date columns, converting invalid dates to NULL."""
    for col in date_columns:
        if col not in df.columns:
            continue
        # Convert with error coercion
        converted = pd.to_datetime(df[col], errors="coerce", utc=True)
        # Find rows where conversion failed
        mask = converted.isnull() & df[col].notnull()
        if mask.any():
            invalid_count = mask.sum()
            logging.warning(
                f"Column '{col}': {invalid_count} invalid dates converted to NULL"
            )
        # Replace with valid dates or None
        df[col] = converted.where(converted.notnull(), None)
    return df
This function is called in both bulk load and incremental upsert paths, ensuring consistent behavior.
2. NULL/NaN/NaT Representation Hell
The Bug: Mixed representations of null values (None, np.nan, pd.NaT, the string "NaN") broke CSV generation for PostgreSQL COPY.
The Fix: A "brutal" cleaning function that normalizes all null representations:
def brutal_clean(val):
    """Normalize all null representations to Python None."""
    # Handle Python None and numpy nan
    if val is None or (isinstance(val, float) and np.isnan(val)):
        return None
    # Convert to string for checking
    sval = str(val)
    # Check for string representations of null
    if sval in ("None", "NaN", "nan", "NaT", ""):
        return None
    # Strip control characters
    sval = re.sub(r'[\r\n\t]', ' ', sval)  # Line breaks, tabs
    sval = re.sub(r'[\x00-\x1F\u0085\u00A0\u2028\u2029]', ' ', sval)
    sval = sval.replace('"', "'")  # Escape quotes for CSV
    cleaned = sval.strip()
    return cleaned if cleaned else None
For the upsert path, we use explicit pandas null checking:
for col in df.columns:
    val = row[col]
    if pd.isna(val) or val is None:
        record[col] = None
    else:
        record[col] = val
3. TSV Field Mismatch Validation
The Bug: When data had the wrong number of fields (embedded tabs, missing columns), PostgreSQL's COPY command would fail with cryptic error messages.
The Fix: Pre-validate the TSV structure before attempting COPY:
# Generate TSV
csv_buffer = StringIO()
df_to_load.to_csv(
    csv_buffer,
    sep='\t',
    header=False,
    index=False,
    na_rep='\\N'
)

# Validate structure
csv_lines = csv_buffer.getvalue().splitlines()
expected_field_count = len(expected_columns)
for line_num, line in enumerate(csv_lines, 1):
    field_count = len(line.split('\t'))
    if field_count != expected_field_count:
        raise ValueError(
            f"TSV validation failed at line {line_num}: "
            f"expected {expected_field_count} fields, got {field_count}"
        )
Now when there's a problem, we get a clear error message pointing to the exact line, instead of a generic PostgreSQL error.
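For context, here is a hedged sketch of how a validated buffer can then be streamed into PostgreSQL with COPY, using psycopg2's copy_expert through SQLAlchemy's raw connection (our production loader may differ in details):

def copy_tsv_to_postgres(engine, schema: str, table: str, columns, csv_buffer):
    """Stream a validated TSV buffer into PostgreSQL using COPY FROM STDIN."""
    csv_buffer.seek(0)
    copy_sql = (
        f"COPY {schema}.{table} ({', '.join(columns)}) "
        "FROM STDIN WITH (FORMAT text, NULL '\\N')"
    )
    raw_conn = engine.raw_connection()  # psycopg2 connection under the hood
    try:
        with raw_conn.cursor() as cur:
            cur.copy_expert(copy_sql, csv_buffer)
        raw_conn.commit()
    finally:
        raw_conn.close()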
4. String Length Truncation
The Bug: Source data sometimes contained strings longer than the target column's VARCHAR(n) limit, causing insert failures.
The Fix: Proactive truncation based on schema metadata:
# Get max lengths from database
expected_columns, lengths, date_columns = get_table_column_meta(engine, schema, table)

# Truncate before load
for col, maxlen in lengths.items():
    if col in df.columns:
        original_lengths = df[col].astype(str).str.len()
        df[col] = df[col].astype(str).str.slice(0, maxlen)
        # Log truncations
        truncated = (original_lengths > maxlen).sum()
        if truncated > 0:
            logging.warning(
                f"Column '{col}': truncated {truncated} values to {maxlen} chars"
            )
This provides graceful degradation—data loads succeed, and we have visibility into what was truncated.
5. Control Character Sanitization
The Bug: Data containing tabs, newlines, or null bytes would break our TSV format.
The Fix: Comprehensive sanitization as part of brutal_clean():
# Remove line breaks and tabs
sval = re.sub(r'[\r\n\t]', ' ', sval)
# Remove control characters and special Unicode spaces
sval = re.sub(r'[\x00-\x1F\u0085\u00A0\u2028\u2029]', ' ', sval)
# Escape quotes to prevent CSV issues
sval = sval.replace('"', "'")
This ensures that our TSV files are always well-formed, regardless of source data quality.
Performance Optimizations
BigQuery Storage API with Fallback
The BigQuery Storage API provides significantly faster data extraction for large tables (2-3x improvement), but it requires PyArrow and specific permissions.
def fetch_bigquery_data_optimized(bq_client, query, page_size=200_000):
    """Fetch data using the Storage API when possible, with automatic fallback."""
    job_config = bigquery.QueryJobConfig()
    query_job = bq_client.query(query, job_config=job_config)
    query_job.result()  # Wait for query to complete
    try:
        import pyarrow  # noqa: F401 -- required for the Storage API path
        df = query_job.to_dataframe(create_bqstorage_client=True)
        logging.info("✅ Using BigQuery Storage API with PyArrow")
        return df
    except Exception as e:  # ImportError, missing permissions, etc.
        logging.warning(f"Falling back to standard API: {e}")
        return query_job.to_dataframe(create_bqstorage_client=False)
This gives us the best of both worlds: maximum performance when possible, reliability when needed.
Adaptive Page Sizing
Different scenarios need different page sizes:
if is_incremental:
    page_size = 50_000   # Smaller pages for frequent updates
else:
    page_size = 200_000  # Larger pages for bulk loads
Benefits:
- Incremental updates stay memory-efficient and fast
- Bulk loads maximize throughput
- Reduced garbage collection overhead
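One way to combine the Storage API path with these page sizes is to stream results page by page instead of materializing a single DataFrame; a hedged sketch using the BigQuery client's result(page_size=...) and to_dataframe_iterable() (page boundaries can differ when the Storage API serves the read):

def iter_query_pages(bq_client, query: str, page_size: int):
    """Yield query results as a stream of DataFrames instead of one large frame."""
    rows = bq_client.query(query).result(page_size=page_size)
    for page_df in rows.to_dataframe_iterable():
        yield page_df

The explicit memory-management loop in the next section consumes pages in exactly this shape.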
Explicit Memory Management
For large datasets, we can't rely on Python's garbage collector to clean up promptly:
for page_idx, page_df in enumerate(fetch_bigquery_data_optimized(...)):
    # Map to target schema
    df_to_load = build_mapped_df(page_df, expanded_mapping, expected_cols)
    # Process and load
    process_and_load(df_to_load)
    # Explicit cleanup
    del page_df, df_to_load
    gc.collect()
This keeps memory usage stable even when processing datasets that are many gigabytes in size.
Resilience Features
Chunked Deletion
When processing deletions from a log table, we might have thousands of IDs to delete. Doing this in one query can cause timeouts and lock contention.
def delete_rows_from_deleted_ids(
    bigquery_client, engine, postgres_table, id_column="id"
):
    # Fetch IDs to delete from the BigQuery log table
    deleted_log_table = "your-project.your-dataset.V_DeletedObjects"
    query = f"SELECT ObjectId FROM `{deleted_log_table}` WHERE ObjectName = '{postgres_table}'"
    result = bigquery_client.query(query)
    ids_to_remove = [row[0] for row in result]
    if not ids_to_remove:
        return

    # Delete in chunks of 1000
    CHUNK = 1000
    total_deleted = 0
    for i in range(0, len(ids_to_remove), CHUNK):
        chunk = ids_to_remove[i: i + CHUNK]
        q = f"DELETE FROM {DATABASE_SCHEMA_NAME}.{postgres_table} WHERE {id_column} = ANY(:ids)"
        with engine.begin() as connection:
            res = connection.execute(sa_text(q), {"ids": chunk})
            total_deleted += res.rowcount or 0
    logging.info(f"✅ Deleted {total_deleted} rows from {postgres_table}")
Transactional UPSERT
All our upserts happen within explicit transactions, ensuring atomic operations:
with engine.connect() as conn:
    trans = conn.begin()
    try:
        for batch in batches:
            insert_stmt = insert(table).values(batch)
            upsert_stmt = insert_stmt.on_conflict_do_update(
                index_elements=key_columns,
                set_={c.name: insert_stmt.excluded[c.name] for c in table.columns}
            )
            conn.execute(upsert_stmt)
        trans.commit()
        logging.info(f"✅ Committed {len(batches)} batches")
    except Exception as e:
        trans.rollback()
        logging.error(f"❌ Rolled back due to: {e}")
        raise
Context Manager for Consistent Logging
To ensure consistent timing and error logging across all operations:
@contextmanager
def log_time_taken(operation_name: str):
    start_time = time.time()
    logging.info(f"Starting {operation_name}...")
    try:
        yield
        elapsed = time.time() - start_time
        logging.info(f"✅ {operation_name} completed in {elapsed:.2f} seconds")
    except Exception as e:
        elapsed = time.time() - start_time
        logging.error(f"❌ {operation_name} failed after {elapsed:.2f} seconds: {str(e)}")
        raise

# Usage
with log_time_taken("Bulk load to PostgreSQL"):
    bulk_load_to_postgres(df, engine, schema, table)
Observability and Monitoring
Enhanced Logging
Every ETL run now produces structured, informative logs:
success_message = f"""
✅ ETL finished successfully!
Configuration:
  Source: {BQ_TABLE_NAME}
  Target: {DATABASE_SCHEMA_NAME}.{target_table}
  Mode: {'Incremental' if is_incremental else 'Full Load'}
Performance:
  Rows processed: {total_processed:,}
  Execution time: {duration:.2f} seconds
  Throughput: {int(total_processed / max(duration, 1)):,} rows/second
Details:
  Pages processed: {page_count}
  Average page size: {int(total_processed / max(page_count, 1)):,} rows
"""
This makes it easy to:
- Monitor performance trends over time
- Identify bottlenecks quickly
- Debug issues with clear context
Error Notifications
When things go wrong, we send detailed SNS notifications:
def send_notification(message: str, subject: str):
    if LOCAL:
        return
    sns.publish(
        TopicArn=SNS_NOTIFICATION_TOPIC_ARN,
        Subject=f"[{APP_ENV}] {subject}",
        Message=message
    )
Results and Impact
Quantifiable Improvements
After deploying the new ETL system in July 2025, we tracked several key metrics:
Reliability:
- Zero Doctrine compatibility issues (down from ~5/week)
- Zero foreign key violations (down from ~2/week)
- 99.9% successful runs (up from ~85%)
- 100% table structure preservation
Performance:
| Metric | Old System | New System | Improvement |
|---|---|---|---|
| Connection Management | Baseline | Optimized with pooling | 40-60% faster |
| Bulk Load Speed | Baseline | With Storage API + index dropping | 2-3x faster (200-300%) |
| Memory Usage | Baseline | With explicit management | 50% reduction |
| Individual Table Loads | Baseline | With index optimization | 3-5x faster (300-500%) |
| Data Extraction | Standard API | BigQuery Storage API | 2-3x faster (200-300%) |
| Page Processing | Single engine per op | Cached engine with pooling | 40-60% faster |
Summary of Performance Gains:
- Connection overhead reduced by 40-60%
- Bulk load operations improved by 200-300%
- Memory footprint decreased by 50%
- Query performance during load maintained (index management)
- Overall ETL runtime reduced by 60-70% for full loads
Maintainability:
- Zero schema drift issues (database introspection)
- Centralized date validation (one source of truth)
- 60+ bug fixes and edge cases handled
- Comprehensive error messages for debugging
Developer Experience:
- Zero application crashes during ETL runs
- Predictable behavior across all environments
- Clear actionable logs for troubleshooting
- No manual intervention required for standard operations
Scale
Our ETL now handles:
- 15+ tables with complex relationships
- 10M+ rows processed daily
- ~50GB data volume per day
- Multiple incremental updates per hour
- Full refreshes on weekends without service disruption
Lessons Learned
What We'd Do Differently
1. Earlier Testing
Many edge cases only surfaced in production. If we could do it again, we'd:
- Build a comprehensive test data generator
- Include data quality fuzzing (random invalid dates, nulls, control characters)
- Test with worst-case source data from day one
2. Incremental Migration
We did a "big bang" migration. In retrospect, a phased rollout per table would have:
- Reduced risk
- Made rollback easier
- Allowed for more gradual learning
3. Load Testing
We underestimated the variety of data quality issues we'd encounter. More load testing with production-like data would have caught issues earlier.
4. Documentation First
We documented as we went, but starting with clear schema requirements and data quality expectations would have saved time.
Future Enhancements
We have several improvements on our roadmap:
1. Parallel Table Loading
Currently, we process tables sequentially. Since many mappings are independent, we could parallelize:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(process_table, table_config)
        for table_config in independent_tables
    ]
2. Metrics Export
Integration with Prometheus/Grafana for:
- Real-time throughput monitoring
- Historical performance trends
- Alerting on anomalies
3. Schema Evolution Detection
Automatic detection when BigQuery schemas change:
def detect_schema_changes(bq_table, pg_table):
    bq_schema = get_bigquery_schema(bq_table)
    pg_schema = get_postgres_schema(pg_table)
    return diff_schemas(bq_schema, pg_schema)
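As a starting point, the diff itself can be a simple set comparison of column names (an illustrative sketch, not a final design):

def diff_schemas(bq_schema: dict, pg_schema: dict) -> dict:
    """Report columns present on one side but missing on the other."""
    bq_cols, pg_cols = set(bq_schema), set(pg_schema)
    return {
        "missing_in_postgres": sorted(bq_cols - pg_cols),
        "missing_in_bigquery": sorted(pg_cols - bq_cols),
    }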
4. Dead Letter Queue
For rows that fail validation, instead of logging and discarding:
- Store in a DLQ table
- Provide tools for manual review and reprocessing
- Track data quality metrics over time
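A hedged sketch of what that DLQ write could look like (the etl_dead_letter table and the helper name are hypothetical):

from datetime import datetime, timezone

def send_to_dlq(engine, source_table: str, failed_rows, reason: str):
    """Persist rejected rows as JSON for later review and reprocessing."""
    records = [
        {
            "source_table": source_table,
            "payload": row.to_json(),  # keep the raw row for replay
            "reason": reason,
            "failed_at": datetime.now(timezone.utc),
        }
        for _, row in failed_rows.iterrows()
    ]
    if not records:
        return
    with engine.begin() as conn:
        conn.execute(
            sa_text("""
                INSERT INTO etl_dead_letter (source_table, payload, reason, failed_at)
                VALUES (:source_table, :payload, :reason, :failed_at)
            """),
            records,
        )
    logging.warning(f"Sent {len(records)} rows from {source_table} to the DLQ")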
Conclusion
Rewriting our ETL was a significant undertaking—805 lines of carefully crafted code, 60+ bug fixes, and months of production hardening. But the results speak for themselves:
We went from a system that broke our applications to one that runs silently in the background, processing tens of millions of rows daily without a hiccup.
The key insights that made this possible:
- Preserve database structure - Never drop tables when you can truncate
- Introspect, don't assume - Query the database schema at runtime
- Handle edge cases explicitly - Invalid dates, nulls, control characters—plan for them all
- Optimize where it matters - Indexes, connections, memory management
- Make failures obvious - Great logging and error messages are worth their weight in gold
If you're facing similar challenges with ETL pipelines that break downstream applications, consider whether you really need to replace tables, or if you just need to replace data. That single architectural decision solved 80% of our problems.
The remaining 20%? That's all about data quality, edge cases, and the thousand small details that separate a working system from a production-ready one. But that's a journey worth taking.
Need Help With Your Data Infrastructure?
If you're wrestling with legacy ETL systems, ORM compatibility issues, or data pipeline challenges, we've been there. At Claranet, we help organizations modernize their data infrastructure without breaking existing applications. Whether it's migrating between cloud platforms, optimizing database performance, or building resilient data pipelines, our engineering teams work alongside yours to solve real production problems.
Learn more about our approach or reach out if you'd like to discuss your specific challenges.