Part 4 — Ingesting Data: Fivetran, Airbyte, Snowpipe, Autoloader, Glue, ADF, and CDC

Part 4 — Ingesting Data: Fivetran, Airbyte, Snowpipe, Autoloader, Glue, ADF, and CDC – SQLYARD
Deep Technical Series — Building a Modern Data Warehouse and Lakehouse

Part 4 — Ingesting Data: Fivetran, Airbyte, Snowpipe, Autoloader, Glue, ADF, and CDC


Part 4 of 14 — Deep Technical Series: Building a Modern Data Warehouse and Lakehouse. Series index (Part 0) · ← Part 3: Bronze, Silver, and Gold

Ingestion is where everything begins. Before the dimensional model, before the medallion layers, and before any transformation logic runs, data must first arrive reliably, completely, and with clear metadata. In modern cloud architectures, ingestion no longer means a single ETL tool running a nightly job. Instead, you combine managed SaaS connectors, file-based pipelines, CDC frameworks, and event-driven automations that populate your Bronze layer continuously and predictably.

This part covers every ingestion strategy used in enterprise environments. You will learn how batch, incremental, and CDC ingestion work, how to design resilient pipelines, and when to choose Fivetran, Airbyte, Snowpipe, Databricks Autoloader, AWS Glue, Azure Data Factory, Fabric Pipelines, BigQuery Transfer Service, or AWS DMS. Every section includes working examples across Snowflake, Fabric, Databricks, AWS, and BigQuery.

How this connects to the series: Ingestion tools write to the Bronze layer defined in Part 3. The data that lands in Bronze is transformed into Silver in Parts 5 and 6. The choice of ingestion tool affects latency, schema handling, CDC capability, and cost — all of which influence how your Silver transformations need to be designed. Get ingestion right and the rest of the pipeline is reliable. Get it wrong and you are constantly debugging data gaps and schema breaks.

1 The Four Types of Ingestion Beginner

Understanding which ingestion type fits your source system and latency requirements is the first decision to make before selecting any tool. Each type has different infrastructure requirements, cost profiles, and implications for how your Bronze layer is structured.

1. Batch Ingestion

Moves data at scheduled intervals — hourly, daily, or weekly. Simple, reliable, and ideal for source systems that do not support logs or streaming. CSV exports, API pulls, and flat files. Best for non-time-sensitive analytics where overnight latency is acceptable.

2. Incremental Ingestion

Extracts only new or updated records since the last run using an updated_at timestamp, high-watermark column, or partition key. More efficient than full batch — lower cost, lower latency. Works well for Silver-layer incremental models in dbt.

3. CDC (Change Data Capture)

Reads row-level inserts, updates, and deletes directly from the source database transaction log. Near-real-time. Captures every change including deletes — which incremental ingestion misses entirely. Required for accurate SCD Type 2 dimension tracking.

4. Streaming Ingestion

Handles events in near real time from Kafka, Kinesis, Event Hub, or IoT sources. Data arrives continuously and lands in Bronze within seconds. Best for operational reporting, fraud detection, or any use case where minutes of latency are unacceptable.

TypeLatencyCaptures Deletes?InfrastructureBest For
BatchHoursNoLowNightly analytics, historical loads
IncrementalMinutes to hoursNoLowDaily refreshes, dbt incremental models
CDCSeconds to minutesYesMediumSCD Type 2, real-time fact updates
StreamingSub-secondYesHighEvent pipelines, operational BI

Incremental ingestion misses deletes. If a customer is deleted from your CRM and you use timestamp-based incremental ingestion, that deletion never reaches your warehouse. The customer continues to exist in your Silver and Gold layers indefinitely. If your use case requires accurate deletion tracking — GDPR right-to-erasure compliance, churn analysis, subscription cancellations — you need CDC, not incremental ingestion.

2 Fivetran — Managed SaaS and Database ELT Beginner

Fivetran is the industry-standard managed ELT service for SaaS and transactional database ingestion. It handles connectors, schema drift, incremental loads, and CDC automatically — with near-zero operational overhead. You configure a source and a destination, and Fivetran manages everything in between.

Reference: Fivetran Documentation

