Part 10 — Orchestration and Automation
By this point in the series your system has a dimensional model, a medallion lake, ingestion pipelines, transformations, a physical warehouse or lakehouse, and a semantic layer. Everything is built. Now it needs to run — reliably, on schedule, in the right order, with automatic recovery when something fails. That is orchestration.
Without orchestration, every pipeline is a manual process. Someone has to remember to run Silver before Gold, to trigger the semantic model refresh after Gold completes, to retry when a source API is temporarily unavailable, and to alert the team when something fails silently. Orchestration automates all of this and makes your data platform operationally maintainable at scale.
How this connects to the series: Orchestration ties together every layer built in Parts 3 through 9. A well-designed orchestrator runs: Bronze ingestion → Silver transformation → Gold dimensional models → data quality gates → semantic model refresh → notifications. Every pipeline in the series becomes a task in your orchestrator. This part shows you how to build that coordination layer on each major platform.
- Why Orchestration Matters — What Breaks Without It
- Core Concepts: DAGs, Tasks, Triggers, and Dependencies
- Choosing the Right Orchestrator
- Apache Airflow — The Enterprise Standard
- Microsoft Fabric Data Pipelines
- dbt Cloud Job Orchestration
- Databricks Workflows
- AWS Glue Workflows and Step Functions
- Google Cloud Composer
- The Medallion Orchestration Pattern
- Error Handling and Recovery
- Triggering BI Refreshes After Pipeline Completion
- SLA Monitoring and Alerting
- Workshops
1 Why Orchestration Matters — What Breaks Without It Beginner
Without orchestration, data pipelines are fragile. They run in the wrong order, fail silently, and produce stale or incorrect data that analysts trust without knowing it is wrong. Here is what specifically breaks when orchestration is absent:
- Out-of-order execution. Silver transformation runs before Bronze ingestion completes. Gold loads yesterday’s Silver data because today’s Silver was still processing. The dashboard shows correct numbers for yesterday’s data, wrong numbers for today — and nobody knows.
- Silent failures. A source API times out at 2 AM. The pipeline appears to complete because the Bronze table already exists from yesterday. Silver and Gold load successfully from stale data. Analysts trust the dashboard until someone notices the numbers have not changed in two days.
- No recovery path. A transformation fails halfway through. Some dimension rows are loaded, others are not. The next pipeline run does not know where to pick up. Manual intervention is required to determine the state of the data and restart from a safe point.
- No visibility. Nobody knows how long the pipeline took, whether it finished, or whether it is currently running. When a dashboard is stale, the first 20 minutes are spent figuring out whether the pipeline is still running, already failed, or never started.
What good orchestration provides: Guaranteed execution order (Bronze before Silver before Gold). Automatic retries on transient failures. Clear success/failure visibility with timestamps and duration. Notifications to the right people when something needs attention. Idempotent re-runs that safely recover from any failure point without manual intervention.
2 Core Concepts: DAGs, Tasks, Triggers, and Dependencies Beginner
DAG — Directed Acyclic Graph
A DAG is the data structure that represents a pipeline. Each node in the graph is a task (a unit of work). Each edge is a dependency (task B cannot start until task A completes). The “acyclic” part means there are no loops — you cannot have task A depend on task B which depends on task A. The DAG defines the execution order, and the orchestrator schedules and monitors it.
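To make the definition concrete, here is a minimal sketch using Python's standard-library `graphlib`. The task names are illustrative and not tied to any particular orchestrator. It derives a valid execution order from the dependencies and shows that a loop is rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of upstream tasks it depends on.
# Task names are illustrative.
pipeline = {
    "bronze_ingest": set(),
    "silver_transform": {"bronze_ingest"},
    "gold_load": {"silver_transform"},
    "semantic_refresh": {"gold_load"},
}


def execution_order(graph):
    """Return one valid execution order; raises CycleError if the graph has a loop."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(pipeline)
print(order)

# Making the first task depend on the last one introduces a loop,
# so the graph is no longer acyclic and cannot be scheduled.
cyclic = dict(pipeline, bronze_ingest={"semantic_refresh"})
try:
    execution_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Real orchestrators perform the same validation when they parse a pipeline definition, which is why a circular dependency is reported before anything runs.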
Key Concepts
- Task — a single unit of work: run a SQL query, execute a dbt model, trigger a notebook, call an API
- Dependency — an upstream task that must succeed before a downstream task can start
- Schedule — when the DAG runs: cron expression, fixed interval, or event-triggered
- Trigger — what initiates a DAG run: schedule, file arrival, API call, upstream DAG completion, or manual
- Retry — automatic re-execution when a task fails, with configurable delay and maximum attempts
- SLA — the maximum time a task or DAG is allowed to take before alerting
- Idempotency — the property that re-running a task produces the same result as running it once, regardless of how many times it has already run
Design every task to be idempotent. When a task fails and is retried, it must be safe to re-run without producing duplicate data or leaving the system in a broken state. Use MERGE instead of INSERT, use CREATE OR REPLACE instead of CREATE, use watermarks that track the last successful run. If a task is not idempotent, a retry can corrupt your data in ways that are very difficult to detect.
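As a small illustration of the MERGE-instead-of-INSERT advice, the sketch below uses an in-memory SQLite database as a stand-in for the warehouse. The table and column names are assumptions made for the example, and SQLite's `ON CONFLICT ... DO UPDATE` plays the role that MERGE plays in a warehouse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE silver_customer (customer_id INTEGER PRIMARY KEY, name TEXT)"
)

batch = [(1, "Ada"), (2, "Grace")]


def load_idempotent(conn, rows):
    """Upsert keyed on customer_id: re-running the same batch changes nothing."""
    conn.executemany(
        """
        INSERT INTO silver_customer (customer_id, name)
        VALUES (?, ?)
        ON CONFLICT (customer_id) DO UPDATE SET name = excluded.name
        """,
        rows,
    )
    conn.commit()


load_idempotent(conn, batch)
load_idempotent(conn, batch)  # simulated retry of the same task

row_count = conn.execute("SELECT COUNT(*) FROM silver_customer").fetchone()[0]
print(row_count)
```

A plain INSERT without the conflict clause would fail on the second call here, and on a table with no primary key it would silently double the rows, which is the failure mode the paragraph above warns about.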
3 Choosing the Right Orchestrator Beginner
Apache Airflow
The most widely adopted orchestrator for complex multi-cloud pipelines. Python-based DAGs, thousands of operators, extensive ecosystem. Best for: complex dependencies, multi-cloud, custom Python logic, existing DevOps tooling.
Fabric Data Pipelines
GUI-first orchestration inside Microsoft Fabric. Integrates natively with all Fabric workloads. Best for: Fabric-centric teams, no-code/low-code orchestration, combined ingestion and orchestration in one tool.
dbt Cloud Jobs
Built-in scheduler for dbt projects. Handles model ordering automatically via the DAG. Best for: transformation-only orchestration where all logic is in dbt models and the warehouse handles compute.
Databricks Workflows
Native orchestration for Databricks notebooks, SQL queries, and Delta Live Tables. Best for: Databricks-centric medallion pipelines where notebooks, Spark, and SQL tasks all run on Databricks.
AWS Step Functions + Glue Workflows
AWS-native orchestration for Glue ETL jobs and serverless pipelines. Best for: AWS-primary teams, Glue-heavy pipelines, serverless event-driven architectures.
Google Cloud Composer
Google Cloud’s managed Airflow service. Eliminates Airflow infrastructure management. Best for: Google Cloud-primary teams who want Airflow without the operational overhead.
| Orchestrator | Language | Best For | Managed? |
|---|---|---|---|
| Apache Airflow | Python | Complex multi-cloud, custom logic | Self-hosted or MWAA/Composer |
| Fabric Pipelines | GUI + JSON | Fabric-first, no-code teams | Fully managed |
| dbt Cloud | YAML + SQL | dbt-only transformation workflows | Fully managed |
| Databricks Workflows | JSON + notebooks | Databricks-native pipelines | Fully managed |
| AWS Step Functions | JSON (ASL) | Serverless AWS-native pipelines | Fully managed |
| Cloud Composer | Python (Airflow) | GCP-primary teams | Managed Airflow |
4 Apache Airflow — The Enterprise Standard Intermediate
Airflow is the most widely adopted orchestrator for enterprise data pipelines. Its Python-based DAG definitions are version-controlled, testable, and flexible enough to coordinate any combination of SQL, Spark, API calls, dbt runs, and custom logic. The operator ecosystem covers virtually every tool and platform covered in this series.
Reference: Apache Airflow Documentation
Complete Medallion Pipeline DAG
# Airflow DAG: complete Bronze → Silver → Gold → Semantic refresh pipeline
# File: dags/medallion_daily_refresh.py
# Written for Airflow 2.x
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default task arguments -- applied to every task unless overridden
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,   # delay roughly doubles: about 5, 10, 20 min
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'sla': timedelta(hours=4),           # flag any task not finished within 4 hours of the scheduled run
}

with DAG(
    dag_id            = 'medallion_daily_refresh',
    description       = 'Daily Bronze → Silver → Gold → Semantic refresh',
    default_args      = default_args,
    schedule_interval = '0 3 * * *',            # 3 AM daily
    start_date        = datetime(2025, 1, 1),   # fixed date; days_ago() is deprecated
    catchup           = False,                  # do not backfill missed runs
    max_active_runs   = 1,                      # prevent overlapping runs
    tags              = ['production', 'medallion', 'daily'],
) as dag:

    # ── BRONZE INGESTION (parallel) ──────────────────────────────────────
    bronze_crm = BashOperator(
        task_id      = 'bronze_ingest_crm',
        bash_command = 'python /pipelines/bronze/ingest_crm.py',
    )
    bronze_orders = BashOperator(
        task_id      = 'bronze_ingest_orders',
        bash_command = 'python /pipelines/bronze/ingest_orders.py',
    )
    bronze_products = BashOperator(
        task_id      = 'bronze_ingest_products',
        bash_command = 'python /pipelines/bronze/ingest_products.py',
    )

    # ── SILVER TRANSFORMATION (waits for all Bronze) ─────────────────────
    silver_transform = BashOperator(
        task_id      = 'silver_transform',
        bash_command = 'dbt run --select staging --target prod',
    )

    # ── SILVER DATA QUALITY GATE ──────────────────────────────────────────
    silver_quality = BashOperator(
        task_id      = 'silver_quality_check',
        bash_command = 'dbt test --select staging --target prod',
    )

    # ── GOLD DIMENSIONS (parallel, all wait for Silver quality) ──────────
    gold_dim_customer = BashOperator(
        task_id      = 'gold_dim_customer',
        bash_command = 'dbt run --select dim_customer --target prod',
    )
    gold_dim_product = BashOperator(
        task_id      = 'gold_dim_product',
        bash_command = 'dbt run --select dim_product --target prod',
    )
    gold_dim_date = BashOperator(
        task_id      = 'gold_dim_date',
        bash_command = 'dbt run --select dim_date --target prod',
    )

    # ── GOLD FACT TABLE (waits for all dimensions) ────────────────────────
    gold_fct_orders = BashOperator(
        task_id      = 'gold_fct_order_line',
        bash_command = 'dbt run --select fct_order_line --target prod',
    )

    # ── GOLD DATA QUALITY GATE ────────────────────────────────────────────
    gold_quality = BashOperator(
        task_id      = 'gold_quality_check',
        bash_command = 'dbt test --select marts --target prod',
    )

    # ── SEMANTIC MODEL REFRESH ────────────────────────────────────────────
    def refresh_power_bi_dataset(**context):
        """Trigger Power BI dataset refresh via REST API"""
        import requests
        # Values are read from Airflow Variables; a stored bearer token expires,
        # so production pipelines should acquire a fresh token per run
        token    = context['var']['value'].get('pbi_bearer_token')
        group_id = context['var']['value'].get('pbi_workspace_id')
        ds_id    = context['var']['value'].get('pbi_dataset_id')
        r = requests.post(
            f"https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{ds_id}/refreshes",
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,
        )
        r.raise_for_status()

    # Airflow 2 passes the context automatically; provide_context is no longer needed
    semantic_refresh = PythonOperator(
        task_id         = 'semantic_model_refresh',
        python_callable = refresh_power_bi_dataset,
    )

    # ── DEFINE EXECUTION ORDER ────────────────────────────────────────────
    # All Bronze tasks run in parallel
    [bronze_crm, bronze_orders, bronze_products] >> silver_transform
    # Silver quality gate must pass before any Gold runs
    silver_transform >> silver_quality
    # All Gold dimensions run in parallel after Silver quality passes
    silver_quality >> [gold_dim_customer, gold_dim_product, gold_dim_date]
    # Fact table waits for all dimensions
    [gold_dim_customer, gold_dim_product, gold_dim_date] >> gold_fct_orders
    # Gold quality gate, then semantic refresh
    gold_fct_orders >> gold_quality >> semantic_refresh
5 Microsoft Fabric Data Pipelines Beginner
Fabric Data Pipelines provide GUI-based orchestration integrated directly with all Fabric workloads — Lakehouses, Warehouses, Notebooks, Dataflows Gen2, and semantic models. They use the same activity model as Azure Data Factory so teams migrating from ADF will find the interface familiar. For Fabric-centric teams, Pipelines eliminate the need for a separate orchestration tool.
Reference: Microsoft Fabric Data Factory Documentation
Medallion Pipeline in Fabric — Activity Sequence
-- Fabric Pipeline: JSON representation of medallion activity sequence
-- (Actual configuration done in the Fabric UI -- this shows the logical structure)
{
"name": "medallion_daily_refresh",
"activities": [
{
"name": "Bronze_CRM_Copy",
"type": "Copy",
"dependsOn": [],
"source": { "type": "RestSource" },
"sink": { "type": "LakehouseTable", "tableName": "bronze_crm_customer" }
},
{
"name": "Silver_Notebook",
"type": "SparkNotebook",
"dependsOn": [
{ "activity": "Bronze_CRM_Copy", "dependencyConditions": ["Succeeded"] }
],
"notebook": "notebooks/silver_transform"
},
{
"name": "Gold_Dim_Customer",
"type": "SparkNotebook",
"dependsOn": [
{ "activity": "Silver_Notebook", "dependencyConditions": ["Succeeded"] }
],
"notebook": "notebooks/gold_dim_customer_scd2"
},
{
"name": "Gold_Fact_Orders",
"type": "SparkNotebook",
"dependsOn": [
{ "activity": "Gold_Dim_Customer", "dependencyConditions": ["Succeeded"] }
],
"notebook": "notebooks/gold_fct_order_line"
},
{
"name": "Refresh_Semantic_Model",
"type": "RefreshDataset",
"dependsOn": [
{ "activity": "Gold_Fact_Orders", "dependencyConditions": ["Succeeded"] }
],
"workspaceId": "",
"datasetId": ""
}
],
"trigger": {
"type": "ScheduleTrigger",
"recurrence": { "frequency": "Day", "interval": 1, "startTime": "03:00" }
}
}
Error Handling in Fabric Pipelines
-- Fabric Pipeline: conditional branching on activity failure
-- Add a failure path that runs a notification activity when any task fails
-- In the Fabric UI:
-- 1. Connect an activity with a red (failure) dependency line
-- 2. Add a "Web Activity" or "Send Email" activity on the failure path
-- 3. The failure activity runs when the upstream activity fails
-- Web Activity for Teams notification on failure:
{
"name": "Notify_Teams_On_Failure",
"type": "WebActivity",
"dependsOn": [
{ "activity": "Gold_Fact_Orders", "dependencyConditions": ["Failed"] }
],
"url": "https://your-org.webhook.office.com/webhookb2/...",
"method": "POST",
"body": {
"text": "Pipeline failed: Gold_Fact_Orders. Check Fabric monitoring for details."
}
}
6 dbt Cloud Job Orchestration Beginner
dbt Cloud’s built-in job scheduler is the right orchestration tool when your entire transformation layer is managed by dbt. dbt automatically resolves model execution order from the dependency graph — you do not need to specify which models run before others. You define what to run, and dbt figures out the order.
Reference: dbt Cloud Jobs Documentation
# dbt Cloud job configuration (via UI or API)
# A typical production job runs three dbt commands in sequence:
# Job: Daily Production Refresh
# Schedule: 3:30 AM daily (after Bronze ingestion completes)
# Step 1: Check source freshness -- fail if Bronze data is too old
dbt source freshness --select source:crm_bronze source:ecommerce_bronze
# Step 2: Run all models from staging through marts
dbt run --target prod --select staging+ marts+
# The + suffix means: run the selected models AND everything downstream
# dbt resolves execution order automatically from the model dependency graph
# Step 3: Run all tests -- fail the job if any critical test fails
dbt test --target prod --select staging+ marts+
# Optional Step 4: Generate and upload documentation
dbt docs generate --target prod
# dbt Cloud: trigger a job via API after an upstream process completes
# Use this when Airflow, Fabric Pipelines, or another orchestrator
# needs to trigger a dbt Cloud job as part of a broader pipeline
import time
import requests

ACCOUNT_ID = "your-dbt-cloud-account-id"
JOB_ID     = "your-dbt-cloud-job-id"
API_TOKEN  = "your-dbt-cloud-api-token"   # load from a secrets manager in production
BASE_URL   = f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}"
HEADERS    = {
    "Authorization": f"Token {API_TOKEN}",
    "Content-Type": "application/json",
}

response = requests.post(
    f"{BASE_URL}/jobs/{JOB_ID}/run/",
    headers = HEADERS,
    json = {
        "cause": "Triggered by Airflow after Bronze ingestion",
        "git_branch": "main",
    },
    timeout = 30,
)
response.raise_for_status()   # surface auth or permission errors before reading the body
run_id = response.json()["data"]["id"]
print(f"dbt Cloud job started: run_id = {run_id}")

# Poll for completion (or use webhooks for event-driven notification)
# Run status codes: 10 = Success, 20 = Error, 30 = Cancelled
deadline = time.monotonic() + 2 * 60 * 60   # stop polling after 2 hours
while True:
    poll = requests.get(f"{BASE_URL}/runs/{run_id}/", headers=HEADERS, timeout=30)
    poll.raise_for_status()
    status = poll.json()["data"]["status"]
    if status == 10:
        print("dbt Cloud job succeeded")
        break
    if status in (20, 30):
        # A cancelled run never reaches Success or Error, so treat it as a failure
        raise RuntimeError(f"dbt Cloud run {run_id} ended with status {status}")
    if time.monotonic() > deadline:
        raise TimeoutError(f"dbt Cloud run {run_id} did not finish within 2 hours")
    time.sleep(30)
7 Databricks Workflows Intermediate
Databricks Workflows orchestrate notebooks, SQL queries, Delta Live Tables pipelines, and ML jobs natively within Databricks. They provide task-level dependency management, automatic retries, cluster management per task, and real-time monitoring — all integrated with Unity Catalog governance. For teams running the full medallion architecture on Databricks, Workflows is the natural orchestration choice.
Reference: Databricks Workflows Documentation
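# Databricks Workflow: medallion pipeline as JSON definition
# Deploy via Databricks CLI, REST API, or Terraform
# The trailing # comments below are annotations only; remove them before deploying, because JSON does not allow comments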
{
"name": "medallion_daily_refresh",
"schedule": {
"quartz_cron_expression": "0 0 3 * * ?", # 3 AM daily
"timezone_id": "UTC"
},
"max_concurrent_runs": 1,
"tasks": [
{
"task_key": "bronze_ingest_crm",
"notebook_task": {
"notebook_path": "/Repos/pipelines/bronze/ingest_crm",
"base_parameters": { "execution_date": "{{job.start_time.iso_date}}" }
},
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "Standard_DS3_v2",
"num_workers": 2
},
"max_retries": 3,
"min_retry_interval_millis": 300000 # 5 minutes between retries
},
{
"task_key": "silver_transform",
"depends_on": [{ "task_key": "bronze_ingest_crm" }],
"notebook_task": {
"notebook_path": "/Repos/pipelines/silver/silver_transform"
},
"existing_cluster_id": "silver-cluster-id" # reuse existing cluster
},
{
"task_key": "gold_dim_customer",
"depends_on": [{ "task_key": "silver_transform" }],
"sql_task": {
"query": { "query_id": "dim-customer-scd2-query-id" },
"warehouse_id": "gold-sql-warehouse-id"
}
},
{
"task_key": "gold_fct_orders",
"depends_on": [{ "task_key": "gold_dim_customer" }],
"notebook_task": {
"notebook_path": "/Repos/pipelines/gold/fct_order_line"
}
}
],
"email_notifications": {
"on_failure": ["data-team@company.com"],
"on_success": []
}
}
8 AWS Glue Workflows and Step Functions Intermediate
AWS provides two orchestration services for data pipelines. Glue Workflows coordinate Glue ETL jobs, crawlers, and triggers — ideal for Glue-heavy Bronze and Silver pipelines. Step Functions provide more sophisticated orchestration with branching, parallel execution, error handling, and integration with the full AWS service catalog — ideal when your pipeline includes Lambda functions, Redshift queries, SageMaker training jobs, and other AWS services alongside Glue.
References: AWS Glue Workflows · AWS Step Functions
# AWS Step Functions: medallion pipeline state machine (ASL definition)
# Coordinates Glue jobs and publishes success or failure notifications through SNS
{
"Comment": "Medallion daily refresh pipeline",
"StartAt": "ParallelBronzeIngestion",
"States": {
"ParallelBronzeIngestion": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "BronzeCRM",
"States": {
"BronzeCRM": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": { "JobName": "bronze-crm-ingestion" },
"End": true
}
}
},
{
"StartAt": "BronzeOrders",
"States": {
"BronzeOrders": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": { "JobName": "bronze-orders-ingestion" },
"End": true
}
}
}
],
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "NotifyFailure"
}],
"Next": "SilverTransform"
},
"SilverTransform": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": { "JobName": "silver-transformation" },
"Retry": [{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 300,
"MaxAttempts": 3,
"BackoffRate": 2.0
}],
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "NotifyFailure"
}],
"Next": "GoldDimensionalLoad"
},
"GoldDimensionalLoad": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": { "JobName": "gold-dimensional-load" },
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "NotifyFailure"
}],
"Next": "NotifySuccess"
},
"NotifySuccess": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-east-1:account:pipeline-alerts",
"Message": "Medallion pipeline completed successfully"
},
"End": true
},
"NotifyFailure": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-east-1:account:pipeline-alerts",
"Message.$": "States.Format('Pipeline failed: {}', $.Error)"
},
"Next": "PipelineFailed"
},
"PipelineFailed": {
"Type": "Fail",
"Error": "MedallionPipelineFailed",
"Cause": "A pipeline stage failed. See the SNS alert and the execution history."
}
}
}
9 Google Cloud Composer Intermediate
Cloud Composer is Google Cloud’s managed Apache Airflow service. It eliminates the operational burden of running Airflow — no cluster to manage, no Python environment to maintain, automatic upgrades. For GCP-primary teams, it provides the full power of Airflow with the operational simplicity of a managed service.
Reference: Google Cloud Composer Documentation
# Cloud Composer DAG: BigQuery medallion pipeline
# Identical Airflow Python syntax -- just deployed to Cloud Composer
# Uses BigQuery operators from the apache-airflow-providers-google package
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCheckOperator,
)
from datetime import datetime, timedelta

PROJECT_ID = "your-gcp-project"
DATASET = "gold"

with DAG(
    dag_id = 'bigquery_medallion_refresh',
    schedule_interval = '0 3 * * *',
    start_date = datetime(2025, 1, 1),
    catchup = False,
    default_args = {'retries': 2, 'retry_delay': timedelta(minutes=5)},
) as dag:

    # Silver transformation in BigQuery
    silver_customer = BigQueryInsertJobOperator(
        task_id = 'silver_customer',
        project_id = PROJECT_ID,
        configuration = {
            "query": {
                "query": """
                    CREATE OR REPLACE TABLE silver.customer AS
                    SELECT
                        SAFE_CAST(customer_id AS INT64) AS customer_id,
                        INITCAP(TRIM(customer_name)) AS customer_name,
                        LOWER(TRIM(email)) AS email,
                        COALESCE(city, 'Unknown') AS city,
                        updated_at
                    FROM bronze.crm_customer
                    WHERE customer_id IS NOT NULL   -- WHERE must come before QUALIFY
                    QUALIFY ROW_NUMBER() OVER (
                        PARTITION BY customer_id
                        ORDER BY updated_at DESC
                    ) = 1
                """,
                "useLegacySql": False,
            }
        },
    )

    # Gold dimension load
    # The INSERT column list is an assumption about gold.dim_customer; adjust to your schema.
    # INSERT ROW only works when source and target columns line up exactly.
    gold_dim_customer = BigQueryInsertJobOperator(
        task_id = 'gold_dim_customer',
        project_id = PROJECT_ID,
        configuration = {
            "query": {
                "query": """
                    MERGE gold.dim_customer AS tgt
                    USING silver.customer AS src
                    ON tgt.customer_natural_key = src.customer_id
                    WHEN MATCHED AND (
                        tgt.customer_name != src.customer_name
                    ) THEN UPDATE SET
                        customer_name = src.customer_name,
                        updated_at = src.updated_at
                    WHEN NOT MATCHED THEN
                        INSERT (customer_natural_key, customer_name, updated_at, is_current)
                        VALUES (src.customer_id, src.customer_name, src.updated_at, TRUE)
                """,
                "useLegacySql": False,
            }
        },
    )

    # Data quality check using BigQueryCheckOperator
    # The query must return a single row; the task fails if any value in it is falsy.
    # COUNT(*) = 0 is TRUE only when no natural key has more than one current row (grain check).
    quality_check = BigQueryCheckOperator(
        task_id = 'gold_quality_check',
        project_id = PROJECT_ID,
        use_legacy_sql = False,   # the operator defaults to legacy SQL
        sql = """
            SELECT COUNT(*) = 0
            FROM (
                SELECT customer_natural_key, COUNT(*) AS cnt
                FROM gold.dim_customer
                WHERE is_current = TRUE
                GROUP BY customer_natural_key
                HAVING cnt > 1
            )
        """,
    )

    silver_customer >> gold_dim_customer >> quality_check
10 The Medallion Orchestration Pattern Beginner
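Regardless of which orchestrator you use, the medallion pipeline follows the same dependency pattern: parallel Bronze ingestion → Silver transformation → Silver quality gate → Gold dimensions (parallel) → Gold fact tables → Gold quality gate → semantic model refresh → notification. A failed task or gate stops everything downstream of it. This pattern is the template for every production data pipeline in this series.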
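The sketch below expresses the same dependencies as the Airflow DAG earlier in this part, but in plain Python with the standard-library `graphlib`, to show the one behavior every orchestrator shares: a failed task blocks everything downstream of it. The task names and the `run_pipeline` helper are illustrative, not part of any orchestrator's API.

```python
from graphlib import TopologicalSorter

# task -> upstream dependencies (names are illustrative)
MEDALLION = {
    "bronze_crm": set(),
    "bronze_orders": set(),
    "silver_transform": {"bronze_crm", "bronze_orders"},
    "silver_quality": {"silver_transform"},
    "gold_dims": {"silver_quality"},
    "gold_fact": {"gold_dims"},
    "gold_quality": {"gold_fact"},
    "semantic_refresh": {"gold_quality"},
}


def run_pipeline(graph, tasks):
    """Run tasks in dependency order; skip anything downstream of a failure."""
    status = {}
    for name in TopologicalSorter(graph).static_order():
        # A task runs only if every upstream task succeeded
        if any(status[up] != "success" for up in graph[name]):
            status[name] = "skipped"
            continue
        try:
            tasks[name]()
            status[name] = "success"
        except Exception:
            status[name] = "failed"
    return status


def ok():
    pass


def failing_gate():
    raise ValueError("quality check failed")


tasks = {name: ok for name in MEDALLION}
tasks["silver_quality"] = failing_gate  # simulate a failed Silver quality gate

result = run_pipeline(MEDALLION, tasks)
for name, state in result.items():
    print(f"{name}: {state}")
```

With the Silver gate failing, the Bronze and Silver transformation tasks succeed, the gate is marked failed, and every Gold and semantic task is skipped, so stale or invalid data never reaches the dashboards.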
11 Error Handling and Recovery Intermediate
Error handling is not an afterthought — it is a first-class design concern. Every task in your orchestration DAG should have a defined behavior for three scenarios: transient failure (retry automatically), permanent failure (alert and stop), and partial failure (alert, stop downstream, preserve upstream state).
Retry Strategy
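# Airflow: retry configuration per task
# Exponential backoff prevents hammering a temporarily unavailable service
from airflow.operators.bash import BashOperator
from datetime import timedelta

def send_alert(context, message):
    """Placeholder helper: replace with your Slack, Teams, or email notification."""
    print(f"{message}: task {context['task_instance'].task_id}")

bronze_ingest = BashOperator(
    task_id = 'bronze_ingest_crm',
    bash_command = 'python /pipelines/bronze/ingest_crm.py',
    retries = 3,
    retry_delay = timedelta(minutes=5),
    retry_exponential_backoff = True,  # roughly 5, 10, 20 minutes
    # on_failure_callback fires only after every retry has been exhausted.
    # Retries cannot tell a transient failure from a data validation error, so
    # validation failures should fail fast inside the task (in a PythonOperator,
    # raise AirflowFailException to skip the remaining retries).
    on_failure_callback = lambda context: send_alert(context, "Bronze ingestion failed"),
)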
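Outside Airflow, the same policy can be sketched in a few lines of plain Python. This is a minimal illustration, not an orchestrator feature: the `TransientError` and `PermanentError` classes and the `run_with_retries` helper are hypothetical names, and the `sleep` parameter exists only so the example can run without actually waiting.

```python
import time


class TransientError(Exception):
    """Worth retrying: timeouts, throttling, brief outages."""


class PermanentError(Exception):
    """Not worth retrying: bad credentials, schema or validation errors."""


def run_with_retries(task, retries=3, base_delay=300.0, sleep=time.sleep):
    """Call task(); retry TransientError with a doubling delay, re-raise anything else."""
    attempt = 0
    while True:
        try:
            return task()
        except TransientError:
            if attempt >= retries:
                raise  # retries exhausted: let the failure surface
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Usage: a task that fails twice with a transient error, then succeeds
calls = {"n": 0}
delays = []


def flaky_ingest():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("source API timed out")
    return "loaded"


outcome = run_with_retries(flaky_ingest, sleep=delays.append)
print(outcome, delays)
```

Because only `TransientError` is caught, a `PermanentError` propagates on the first attempt, which corresponds to the "alert and stop" branch for permanent failures.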
Dead Letter Pattern — Capture Failed Records
# When a transformation fails for specific records, capture them
# for investigation before failing the task
# In your Silver transformation (bronze_df and transform_bronze_to_silver are defined elsewhere):
from pyspark.sql.functions import col, current_timestamp, lit

try:
    silver_df = transform_bronze_to_silver(bronze_df)
    silver_df.write.format("delta").mode("append").saveAsTable("silver.customer")
except Exception as e:
    # Write failed records to a dead-letter table for investigation.
    # Here the suspect rows are those with a null customer_id; adapt the filter to your validation rules.
    failed_records = bronze_df.filter(col("customer_id").isNull())
    failed_records \
        .withColumn("failure_reason", lit(str(e))) \
        .withColumn("failed_at", current_timestamp()) \
        .write.format("delta").mode("append") \
        .saveAsTable("silver.customer_failed")
    # Re-raise to fail the task and trigger retry/alert
    raise
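A variant of the same idea validates each record individually, so good rows continue while bad rows are routed to the dead-letter store. The sketch below uses plain Python lists and dictionaries rather than Spark; `split_valid_and_dead` and `require_customer_id` are hypothetical helpers written for this example.

```python
from datetime import datetime, timezone


def split_valid_and_dead(records, validate):
    """Route each record to the valid list or to the dead-letter list with a reason."""
    valid, dead = [], []
    for record in records:
        try:
            validate(record)
            valid.append(record)
        except ValueError as exc:
            dead.append({
                **record,
                "failure_reason": str(exc),
                "failed_at": datetime.now(timezone.utc).isoformat(),
            })
    return valid, dead


def require_customer_id(record):
    """Example validation rule: every row needs a customer_id."""
    if record.get("customer_id") is None:
        raise ValueError("customer_id is null")


rows = [
    {"customer_id": 1, "name": "Ada"},
    {"customer_id": None, "name": "Unknown"},
]
valid, dead = split_valid_and_dead(rows, require_customer_id)
print(len(valid), len(dead))
```

Whether a non-empty dead-letter list should fail the task or only raise an alert is a policy decision; a common compromise is to fail only when the rejected share exceeds a threshold.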
Checkpoint Pattern — Resume From Failure Point
-- Checkpoint table: tracks last successful completion per pipeline stage
-- When a pipeline fails and is re-run, it picks up from the last checkpoint
-- rather than reprocessing everything from the beginning
CREATE TABLE IF NOT EXISTS orchestration.pipeline_checkpoint (
pipeline_name VARCHAR(200) NOT NULL,
stage VARCHAR(100) NOT NULL,
last_success TIMESTAMP NOT NULL,
watermark_key VARCHAR(200), -- optional: last processed key
CONSTRAINT PK_checkpoint PRIMARY KEY (pipeline_name, stage)
);
-- Update checkpoint after successful stage completion
MERGE INTO orchestration.pipeline_checkpoint AS tgt
USING (SELECT
'medallion_daily' AS pipeline_name,
'silver_customer' AS stage,
CURRENT_TIMESTAMP AS last_success
) AS src
ON tgt.pipeline_name = src.pipeline_name
AND tgt.stage = src.stage
WHEN MATCHED THEN UPDATE SET last_success = src.last_success
WHEN NOT MATCHED THEN INSERT VALUES (src.pipeline_name, src.stage, src.last_success, NULL);
-- Read checkpoint at pipeline start to determine incremental watermark
SELECT last_success
FROM orchestration.pipeline_checkpoint
WHERE pipeline_name = 'medallion_daily'
AND stage = 'silver_customer';
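To show how a task might consume the checkpoint, here is a minimal sketch that uses an in-memory SQLite database in place of the warehouse. The `bronze_customer` table, its rows, and the `process_increment` helper are assumptions made for the example. One deliberate difference from the SQL above: the checkpoint advances to the newest source timestamp actually processed rather than to the wall-clock time, which avoids skipping rows that arrive while the task is running.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE pipeline_checkpoint (
        pipeline_name TEXT NOT NULL,
        stage         TEXT NOT NULL,
        last_success  TEXT NOT NULL,
        PRIMARY KEY (pipeline_name, stage)
    );
    CREATE TABLE bronze_customer (customer_id INTEGER, updated_at TEXT);
    INSERT INTO bronze_customer VALUES
        (1, '2025-01-01T00:00:00'),
        (2, '2025-01-02T00:00:00'),
        (3, '2025-01-03T00:00:00');
    INSERT INTO pipeline_checkpoint VALUES
        ('medallion_daily', 'silver_customer', '2025-01-01T12:00:00');
""")


def process_increment(conn, pipeline, stage):
    """Return IDs changed since the checkpoint, then advance the checkpoint."""
    row = conn.execute(
        "SELECT last_success FROM pipeline_checkpoint "
        "WHERE pipeline_name = ? AND stage = ?",
        (pipeline, stage),
    ).fetchone()
    # No checkpoint yet means a full load
    watermark = row[0] if row else "1970-01-01T00:00:00"

    rows = conn.execute(
        "SELECT customer_id, updated_at FROM bronze_customer "
        "WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()

    if rows:
        # Upsert the checkpoint to the newest timestamp that was processed
        conn.execute(
            """
            INSERT INTO pipeline_checkpoint (pipeline_name, stage, last_success)
            VALUES (?, ?, ?)
            ON CONFLICT (pipeline_name, stage)
            DO UPDATE SET last_success = excluded.last_success
            """,
            (pipeline, stage, rows[-1][1]),
        )
        conn.commit()
    return [r[0] for r in rows]


first = process_increment(conn, "medallion_daily", "silver_customer")
second = process_increment(conn, "medallion_daily", "silver_customer")
print(first, second)
```

The second call returns nothing because the checkpoint has already moved past every row, which is what makes a re-run after a successful stage safe.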
12 Triggering BI Refreshes After Pipeline Completion Intermediate
The final step of every medallion pipeline is refreshing the semantic layer and BI tools so that dashboards reflect the newly loaded Gold data. This is triggered programmatically via REST API as the last task in the DAG — after the Gold quality gate passes.
# Power BI: trigger semantic model refresh via REST API
# Called as the final task in the orchestration DAG
import time
import requests
import msal                      # pip install msal
import looker_sdk                # pip install looker-sdk
from looker_sdk import models40

def refresh_power_bi_semantic_model(
    tenant_id: str,
    client_id: str,
    client_secret: str,
    workspace_id: str,
    dataset_id: str
):
    # Authenticate using service principal
    app = msal.ConfidentialClientApplication(
        client_id,
        authority = f"https://login.microsoftonline.com/{tenant_id}",
        client_credential = client_secret
    )
    result = app.acquire_token_for_client(
        scopes = ["https://analysis.windows.net/powerbi/api/.default"]
    )
    if "access_token" not in result:
        # msal returns an error description instead of raising
        raise RuntimeError(f"Power BI authentication failed: {result.get('error_description')}")
    token = result["access_token"]
    # Trigger dataset refresh
    response = requests.post(
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes",
        headers = {"Authorization": f"Bearer {token}"},
        json = {"notifyOption": "MailOnFailure"},
        timeout = 30,
    )
    response.raise_for_status()
    print(f"Power BI refresh triggered: HTTP {response.status_code}")

# Looker: invalidate cached results after the Gold refresh
# Assumes caching is governed by datagroups and that SDK credentials come
# from looker.ini or LOOKERSDK_* environment variables
def refresh_looker_cache():
    sdk = looker_sdk.init40()
    now = int(time.time())
    for datagroup in sdk.all_datagroups():
        # stale_before marks every cache entry older than this timestamp as stale
        sdk.update_datagroup(
            datagroup_id = datagroup.id,
            body = models40.WriteDatagroup(stale_before=now),
        )
    print("Looker datagroup caches invalidated -- next dashboard load will query fresh Gold data")
13 SLA Monitoring and Alerting Intermediate
SLA monitoring answers the question every data team gets asked: “Is the dashboard up to date?” A well-monitored pipeline gives you the answer before users ask the question.
# Airflow SLA monitoring (Airflow 2.x): alert when a task finishes later than expected
from airflow.models import DAG
from datetime import datetime, timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when any task misses its SLA"""
    # task_list arrives as a preformatted string; slas holds the SlaMiss records
    missed = [sla.task_id for sla in slas]
    message = f"SLA MISS: {missed} in DAG {dag.dag_id}"
    # send_slack_alert and send_pagerduty_alert are placeholders for your own helpers
    send_slack_alert(message)      # for example, a Slack webhook call
    send_pagerduty_alert(message)  # for critical pipelines

with DAG(
    dag_id = 'medallion_daily_refresh',
    schedule_interval = '0 3 * * *',
    start_date = datetime(2025, 1, 1),
    catchup = False,
    sla_miss_callback = sla_miss_callback,
    default_args = {
        # Measured from the scheduled start of the DAG run, not from the task's own start
        'sla': timedelta(hours=2)
    }
) as dag:
    pass  # tasks defined above
-- Data freshness monitoring table: query this for a freshness dashboard
-- Updated by the pipeline after each successful Gold load
-- (DATEDIFF and CURRENT_TIMESTAMP() follow Snowflake-style syntax; adjust for your warehouse)
CREATE TABLE orchestration.data_freshness (
    table_name     VARCHAR(200) NOT NULL PRIMARY KEY,
    last_refreshed TIMESTAMP    NOT NULL,
    row_count      BIGINT,
    sla_hours      INT          NOT NULL   -- expected refresh frequency
);
-- is_stale is derived at query time rather than stored, because it depends on the current time

-- Update after each pipeline run
MERGE INTO orchestration.data_freshness AS tgt
USING (
    SELECT 'gold.fct_order_line' AS table_name, CURRENT_TIMESTAMP() AS last_refreshed,
           (SELECT COUNT(*) FROM gold.fct_order_line) AS row_count, 24 AS sla_hours
    UNION ALL
    SELECT 'gold.dim_customer', CURRENT_TIMESTAMP(),
           (SELECT COUNT(*) FROM gold.dim_customer), 24
    UNION ALL
    SELECT 'silver.customer', CURRENT_TIMESTAMP(),
           (SELECT COUNT(*) FROM silver.customer), 24
) AS src
ON tgt.table_name = src.table_name
WHEN MATCHED THEN UPDATE SET last_refreshed = src.last_refreshed, row_count = src.row_count
WHEN NOT MATCHED THEN INSERT (table_name, last_refreshed, row_count, sla_hours)
    VALUES (src.table_name, src.last_refreshed, src.row_count, src.sla_hours);

-- Query freshness dashboard
SELECT
    table_name,
    last_refreshed,
    DATEDIFF('hour', last_refreshed, CURRENT_TIMESTAMP()) AS hours_since_refresh,
    sla_hours,
    CASE WHEN DATEDIFF('hour', last_refreshed, CURRENT_TIMESTAMP()) > sla_hours
         THEN 1 ELSE 0 END AS is_stale,
    row_count
FROM orchestration.data_freshness
ORDER BY is_stale DESC, hours_since_refresh DESC;
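The staleness rule itself is simple enough to express and test outside the warehouse. A minimal sketch in Python, where the `is_stale` function name and the sample timestamps are chosen for the example:

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_refreshed, sla_hours, now=None):
    """True when the table has gone longer than sla_hours without a refresh."""
    now = now or datetime.now(timezone.utc)
    return now - last_refreshed > timedelta(hours=sla_hours)


# Fixed "now" so the example is reproducible
now = datetime(2025, 1, 2, 6, 0, tzinfo=timezone.utc)

# Refreshed 3 hours ago against a 24-hour SLA
refreshed_today = is_stale(datetime(2025, 1, 2, 3, 0, tzinfo=timezone.utc), 24, now=now)

# Refreshed 51 hours ago against a 24-hour SLA
refreshed_two_days_ago = is_stale(datetime(2024, 12, 31, 3, 0, tzinfo=timezone.utc), 24, now=now)

print(refreshed_today, refreshed_two_days_ago)
```

A check like this can run as a lightweight scheduled task that is independent of the main pipeline, so it still raises an alert when the pipeline itself never started.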
14 Workshops
Novice
Build a Simple Pipeline
- Choose one tool: Fabric Pipelines or dbt Cloud
- Create a three-task pipeline: Bronze → Silver → Gold
- Add an email notification on failure
- Schedule it to run daily at 3 AM
- Simulate a failure by breaking the Silver task
- Verify the failure notification arrives and the pipeline stops
Intermediate
Build an Airflow Medallion DAG
- Install Airflow locally or use Astronomer/MWAA
- Create the medallion DAG from this part
- Add Bronze tasks as parallel (no dependencies between them)
- Add Silver as downstream of all Bronze tasks
- Add a quality gate using BashOperator running dbt test
- Test the retry behavior: fail a task and watch automatic retries
Advanced
Build a Cross-Platform Orchestrator
- Use Airflow or Databricks Workflows as the master orchestrator
- Trigger a dbt Cloud job via API after Bronze ingestion
- After dbt succeeds, trigger a Power BI semantic model refresh via REST API
- Add SLA monitoring: alert if the full pipeline takes more than 3 hours
- Build the data freshness monitoring table and populate it on each run
- Create a simple freshness dashboard in Power BI or Looker on the monitoring table
References
- Apache Airflow Documentation
- Airflow — DAGs and Task Dependencies
- Microsoft Fabric Data Factory Documentation
- dbt Cloud — Jobs Documentation
- dbt — Source Freshness
- Databricks Workflows Documentation
- Databricks — Workflow Monitoring
- AWS Glue Workflows
- AWS Step Functions
- Google Cloud Composer
- Power BI REST API — Refresh Dataset


