14 October 2025

Rewriting Our ETL: From Table Swaps to Doctrine-Compatible Architecture

Nicola Bettini

Backend Developer

How we eliminated ORM compatibility issues and lifted ETL reliability to 99.9%

Introduction

In July 2025, we embarked on a comprehensive rewrite of our ETL pipeline. The old implementation, while functional, had a critical flaw: it used a DROP/CREATE pattern that broke Doctrine ORM compatibility, caused foreign key violations, and led to application crashes during data loads. This article chronicles our journey from a fragile 614-line script to a robust 805-line production-ready system that handles 15+ tables, 10M+ rows, and ~50GB of data daily without breaking our PHP applications.

What you'll learn in this article:
- Why the DROP/CREATE pattern breaks ORM compatibility and how to fix it
- How database schema introspection eliminates schema drift
- Techniques for handling 60+ edge cases in production data
- Performance optimizations that delivered 200-300% improvements
- Practical lessons from a real-world ETL rewrite


The Problem: When ETL Breaks Your Application

The Old Approach

Our original ETL implementation followed what seemed like a logical pattern for full data loads:

  1. Create a temporary table (facility_tmp) with the same schema as the target
  2. Load all data into the temporary table
  3. Rename the original table to a backup (facility_bkp)
  4. Rename the temporary table to the production name (facility)
  5. Drop the backup table

Here's what that looked like in code:

BEGIN;
    DROP TABLE IF EXISTS public.facility_bkp;
    ALTER TABLE IF EXISTS public.facility RENAME TO facility_bkp;
    ALTER TABLE public.facility_tmp RENAME TO facility;
    DROP TABLE IF EXISTS public.facility_bkp;
COMMIT;

The Hidden Costs

This approach seemed elegant at first—atomic swaps, minimal downtime, easy rollback. But it came with severe hidden costs:

1. Doctrine ORM Incompatibility

Our PHP applications used Doctrine ORM to interact with these tables. When we dropped and recreated tables, Doctrine's metadata cache became stale. Entity managers would hold references to table structures that no longer existed, causing cryptic errors and application crashes.

2. Foreign Key Violations

Tables with foreign key relationships to our ETL tables would find themselves pointing to non-existent objects after the swap. Even though the renames ran inside a single transaction, the constraints stayed attached to the renamed table rather than carrying over to its replacement, so dropping the backup severed those relationships and triggered cascading failures.

3. Service Disruptions

Monitoring services, reporting dashboards, and dependent applications would all fail during the table swap. What should have been a transparent data refresh became a known source of system instability.

Statistics from our incident logs:
- ~5 Doctrine compatibility issues per week
- ~2 foreign key violations per week
- Overall ETL success rate: ~85%

Clearly, something had to change.

A Note on Legacy Systems: If these problems sound familiar, you're not alone. At Claranet, we've helped several teams navigate similar ETL challenges without disrupting their production environments. The approach we took here—preserving table structures while modernizing the data flow—has proven effective across different tech stacks and data volumes.

The Solution: Preserve, Don't Replace

Core Architectural Principle

The breakthrough came from a simple realization: we don't need to replace the table, we just need to replace the data. This led us to our new core principle:

Preserve the table structure, indexes, sequences, and foreign keys. Only touch the data.

TRUNCATE Instead of DROP/CREATE

The centerpiece of our new approach is using TRUNCATE instead of DROP TABLE:

def truncate_table_if_full_load(engine, schema, table, is_incremental):
    if not is_incremental:
        with engine.begin() as conn:
            logging.info(f"FULL_LOAD_OVERRIDE active: truncating {schema}.{table}")
            conn.execute(sa_text(f"TRUNCATE TABLE {schema}.{table}"))

This single change solved our Doctrine problems. The table never disappears—its structure, constraints, and relationships stay intact. We just remove all rows and load new ones.

Why This Works

Table Metadata Preserved:
- Doctrine's entity metadata remains valid
- No cache invalidation needed
- No application restarts required

Foreign Keys Intact:
- Referencing tables never lose their relationships
- ON DELETE CASCADE behaviors work correctly
- No constraint validation errors

Sequences Maintained:
- AUTO_INCREMENT and SERIAL columns keep their sequences
- No sequence ownership issues
- No duplicate key errors on subsequent inserts

Technology Stack Evolution

The rewrite involved more than just changing our approach to table management. We made several strategic technology decisions:

From Polars to Pandas

Old: Used Polars for DataFrame operations
New: Switched to Pandas

Reasoning: While Polars is faster, Pandas has a more mature ecosystem, better BigQuery integration, and more reliable date/time handling. For our use case, the slightly slower performance was worth the stability and compatibility gains.

From JSON Validation to Schema Introspection

Old: Maintained a separate JSON schema for validation
New: Query the database's information_schema at runtime

This was a game-changer for maintainability. Instead of keeping two sources of truth in sync, we now have one: the database itself.

def get_table_column_meta(engine, schema: str, table: str):
    """
    Returns:
      - expected_columns: List[str] in DB order
      - lengths: dict of varchar/text max lengths
      - date_columns: list of date/timestamp columns
    """
    sql = """
        SELECT column_name, data_type, character_maximum_length
        FROM information_schema.columns
        WHERE table_schema = :schema AND table_name = :table
        ORDER BY ordinal_position
    """
    with engine.connect() as conn:
        res = conn.execute(sa_text(sql), {'schema': schema, 'table': table}).fetchall()

    expected_columns = []
    lengths = {}
    date_columns = []

    for col, typ, maxlen in res:
        expected_columns.append(col)
        if maxlen is not None:
            lengths[col] = maxlen
        if typ in ('date', 'timestamp without time zone', 'timestamp with time zone'):
            date_columns.append(col)

    return expected_columns, lengths, date_columns

From Multiple Engines to Connection Pooling

Old: Created new SQLAlchemy engines for each operation
New: Single cached engine with QueuePool

_engine_cache: Optional[Engine] = None

def get_engine() -> Engine:
    global _engine_cache
    if _engine_cache is None:
        _engine_cache = create_engine(
            DATABASE_CONFIG['url'],
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_recycle=3600,
            pool_pre_ping=True,
            connect_args={
                "connect_timeout": POSTGRES_TIMEOUT_SECONDS,
                "application_name": "eservices-etl"
            }
        )
        logging.info(f"✅ Database engine created with pool_size=10")
    return _engine_cache

Result: 40-60% reduction in connection overhead.

Intelligent Index Management

One of the challenges with bulk loading data is that indexes slow down inserts dramatically. But we can't just drop all indexes—we need them for query performance once the load completes.

The Strategy

Our solution: drop non-primary-key indexes before the load, then recreate them afterward.

def drop_indexes(engine, schema: str, table: str) -> List[Dict]:
    """Drop all indexes except primary keys; return their definitions for recreation."""
    logging.info(f"Dropping non-primary indexes on {schema}.{table}...")

    # Capture each index's CREATE INDEX statement before dropping it
    with engine.connect() as conn:
        rows = conn.execute(sa_text("""
            SELECT indexname, indexdef
            FROM pg_indexes
            WHERE schemaname = :schema
              AND tablename = :table
              AND indexname NOT LIKE '%_pkey'
        """), {'schema': schema, 'table': table}).fetchall()
    saved_indexes = [{'name': name, 'definition': definition} for name, definition in rows]

    sql = f"""
        DO $$
        DECLARE r RECORD;
        BEGIN
          FOR r IN (
            SELECT indexname
            FROM pg_indexes
            WHERE schemaname = '{schema}'
              AND tablename = '{table}'
              AND indexname NOT LIKE '%_pkey'
          )
          LOOP
            EXECUTE 'DROP INDEX IF EXISTS {schema}.' || quote_ident(r.indexname);
          END LOOP;
        END $$;
    """
    with engine.begin() as conn:
        conn.execute(sa_text(sql))

    return saved_indexes

def recreate_indexes(engine, indexes: List[Dict]):
    """Recreate indexes from the definitions captured by drop_indexes()."""
    for idx_def in indexes:
        with engine.begin() as conn:
            conn.execute(sa_text(idx_def['definition']))
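
Putting the pieces together, a full load might wrap the bulk load like this (a sketch; bulk_load_to_postgres is a stand-in name for our actual load step):

saved_indexes = drop_indexes(engine, schema, table)   # capture definitions, then drop
try:
    bulk_load_to_postgres(df, engine, schema, table)  # COPY runs against a lean table
finally:
    recreate_indexes(engine, saved_indexes)           # indexes return even if the load fails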

Performance Impact

This optimization alone gave us:
- 3-5x faster bulk loads for large tables
- No risk of index corruption
- Automatic restoration of query performance

Handling Auto-Increment Columns

One subtle but critical issue we discovered: when doing bulk loads, we need to exclude SERIAL and IDENTITY columns from our insert data. The database generates these values automatically, and including them causes constraint violations.

Detection Logic

def is_serial_or_identity(engine, schema, table, column):
    # Check for IDENTITY columns
    identity_sql = """
        SELECT is_identity
        FROM information_schema.columns
        WHERE table_schema = :schema
          AND table_name = :table
          AND column_name = :column
    """

    with engine.connect() as conn:
        identity_res = conn.execute(sa_text(identity_sql), {
            "schema": schema, "table": table, "column": column
        }).fetchone()

        if identity_res and identity_res[0] == 'YES':
            return True

        # Check for SERIAL columns (with nextval)
        default_sql = """
            SELECT column_default
            FROM information_schema.columns
            WHERE table_schema = :schema
              AND table_name = :table
              AND column_name = :column
        """

        default_res = conn.execute(sa_text(default_sql), {
            "schema": schema, "table": table, "column": column
        }).fetchone()

        if default_res and default_res[0]:
            default_value = str(default_res[0]).lower()
            return any([
                "nextval" in default_value,
                "serial" in default_value,
                "_seq'" in default_value
            ])

        return False
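
In practice, this check drives which columns we send to PostgreSQL. A sketch of the kind of filtering we mean (expected_columns comes from the schema introspection shown earlier; df_to_load is an assumed name):

# Let the database generate auto-increment values: exclude those columns from the insert
insertable_columns = [
    col for col in expected_columns
    if not is_serial_or_identity(engine, schema, table, col)
]
df_to_load = df_to_load[[c for c in insertable_columns if c in df_to_load.columns]]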

Sequence Management

After a full load, we need to reset sequences to prevent duplicate key errors:

def update_sequences_after_full_load(engine, table_name):
    """Set each owned sequence to MAX(column) + 1 after a bulk load."""
    sql = """
        SELECT column_name, pg_get_serial_sequence(:table, column_name) AS seq
        FROM information_schema.columns
        WHERE table_name = :table
          AND column_default LIKE 'nextval%'
    """
    with engine.begin() as conn:
        rows = conn.execute(sa_text(sql), {"table": table_name}).fetchall()
        for column_name, seq in rows:
            if seq:  # skip columns whose sequence could not be resolved
                conn.execute(sa_text(
                    f"SELECT setval('{seq}', COALESCE((SELECT MAX({column_name}) "
                    f"FROM {table_name}), 0) + 1, false)"
                ))

The Bug Fix Marathon: 60+ Edge Cases

The commit history tells the story: from late July through September 2025, we made dozens of commits with the message "no more validation by json". Each one represented a bug fix or edge case discovered in production.

1. Invalid Date Handling

The Bug: BigQuery allowed dates like "9999-12-31" and malformed strings that PostgreSQL rejected.

The Fix: Centralized date validation with explicit coercion:

def validate_date_columns(df: pd.DataFrame, date_columns: list):
    """Convert and validate date columns, converting invalid dates to NULL."""
    for col in date_columns:
        if col not in df.columns:
            continue

        # Convert with error coercion
        converted = pd.to_datetime(df[col], errors="coerce", utc=True)

        # Find rows where conversion failed
        mask = converted.isnull() & df[col].notnull()

        if mask.any():
            invalid_count = mask.sum()
            logging.warning(
                f"Column '{col}': {invalid_count} invalid dates converted to NULL"
            )

        # Replace with valid dates or None
        df[col] = converted.where(converted.notnull(), None)

    return df

This function is called in both bulk load and incremental upsert paths, ensuring consistent behavior.
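
Both paths call it the same way, right after schema introspection and before any write (illustrative; variable names follow the earlier snippets):

# Shared by the bulk load and incremental upsert paths
expected_columns, lengths, date_columns = get_table_column_meta(engine, schema, table)
df = validate_date_columns(df, date_columns)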

2. NULL/NaN/NaT Representation Hell

The Bug: Mixed representations of null values (None, np.nan, pd.NaT, string "NaN") broke CSV generation for PostgreSQL COPY.

The Fix: A "brutal" cleaning function that normalizes all null representations:

def brutal_clean(val):
    """Normalize all null representations to Python None."""
    # Handle Python None and numpy nan
    if val is None or (isinstance(val, float) and np.isnan(val)):
        return None

    # Convert to string for checking
    sval = str(val)

    # Check for string representations of null
    if sval in ("None", "NaN", "nan", "NaT", ""):
        return None

    # Strip control characters
    sval = re.sub(r'[\r\n\t]', ' ', sval)  # Line breaks, tabs
    sval = re.sub(r'[\x00-\x1F\u0085\u00A0\u2028\u2029]', ' ', sval)
    sval = sval.replace('"', "'")  # Replace double quotes with single quotes to avoid CSV quoting issues

    cleaned = sval.strip()
    return cleaned if cleaned else None
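
In the bulk path, this might be applied column by column just before the TSV is generated (the call site shown here is an assumption):

# Normalize every value before writing the TSV used by PostgreSQL COPY
for col in df_to_load.columns:
    df_to_load[col] = df_to_load[col].map(brutal_clean)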

For the upsert path, we use explicit pandas null checking:

for col in df.columns:
    val = row[col]
    if pd.isna(val) or val is None:
        record[col] = None
    else:
        record[col] = val

3. TSV Field Mismatch Validation

The Bug: When data had the wrong number of fields (embedded tabs, missing columns), PostgreSQL's COPY command would fail with cryptic error messages.

The Fix: Pre-validate the TSV structure before attempting COPY:

# Generate TSV
csv_buffer = StringIO()
df_to_load.to_csv(
    csv_buffer,
    sep='\t',
    header=False,
    index=False,
    na_rep='\\N'
)

# Validate structure
csv_lines = csv_buffer.getvalue().splitlines()
expected_field_count = len(expected_columns)

for line_num, line in enumerate(csv_lines, 1):
    field_count = len(line.split('\t'))
    if field_count != expected_field_count:
        raise ValueError(
            f"TSV validation failed at line {line_num}: "
            f"expected {expected_field_count} fields, got {field_count}"
        )

Now when there's a problem, we get a clear error message pointing to the exact line, instead of a generic PostgreSQL error.

4. String Length Truncation

The Bug: Source data sometimes contained strings longer than the target column's VARCHAR(n) limit, causing insert failures.

The Fix: Proactive truncation based on schema metadata:

# Get max lengths from database
expected_columns, lengths, date_columns = get_table_column_meta(engine, schema, table)

# Truncate before load
for col, maxlen in lengths.items():
    if col in df.columns:
        as_str = df[col].astype(str)  # cast first so .str works on mixed-type columns
        original_lengths = as_str.str.len()
        df[col] = as_str.str.slice(0, maxlen)

        # Log truncations
        truncated = (original_lengths > maxlen).sum()
        if truncated > 0:
            logging.warning(
                f"Column '{col}': truncated {truncated} values to {maxlen} chars"
            )

This provides graceful degradation—data loads succeed, and we have visibility into what was truncated.

5. Control Character Sanitization

The Bug: Data containing tabs, newlines, or null bytes would break our TSV format.

The Fix: Comprehensive sanitization as part of brutal_clean():

# Remove line breaks and tabs
sval = re.sub(r'[\r\n\t]', ' ', sval)

# Remove control characters and special Unicode spaces
sval = re.sub(r'[\x00-\x1F\u0085\u00A0\u2028\u2029]', ' ', sval)

# Replace double quotes to prevent CSV quoting issues
sval = sval.replace('"', "'")

This ensures that our TSV files are always well-formed, regardless of source data quality.

Performance Optimizations

BigQuery Storage API with Fallback

The BigQuery Storage API provides significantly faster data extraction for large tables (2-3x improvement), but it requires PyArrow and specific permissions.

def fetch_bigquery_data_optimized(bq_client, query, page_size=200_000):
    """Fetch data using Storage API when possible, with automatic fallback."""
    job_config = bigquery.QueryJobConfig()
    query_job = bq_client.query(query, job_config=job_config)
    query_job.result()  # Wait for query to complete

    try:
        import pyarrow
        df = query_job.to_dataframe(create_bqstorage_client=True)
        logging.info("✅ Using BigQuery Storage API with PyArrow")
        return df
    except Exception as e:  # covers a missing PyArrow as well as Storage API failures
        logging.warning(f"Falling back to standard API: {e}")
        return query_job.to_dataframe(create_bqstorage_client=False)

This gives us the best of both worlds: maximum performance when possible, reliability when needed.

Adaptive Page Sizing

Different scenarios need different page sizes:

if is_incremental:
    page_size = 50_000  # Smaller pages for frequent updates
else:
    page_size = 200_000  # Larger pages for bulk loads

Benefits:
- Incremental updates stay memory-efficient and fast
- Bulk loads maximize throughput
- Reduced garbage collection overhead

Explicit Memory Management

For large datasets, we can't rely on Python's garbage collector to clean up promptly:

for page_idx, page_df in enumerate(fetch_bigquery_data_optimized(...)):
    # Map to target schema
    df_to_load = build_mapped_df(page_df, expanded_mapping, expected_cols)

    # Process and load
    process_and_load(df_to_load)

    # Explicit cleanup
    del page_df, df_to_load
    gc.collect()

This keeps memory usage stable even when processing datasets that are many gigabytes in size.

Resilience Features

Chunked Deletion

When processing deletions from a log table, we might have thousands of IDs to delete. Doing this in one query can cause timeouts and lock contention.

def delete_rows_from_deleted_ids(
    bigquery_client, engine, postgres_table, id_column="id"
):
    # Fetch IDs to delete from BigQuery log table
    deleted_log_table = "your-project.your-dataset.V_DeletedObjects"
    query = f"SELECT ObjectId FROM `{deleted_log_table}` WHERE ObjectName = '{postgres_table}'"

    result = bigquery_client.query(query)
    ids_to_remove = [row[0] for row in result]

    if not ids_to_remove:
        return

    # Delete in chunks of 1000
    CHUNK = 1000
    total_deleted = 0

    for i in range(0, len(ids_to_remove), CHUNK):
        chunk = ids_to_remove[i: i + CHUNK]
        q = f"DELETE FROM {DATABASE_SCHEMA_NAME}.{postgres_table} WHERE {id_column} = ANY(:ids)"

        with engine.begin() as connection:
            res = connection.execute(sa_text(q), {"ids": chunk})
            total_deleted += res.rowcount or 0

    logging.info(f"✅ Deleted {total_deleted} rows from {postgres_table}")

Transactional UPSERT

All our upserts happen within explicit transactions, ensuring atomic operations:

with engine.connect() as conn:
    trans = conn.begin()
    try:
        for batch in batches:
            insert_stmt = insert(table).values(batch)
            upsert_stmt = insert_stmt.on_conflict_do_update(
                index_elements=key_columns,
                set_={c.name: insert_stmt.excluded[c.name] for c in table.columns}
            )
            conn.execute(upsert_stmt)

        trans.commit()
        logging.info(f"✅ Committed {len(batches)} batches")
    except Exception as e:
        trans.rollback()
        logging.error(f"❌ Rolled back due to: {e}")
        raise

Context Manager for Consistent Logging

To ensure consistent timing and error logging across all operations:

@contextmanager
def log_time_taken(operation_name: str):
    start_time = time.time()
    logging.info(f"Starting {operation_name}...")
    try:
        yield
        elapsed = time.time() - start_time
        logging.info(f"✅ {operation_name} completed in {elapsed:.2f} seconds")
    except Exception as e:
        elapsed = time.time() - start_time
        logging.error(f"❌ {operation_name} failed after {elapsed:.2f} seconds: {str(e)}")
        raise

# Usage
with log_time_taken("Bulk load to PostgreSQL"):
    bulk_load_to_postgres(df, engine, schema, table)

Observability and Monitoring

Enhanced Logging

Every ETL run now produces structured, informative logs:

success_message = f"""
✅ ETL finished successfully!

Configuration:
  Source: {BQ_TABLE_NAME}
  Target: {DATABASE_SCHEMA_NAME}.{target_table}
  Mode: {'Incremental' if is_incremental else 'Full Load'}

Performance:
  Rows processed: {total_processed:,}
  Execution time: {duration:.2f} seconds
  Throughput: {int(total_processed / max(duration, 1)):,} rows/second

Details:
  Pages processed: {page_count}
  Average page size: {int(total_processed / max(page_count, 1)):,} rows
"""

This makes it easy to:
- Monitor performance trends over time
- Identify bottlenecks quickly
- Debug issues with clear context

Error Notifications

When things go wrong, we send detailed SNS notifications:

def send_notification(message: str, subject: str):
    if LOCAL:
        return

    sns.publish(
        TopicArn=SNS_NOTIFICATION_TOPIC_ARN,
        Subject=f"[{APP_ENV}] {subject}",
        Message=message
    )

Results and Impact

Quantifiable Improvements

After deploying the new ETL system in July 2025, we tracked several key metrics:

Reliability:
- Zero Doctrine compatibility issues (down from ~5/week)
- Zero foreign key violations (down from ~2/week)
- 99.9% successful runs (up from ~85%)
- 100% table structure preservation

Performance:

Metric                  | Old System            | New System                        | Improvement
Connection Management   | Baseline              | Optimized with pooling            | 40-60% faster
Bulk Load Speed         | Baseline              | With Storage API + index dropping | 2-3x faster (200-300%)
Memory Usage            | Baseline              | With explicit management          | 50% reduction
Individual Table Loads  | Baseline              | With index optimization           | 3-5x faster (300-500%)
Data Extraction         | Standard API          | BigQuery Storage API              | 2-3x faster (200-300%)
Page Processing         | Single engine per op  | Cached engine with pooling        | 40-60% faster

Summary of Performance Gains:
- Connection overhead reduced by 40-60%
- Bulk load operations improved by 200-300%
- Memory footprint decreased by 50%
- Query performance during load maintained (index management)
- Overall ETL runtime reduced by 60-70% for full loads

Maintainability:
- Zero schema drift issues (database introspection)
- Centralized date validation (one source of truth)
- 60+ bug fixes and edge cases handled
- Comprehensive error messages for debugging

Developer Experience:
- Zero application crashes during ETL runs
- Predictable behavior across all environments
- Clear actionable logs for troubleshooting
- No manual intervention required for standard operations

Scale

Our ETL now handles:
- 15+ tables with complex relationships
- 10M+ rows processed daily
- ~50GB data volume per day
- Multiple incremental updates per hour
- Full refreshes on weekends without service disruption

Lessons Learned

What We'd Do Differently

1. Earlier Testing
Many edge cases only surfaced in production. If we could do it again, we'd:
- Build a comprehensive test data generator
- Include data quality fuzzing (random invalid dates, nulls, control characters); see the sketch after this list
- Test with worst-case source data from day one
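
A minimal fuzzer sketch (the column names and bad values below are invented for illustration):

import random

DIRTY_DATES = ["9999-12-31", "not-a-date", "", None, "2025-02-30"]
DIRTY_STRINGS = ["ok", None, "NaN", "with\ttab", "with\nnewline", "x" * 500, "\x00null-byte"]

def make_fuzzed_row():
    """Generate one row mixing valid values with known-bad edge cases."""
    return {
        "id": random.randint(1, 10_000_000),
        "name": random.choice(DIRTY_STRINGS),
        "created_at": random.choice(DIRTY_DATES),
    }

rows = [make_fuzzed_row() for _ in range(1_000)]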

2. Incremental Migration
We did a "big bang" migration. In retrospect, a phased rollout per table would have:
- Reduced risk
- Made rollback easier
- Allowed for more gradual learning

3. Load Testing
We underestimated the variety of data quality issues we'd encounter. More load testing with production-like data would have caught issues earlier.

4. Documentation First
We documented as we went, but starting with clear schema requirements and data quality expectations would have saved time.

Future Enhancements

We have several improvements on our roadmap:

1. Parallel Table Loading
Currently, we process tables sequentially. Since many mappings are independent, we could parallelize:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(process_table, table_config)
        for table_config in independent_tables
    ]
    for future in as_completed(futures):
        future.result()  # surface any worker exception so the run fails loudly

2. Metrics Export
Integration with Prometheus/Grafana (a possible sketch follows this list) for:
- Real-time throughput monitoring
- Historical performance trends
- Alerting on anomalies
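
Because the ETL is a batch job, the natural fit is pushing metrics to a Pushgateway at the end of each run. A minimal sketch with prometheus_client (the gateway address and job name are placeholders; total_processed, duration, and target_table are the values from the logging snippet above):

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
rows_gauge = Gauge("etl_rows_processed", "Rows processed in the last run",
                   ["table"], registry=registry)
duration_gauge = Gauge("etl_duration_seconds", "Wall-clock duration of the last run",
                       ["table"], registry=registry)

rows_gauge.labels(table=target_table).set(total_processed)
duration_gauge.labels(table=target_table).set(duration)

# "pushgateway:9091" and the job name are placeholder values
push_to_gateway("pushgateway:9091", job="eservices-etl", registry=registry)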

3. Schema Evolution Detection
Automatic detection when BigQuery schemas change:

def detect_schema_changes(bq_table, pg_table):
    bq_schema = get_bigquery_schema(bq_table)
    pg_schema = get_postgres_schema(pg_table)
    return diff_schemas(bq_schema, pg_schema)

4. Dead Letter Queue
For rows that fail validation, instead of logging and discarding them (a sketch follows this list):
- Store in a DLQ table
- Provide tools for manual review and reprocessing
- Track data quality metrics over time
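
A first cut could be as simple as appending rejected rows to a side table; a sketch, where the etl_dead_letter table and its columns are hypothetical:

import json

def send_to_dlq(engine, source_table: str, bad_rows: pd.DataFrame, reason: str):
    """Persist rejected rows as JSON so they can be reviewed and replayed later."""
    if bad_rows.empty:
        return
    records = [
        {"source_table": source_table, "reason": reason,
         "payload": json.dumps(row, default=str)}
        for row in bad_rows.to_dict(orient="records")
    ]
    with engine.begin() as conn:
        conn.execute(
            sa_text(
                "INSERT INTO etl_dead_letter (source_table, reason, payload) "
                "VALUES (:source_table, :reason, :payload)"
            ),
            records,
        )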

Conclusion

Rewriting our ETL was a significant undertaking—805 lines of carefully crafted code, 60+ bug fixes, and months of production hardening. But the results speak for themselves:

We went from a system that broke our applications to one that runs silently in the background, processing tens of millions of rows daily without a hiccup.

The key insights that made this possible:

  1. Preserve database structure - Never drop tables when you can truncate
  2. Introspect, don't assume - Query the database schema at runtime
  3. Handle edge cases explicitly - Invalid dates, nulls, control characters—plan for them all
  4. Optimize where it matters - Indexes, connections, memory management
  5. Make failures obvious - Great logging and error messages are worth their weight in gold

If you're facing similar challenges with ETL pipelines that break downstream applications, consider whether you really need to replace tables, or if you just need to replace data. That single architectural decision solved 80% of our problems.

The remaining 20%? That's all about data quality, edge cases, and the thousand small details that separate a working system from a production-ready one. But that's a journey worth taking.


Need Help With Your Data Infrastructure?

If you're wrestling with legacy ETL systems, ORM compatibility issues, or data pipeline challenges, we've been there. At Claranet, we help organizations modernize their data infrastructure without breaking existing applications. Whether it's migrating between cloud platforms, optimizing database performance, or building resilient data pipelines, our engineering teams work alongside yours to solve real production problems.

Learn more about our approach or reach out if you'd like to discuss your specific challenges.