Strengths

  • Near-zero maintenance — no infrastructure to run or patch
  • Massive connector catalog — 500+ connectors as of 2026
  • Log-based CDC for MySQL, PostgreSQL, SQL Server, Oracle
  • Automatic schema drift handling — new columns in source are automatically added to destination
  • Incremental loads handled automatically with no configuration
  • Supports Snowflake, Databricks, BigQuery, Synapse, Redshift, and Fabric as destinations
  • Automated retries, monitoring, and alerting built in

Limitations

  • Cost increases with data volume — MAR (Monthly Active Rows) pricing can become expensive at scale
  • Limited customization in the transformation layer — use dbt for post-ingestion transforms
  • Some connectors are batch-only even when the source supports CDC

Common Use Cases

  • CRM data — Salesforce, HubSpot, Pipedrive
  • ERP — NetSuite, SAP, Microsoft Dynamics
  • Marketing — Facebook Ads, Google Ads, LinkedIn Ads
  • Payments — Stripe, PayPal, Braintree
  • Databases — MySQL, SQL Server, PostgreSQL, Oracle, MongoDB

Fivetran to Data Lake — Bronze Landing Pattern

-- Fivetran to ADLS / S3 / GCS: files land in Bronze automatically
-- Configure destination as: Azure Data Lake Gen2, S3, or GCS
-- Fivetran writes Parquet files with Hive-style partitioning:

/lake/bronze/crm/customer/
    year=2025/month=01/day=15/
        customer_20250115_001.parquet
        customer_20250115_002.parquet

-- No code required for Bronze landing
-- Fivetran handles partitioning, compression, and schema metadata

-- Fivetran to Snowflake: writes directly into staging tables
-- Fivetran creates tables with the following naming convention:
--   {schema}.{table}          -- current data
--   {schema}.{table}_fivetran_audit  -- sync metadata

Handling Schema Drift with Fivetran

-- Fivetran automatically handles schema drift by default
-- When a new column appears in the source:
-- 1. Fivetran detects the new column on next sync
-- 2. It automatically adds the column to the destination table
-- 3. Historical rows show NULL for the new column
-- 4. A schema change notification is sent to configured alerts

-- You can configure schema drift behavior per connector:
-- ALLOW  -- automatically add new columns (default, recommended for Bronze)
-- BLOCK  -- reject syncs when schema changes are detected
-- IGNORE -- skip new columns entirely (not recommended)

-- Best practice: allow schema drift in Bronze, enforce schema in Silver
-- Bronze absorbs all changes; Silver applies the schema contract

3 Airbyte — Open Source ELT Beginner

Airbyte is the open-source alternative to Fivetran. It provides a similar connector catalog and ELT capability but is self-hosted (or available as Airbyte Cloud), fully customizable, and significantly cheaper at scale. It is the right choice when Fivetran’s MAR-based pricing becomes prohibitive, when you need custom connectors, or when your organization prefers open-source tooling.

Reference: Airbyte Documentation

Choose Fivetran When

Operational overhead must be minimal. Team is small and cannot manage infrastructure. You need the most reliable, polished connectors. Time-to-production matters more than cost at your current scale.

Choose Airbyte When

Fivetran MAR costs are too high at your data volume. You need custom connectors not available in Fivetran. Open-source preference or data residency requirements. Your engineering team can maintain the infrastructure.

# Airbyte: configuration is done via UI or YAML config files
# After configuring a connection, Airbyte writes incremental Parquet files to:
/lake/bronze/ecommerce/orders/
    year=2025/month=01/day=15/
        orders_ab_20250115.parquet

# Airbyte also supports normalization -- basic Silver-like cleaning post-ingestion
# But for production pipelines, use dbt for all Silver transformations
# Keep Airbyte focused on Bronze ingestion only

# Monitor Airbyte sync status via API:
import requests

response = requests.get(
    "https://api.airbyte.com/v1/connections/{connectionId}/jobs",
    headers={"Authorization": "Bearer your-api-key"}
)
print(response.json())

4 Fivetran vs Airbyte — Which to Choose Beginner

CriteriaFivetranAirbyte
Pricing modelMAR-based (cost scales with volume)Open source self-hosted or per-credit cloud
Connector count500+ (most mature)400+ (growing fast)
Log-based CDCYes — core featureYes — select connectors only
Schema drift handlingAutomatic, configurableManual or normalized mode
Operational overheadNear-zero (fully managed)Self-hosted requires maintenance
Custom connectorsLimited (SDK available)Full custom connector support
Open sourceNoYes (MIT license)
Best forSmall teams, speed to productionCost sensitivity, custom sources, engineering-mature teams

5 Snowpipe — Auto-Ingestion for Snowflake Intermediate

Snowpipe automatically loads files into Snowflake as soon as they land in cloud storage. Event notifications (S3 SQS, Azure Event Grid, or GCS Pub/Sub) trigger Snowpipe to copy data from the stage into the target Bronze table within seconds of file arrival. No scheduling infrastructure required once configured.

Reference: Snowflake Snowpipe Documentation

-- Complete Snowpipe setup for continuous Bronze ingestion

-- Step 1: Create external stage pointing to cloud storage
CREATE OR REPLACE STAGE bronze_stage
  URL = 's3://your-bucket/landing/crm/customer/'
  CREDENTIALS = (AWS_ROLE_ARN = 'arn:aws:iam::account:role/SnowflakeRole')
  FILE_FORMAT = (TYPE = 'PARQUET');

-- Step 2: Create the Bronze target table
CREATE TABLE IF NOT EXISTS bronze.crm_customer (
  raw_data        VARIANT,
  source_file     VARCHAR(500),
  ingested_at     TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Step 3: Create Snowpipe with AUTO_INGEST enabled
CREATE OR REPLACE PIPE crm_customer_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO bronze.crm_customer (raw_data, source_file)
FROM (
  SELECT $1, METADATA$FILENAME
  FROM @bronze_stage
);

-- Step 4: Get the SQS queue ARN to configure S3 event notifications
SHOW PIPES;
-- Copy the notification_channel value and configure it as an
-- S3 bucket event notification (ObjectCreated trigger)

-- Monitor Snowpipe status
SELECT SYSTEM$PIPE_STATUS('crm_customer_pipe');

-- Check recent load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME   => 'bronze.crm_customer',
  START_TIME   => DATEADD('hour', -24, CURRENT_TIMESTAMP())
))
ORDER BY LAST_LOAD_TIME DESC;

6 Databricks Auto Loader — File-Based Bronze Ingestion Intermediate

Databricks Auto Loader is the recommended Bronze ingestion pattern for Databricks and Fabric Spark environments. It monitors cloud storage for new files, processes them incrementally using a checkpoint, handles schema inference and evolution automatically, and scales to billions of files without listing overhead. It is the most production-ready file ingestion solution for large-scale lakehouses.

Reference: Databricks Auto Loader Documentation

# Auto Loader: continuous Bronze ingestion with schema inference
# Handles JSON, CSV, Parquet, Avro, ORC, text, and binary formats

from pyspark.sql.functions import current_timestamp, input_file_name

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Schema location: Auto Loader saves inferred schema here
    # and detects schema evolution on subsequent runs
    .option("cloudFiles.schemaLocation", "/checkpoints/crm_customer_schema")
    .option("cloudFiles.inferColumnTypes", "true")
    # Enable schema evolution: new columns absorbed automatically
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("/mnt/landing/crm/customer/")
)

# Add ingestion metadata columns
df = (
    df
    .withColumn("_ingested_at",   current_timestamp())
    .withColumn("_source_file",   input_file_name())
)

# Write to Bronze Delta table
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_crm_customer")
    .option("mergeSchema", "true")
    # trigger=availableNow: process all pending files then stop (batch mode)
    # trigger=ProcessingTime("10 minutes"): process every 10 minutes (micro-batch)
    .trigger(availableNow=True)
    .table("bronze.crm_customer")
)

Auto Loader vs COPY INTO

-- Databricks also supports COPY INTO for simpler batch Bronze loading
-- Use COPY INTO when: simple one-time or scheduled batch loads
-- Use Auto Loader when: continuous streaming, schema evolution, large file counts

-- COPY INTO: idempotent -- skips files already loaded
COPY INTO bronze.crm_customer
FROM '/mnt/landing/crm/customer/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true', 'force' = 'false');
-- force = false: skip already-loaded files (idempotent)

7 AWS Glue — ETL and Ingestion for S3 Intermediate

AWS Glue is a serverless ETL service that handles file ingestion, format conversion, and basic transformation on S3. It uses PySpark under the hood, making it compatible with Delta Lake and Iceberg table formats. Glue Crawlers automatically discover and catalog new data in S3, keeping the Glue Data Catalog (the AWS equivalent of Unity Catalog) up to date without manual schema registration.

Reference: AWS Glue Documentation

# AWS Glue Job: JSON Bronze ingestion to S3 Parquet
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import current_timestamp, input_file_name
from datetime import date

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read raw JSON from S3 landing zone
raw_df = (
    spark.read
    .option("multiLine", "true")
    .json("s3://your-bucket/landing/crm/customer/")
)

# Add ingestion metadata
raw_df = (
    raw_df
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
)

# Write to Bronze with date partitioning
today = date.today()
(
    raw_df.write
    .mode("append")
    .partitionBy("_ingested_date")
    .parquet(f"s3://your-bucket/bronze/crm/customer/")
)

job.commit()

Glue Crawler — Auto-Catalog Bronze Data

# Configure a Glue Crawler to automatically catalog Bronze files
# This keeps the Glue Data Catalog current as new partitions land

# Via AWS CLI:
aws glue create-crawler \
  --name "bronze-crm-customer-crawler" \
  --role "arn:aws:iam::account:role/GlueRole" \
  --database-name "bronze" \
  --targets '{"S3Targets": [{"Path": "s3://your-bucket/bronze/crm/customer/"}]}' \
  --schedule "cron(0 6 * * ? *)"  # Run daily at 6 AM UTC

# After crawling, tables appear in Glue Catalog and are queryable via:
# - Amazon Athena (serverless SQL)
# - Redshift Spectrum (from Redshift)
# - EMR Spark jobs
# - Apache Hive

8 Azure Data Factory and Fabric Data Pipelines Beginner

Azure Data Factory (ADF) and Microsoft Fabric Data Pipelines provide GUI-based and code-assisted ingestion for Azure and Fabric environments. They share the same underlying engine and activity types — if you know ADF, Fabric Pipelines will feel familiar. Fabric Pipelines are the strategic direction for new Microsoft projects as they integrate directly with OneLake, Lakehouses, Warehouses, and semantic models.

Reference: Microsoft Fabric Data Pipelines

When to Use ADF / Fabric Pipelines

  • Batch file ingestion from HTTP, SFTP, SharePoint, or REST APIs
  • SQL Server or Azure SQL database to lake ingestion (lift-and-shift)
  • Orchestrating multiple datasets with dependency management
  • ETL modernization from SSIS packages
  • Copy Activity for bulk data movement to OneLake
  • Dataflow Gen2 for low-code visual transformations

Fabric Pipeline Structure — Bronze Ingestion

-- Fabric Pipeline activity sequence for Bronze ingestion:
-- 1. Copy Activity: source → OneLake Files (Bronze zone)
-- 2. Notebook Activity: register files as Delta table in Tables zone
-- 3. Set Variable Activity: capture row counts for monitoring

-- Example: Copy Activity configuration (JSON representation)
{
  "name": "CopyCustomersToBronze",
  "type": "Copy",
  "source": {
    "type": "RestSource",
    "httpRequestTimeout": "00:05:00",
    "requestInterval": "00.00:00:00.010",
    "requestMethod": "GET"
  },
  "sink": {
    "type": "JsonSink",
    "storeSettings": {
      "type": "LakehouseWriteSettings"
    },
    "formatSettings": {
      "type": "JsonWriteSettings"
    }
  },
  "dataIntegrationUnits": 4
}
# Fabric Notebook Activity: register landed files as Delta table
# (runs after Copy Activity in the same pipeline)
from pyspark.sql.functions import current_timestamp, input_file_name

# Read files landed by Copy Activity
df = (
    spark.read
    .option("multiLine", "true")
    .json("Files/bronze/crm/customer/")
)

df = (
    df
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
)

# Write to Lakehouse Tables section (becomes a Delta table)
(
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("bronze_crm_customer")
)

print(f"Rows loaded: {df.count()}")

ADF / Fabric Pipeline — Incremental SQL Ingestion

-- ADF/Fabric: incremental load pattern using watermark table
-- Watermark table tracks the last successfully ingested timestamp per source

-- Step 1: Create watermark table (run once in your metadata database)
CREATE TABLE dbo.IngestWatermark (
    SourceName      VARCHAR(100) NOT NULL,
    LastLoadedAt    DATETIME2    NOT NULL,
    CONSTRAINT PK_IngestWatermark PRIMARY KEY (SourceName)
);
INSERT INTO dbo.IngestWatermark VALUES ('crm_customer', '2020-01-01');

-- Step 2: Pipeline reads watermark, uses it to filter source
-- Pipeline variable: @activity('GetWatermark').output.firstRow.LastLoadedAt
-- Copy Activity source query:
SELECT *
FROM CRM.dbo.Customer
WHERE UpdatedAt > '@{activity(''GetWatermark'').output.firstRow.LastLoadedAt}'
AND   UpdatedAt <= GETDATE()

-- Step 3: After successful copy, update watermark
UPDATE dbo.IngestWatermark
SET LastLoadedAt = GETDATE()
WHERE SourceName = 'crm_customer';

9 BigQuery Data Transfer Service Beginner

BigQuery Data Transfer Service (DTS) is Google's built-in managed ingestion service for moving data into BigQuery from Google products and selected third-party sources. It requires no infrastructure — configure it in the BigQuery console or via API and transfers run on schedule automatically.

Reference: BigQuery Transfer Service Documentation

Supported Sources

  • Google Ads, Display & Video 360, Campaign Manager, Search Ads 360
  • YouTube Analytics, Google Play
  • Google Cloud Storage (GCS) — scheduled file ingestion
  • Amazon S3 — cross-cloud ingestion into BigQuery
  • Teradata, Redshift — database migration transfers
  • Salesforce, Facebook Ads (via partner connectors)
-- BigQuery: create a scheduled GCS transfer via API
-- Configure via Google Cloud Console → BigQuery → Data Transfers → Create Transfer

-- After setup, files landing in GCS are automatically loaded into BigQuery:
gsutil cp customer_export_20250115.csv gs://your-bucket/landing/crm/customer/

-- BigQuery DTS automatically loads this into:
-- project.bronze.crm_customer (creates table if not exists)

-- Monitor transfer status
SELECT
  state,
  start_time,
  end_time,
  error_status
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE job_type = 'LOAD'
AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY creation_time DESC;

10 AWS DMS — Change Data Capture Intermediate

AWS Database Migration Service (DMS) replicates source database changes into S3, Redshift, Snowflake, or other destinations by reading the source database's transaction log. It supports MySQL, PostgreSQL, SQL Server, Oracle, MongoDB, and many others as sources. DMS is the right choice for near-real-time CDC from AWS-hosted databases when Fivetran log-based CDC is not available or not cost-effective.

Reference: AWS DMS Documentation

# AWS DMS produces CDC output files in S3 with this structure:
# Each file contains a set of change records with operation type

/lake/bronze/pos/orders/
    year=2025/month=01/day=15/
        LOAD00000001.csv   -- initial full load
        20250115-120000-1.csv  -- CDC incremental changes

# Each CDC record includes an operation column:
# I = INSERT, U = UPDATE, D = DELETE

# DMS CSV output format (default):
# Op, table_name, col1, col2, col3 ...
# I, orders, 12345, 2025-01-15, 99.99
# U, orders, 12345, 2025-01-15, 109.99
# D, orders, 12347, 2025-01-14, 49.99
# Process DMS CDC output in PySpark Silver transformation
from pyspark.sql.functions import col, max as spark_max

# Read DMS output -- includes Op column
cdc_bronze = spark.read.parquet("s3://lake/bronze/pos/orders/")

# Separate operation types
inserts    = cdc_bronze.filter(col("Op") == "I")
updates    = cdc_bronze.filter(col("Op") == "U")
deletes    = cdc_bronze.filter(col("Op") == "D")

# Apply to Silver: MERGE inserts and updates, mark deletes
from delta.tables import DeltaTable

silver = DeltaTable.forPath(spark, "s3://lake/silver/pos/orders/")

# Merge inserts and updates
silver.alias("tgt").merge(
    inserts.union(updates).alias("src"),
    "tgt.order_id = src.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Handle deletes: mark as deleted rather than physical delete
# (preserves audit trail and supports SCD Type 2 history)
silver.alias("tgt").merge(
    deletes.alias("src"),
    "tgt.order_id = src.order_id AND tgt.is_deleted = 0"
).whenMatchedUpdate(set={"is_deleted": "1", "deleted_at": "current_timestamp()"}) \
 .execute()

11 CDC Design Patterns Advanced

CDC is the most powerful ingestion type but also the most complex to design correctly. There are three patterns in common use, each with different trade-offs between complexity, accuracy, and infrastructure requirements.

Pattern 1 — Timestamp-Based Incremental

The simplest approach. Filter the source table by an updated_at column to pull only records changed since the last load. Works without any special database configuration. Misses hard deletes.

-- Timestamp-based incremental: pull records updated since last watermark
-- Works on any database with an updated_at column
-- Does NOT capture deletes

SELECT *
FROM source.orders
WHERE updated_at > (
    SELECT MAX(updated_at) FROM silver.orders
)
AND updated_at <= CURRENT_TIMESTAMP();

-- Limitation: if a row is deleted from source, it is never reflected in Silver
-- Solution: use soft deletes (is_deleted flag) in the source, or switch to log-based CDC

Pattern 2 — Log-Based CDC

Reads directly from the database transaction log — the authoritative record of every change. Captures inserts, updates, AND deletes. Near real-time. Requires source database configuration (MySQL binlog enabled, SQL Server CDC enabled, PostgreSQL logical replication slot configured).

-- SQL Server: enable CDC on source database (run as sysadmin)
-- This is the source-side configuration for Fivetran or DMS log-based CDC

-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on a specific table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'Orders',
    @role_name     = NULL,           -- NULL = no role restriction
    @capture_instance = N'dbo_Orders',
    @supports_net_changes = 1;

-- Verify CDC is enabled
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

SELECT t.name, ct.capture_instance, ct.is_tracked_by_cdc
FROM sys.tables t
JOIN cdc.change_tables ct ON ct.source_object_id = t.object_id;

-- PostgreSQL: enable logical replication for Debezium or Fivetran
-- (run as superuser on the source PostgreSQL instance)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 5;
SELECT pg_reload_conf();

Pattern 3 — MERGE Pattern at Silver

Regardless of how CDC records arrive in Bronze (as DMS CSV files, Fivetran tables, or Kafka messages), the Silver layer applies them using a MERGE operation that handles inserts, updates, and deletes idempotently.

-- Silver MERGE: apply CDC records to Silver table
-- Works the same across Snowflake, Fabric, BigQuery, and Databricks Delta
-- This is the pattern that feeds your SCD Type 2 Gold dimensions

-- Snowflake example:
MERGE INTO silver.customer AS tgt
USING bronze.customer_cdc AS src
    ON tgt.customer_id = src.customer_id
WHEN MATCHED AND src.op_type = 'D' THEN
    UPDATE SET
        tgt.is_deleted   = 1,
        tgt.deleted_at   = CURRENT_TIMESTAMP()
WHEN MATCHED AND src.op_type IN ('U', 'I') THEN
    UPDATE SET
        tgt.customer_name = src.customer_name,
        tgt.email         = src.email,
        tgt.updated_at    = src.updated_at
WHEN NOT MATCHED AND src.op_type != 'D' THEN
    INSERT (customer_id, customer_name, email, updated_at, is_deleted)
    VALUES (src.customer_id, src.customer_name, src.email, src.updated_at, 0);

12 Ingestion Tool to Medallion Layer Mapping Beginner

Every ingestion tool has a natural home in the medallion architecture. No ingestion tool should write directly to Gold — that layer is owned by transformation logic, not ingestion pipelines.

Tool Bronze Silver Gold Notes
Fivetran Optional Fivetran normalized output can land in Silver-like tables, but use dbt for production Silver
Airbyte Optional Same recommendation as Fivetran — Bronze only, dbt for Silver
Snowpipe Raw file ingestion only — no transformation capability
Auto Loader File discovery and Bronze landing only
AWS Glue Optional Glue ETL can handle Silver cleaning but dbt is preferred for maintainability
ADF / Fabric Pipelines Optional Dataflow Gen2 can do Silver transforms; dbt preferred for complex logic
BigQuery DTS Raw load into BigQuery Bronze tables only
AWS DMS CDC output lands in Bronze — Silver MERGE handled separately
dbt Transformation only — never used for ingestion

13 Schema Drift — Handling Source Changes Gracefully Advanced

Schema drift occurs when a source system adds, removes, or renames a column without warning. In traditional ETL systems this breaks pipelines immediately. In a well-designed medallion architecture, schema drift in Bronze is expected and handled automatically — but it must be managed explicitly at the Silver layer where the schema contract is enforced.

The Schema Drift Strategy

  • Bronze: absorb all changes. Configure all ingestion tools to accept new columns automatically. Bronze never rejects data due to schema changes. New columns appear as nulls in historical rows — that is acceptable because Bronze is an archive.
  • Silver: enforce the contract. Silver's SELECT statement explicitly names every column it uses. New columns in Bronze that are not in the Silver SELECT are ignored until you deliberately add them. This protects downstream Gold models and semantic layers from surprise schema changes.
  • Gold: stable and versioned. Gold schema changes are coordinated with BI teams and semantic model owners before deployment. Use dbt's on_schema_change configuration to control behavior when upstream Silver changes.
-- dbt: control Silver behavior when Bronze schema changes
-- models/staging/stg_crm_customer.sql

{{ config(
    materialized = 'incremental',
    unique_key   = 'customer_id',
    -- Options: 'ignore' | 'fail' | 'append_new_columns' | 'sync_all_columns'
    on_schema_change = 'append_new_columns'
    -- append_new_columns: add new columns from source to the model automatically
    -- sync_all_columns: add new + remove dropped columns (use carefully)
    -- fail: stop pipeline if schema changes (use for Gold or critical Silver models)
    -- ignore: silently ignore schema changes (not recommended)
) }}

SELECT
    customer_id,
    customer_name,
    email,
    city,
    state,
    country,
    updated_at
FROM {{ source('crm_bronze', 'customer') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

14 Workshops

Novice

Ingest to Bronze

  • Create a container or bucket in ADLS, S3, or GCS
  • Drop a CSV or JSON file into the landing zone
  • Use Auto Loader, Snowpipe, or ADF Copy Activity to load Bronze
  • Query the Bronze table and verify row count matches source file
  • Check that ingestion metadata columns (_ingested_at, _source_file) are populated
  • Add a second file and confirm it is picked up automatically

Intermediate

Build Incremental Silver

  • Build a Bronze table with an updated_at column
  • Insert 1,000 rows representing initial load
  • Build a Silver MERGE using the incremental watermark pattern
  • Insert 100 more rows with updated updated_at values
  • Run Silver MERGE and validate only new rows were processed
  • Introduce a schema change (new column) and verify Silver handles it

Advanced

Implement a CDC Pipeline

  • Enable CDC on a SQL Server or PostgreSQL source table
  • Configure Fivetran log-based CDC or AWS DMS to capture changes
  • Land CDC records into Bronze (with Op column: I, U, D)
  • Build a Silver MERGE that handles all three operation types
  • Insert a row, update it, then delete it — verify each operation flows to Silver correctly
  • Verify that the delete appears as is_deleted = 1 in Silver, not as a missing row

References

Up next → Part 5: Transformations Part 1: ELT Strategy, dbt Modeling, and SQL Pipelines — ELT vs ETL, dbt staging and incremental models, MERGE patterns across all platforms, JSON flattening, and surrogate key generation.

Discover more from SQLYARD

Subscribe to get the latest posts sent to your email.

Leave a Reply

Discover more from SQLYARD

Subscribe now to keep reading and get access to the full archive.

Continue reading