Data Workflow Part 2: Data Ingestion running DAGs inside Airflow
by Yin Yin Chan 🤓 June 2026
TL;DR
We now have Airflow running locally based on our guide, Data Workflow Part 1: Workflow Orchestration with Apache Airflow 3.2.x.
In Part 2 of our Data Workflow series, we will be combining a few concepts from earlier exercises to get our first DAG up and running.
DAGs
A DAG (Directed Acyclic Graph) is the code representation of a data pipeline workflow. It is a collection of all the tasks you want to run organized in a way that reflects their relationships and dependencies.
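To make the "directed acyclic" part concrete, here is a minimal sketch in plain Python (no Airflow involved). It uses the standard library's graphlib to derive a valid run order from a dependency map. The task names extract, transform, and load are hypothetical and only serve the illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names. Each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order where every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return False if the dependency map contains a cycle."""
    try:
        TopologicalSorter(deps).prepare()
    except CycleError:
        return False
    return True


print(execution_order(dependencies))
print(is_acyclic({"a": {"b"}, "b": {"a"}}))
```

A workflow with a cycle has no valid run order, which is why the graph has to stay acyclic.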
Create your first DAG file
ls
cd dags/
# You need to be in your dags directory
# e.g. /your-project-repo/orchestration/dags
touch ladot_parking_ingestion_daily_sql.py
- Import the Airflow decorators (@dag and @task) along with Python's datetime tools
# In ladot_parking_ingestion_daily_sql.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
- Decorate your main wrapper function to define the pipeline settings. I’ve added comments describing the options we need.
@dag(
    dag_id="ladot_parking_ingestion_daily_sql",
    start_date=datetime(2026, 6, 8),  # set to a past date to allow immediate execution
    catchup=False,  # don't backfill missed runs
    schedule=timedelta(days=1),  # use timedelta for daily schedule precision
    default_args={
        "owner": "airflow",
        "depends_on_past": False,  # if yesterday's run fails, it won't block today's run
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
)
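To see what catchup=False saves us from, here is a rough sketch in plain Python that counts how many full schedule intervals have elapsed since start_date. This is only an illustration of the idea, not Airflow's actual scheduler logic, and the helper name missed_intervals is hypothetical.

```python
from datetime import datetime, timedelta


def missed_intervals(start_date, now, interval):
    """Count the full schedule intervals elapsed between start_date and now."""
    if now <= start_date:
        return 0
    # Dividing one timedelta by another with // yields a whole number.
    return (now - start_date) // interval


start = datetime(2026, 6, 8)
now = datetime(2026, 6, 11, 12, 0)
print(missed_intervals(start, now, timedelta(days=1)))
```

With catchup enabled, each of those elapsed intervals would be a candidate for a backfill run. With catchup=False, older intervals are skipped.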
- Write out your main pipeline ingestion logic. To start, copy the raw Python from pipeline.py in an earlier pipeline exercise and paste it into the new pipeline function
@dag(
    ...
)
def ladot_parking_ingestion_daily_sql():
    @task(task_id="ingest_meter_occupancy_data")
    def extract_and_load_meter_occupancy_data():
        # paste the code from pipeline.py here
        app_token = os.getenv("APP_TOKEN")
        headers = {
            ...
        }
        ...
        df_chunk.to_sql(
            ...
        )
        ...
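If you don't have pipeline.py handy, the general shape of a chunked load looks something like the sketch below. To keep it runnable anywhere, it swaps pandas and PostgreSQL for the standard library's csv and sqlite3 modules, so it is a stand-in for the pattern rather than the actual pipeline code. The helper name load_csv_in_chunks and the sample column names are hypothetical.

```python
import csv
import sqlite3
from io import StringIO


def load_csv_in_chunks(csv_text, conn, table, chunk_size=2):
    """Parse CSV text and insert it into `table` chunk by chunk.

    Returns the number of rows loaded.
    """
    reader = csv.reader(StringIO(csv_text))
    header = next(reader)
    columns = ", ".join(f'"{name}"' for name in header)
    placeholders = ", ".join("?" for _ in header)
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({columns})')
    insert_sql = f'INSERT INTO "{table}" ({columns}) VALUES ({placeholders})'

    total = 0
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) == chunk_size:
            conn.executemany(insert_sql, chunk)
            total += len(chunk)
            chunk = []
    if chunk:  # flush the final partial chunk
        conn.executemany(insert_sql, chunk)
        total += len(chunk)
    conn.commit()
    return total


# Hypothetical sample data, not the real dataset schema.
sample_csv = "spaceid,occupancystate\nA1,OCCUPIED\nA2,VACANT\nA3,OCCUPIED\n"
conn = sqlite3.connect(":memory:")
loaded = load_csv_in_chunks(sample_csv, conn, "meter_occupancy")
print(loaded)
```

In the real task, the pandas to_sql call plays the role of the executemany step here.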
Import our pipeline tools at the top of the file. Notice that I removed tqdm from this script to reduce complications during this Airflow test run, and also removed the tqdm method call around line 79.
# In ladot_parking_ingestion_daily_sql.py
import requests
import os
import pandas as pd
from io import StringIO
from sqlalchemy import create_engine
The purpose of this exercise is to get our DAG running in Airflow and catch any related errors in between, so we’re going to keep it simple and ingest data into our local PostgreSQL.
Note that we’ll need to default our db host to postgres since we’ll be running in Docker.
# Line ~63 in ladot_parking_ingestion_daily_sql.py
psql_host = os.getenv("POSTGRES_HOST", "postgres")
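For context, here is one way the host default might feed into a connection URL. This is a sketch under assumptions: the helper name build_postgres_url, the POSTGRES_PORT variable, the 5432 default, and the plain postgresql:// scheme are mine, and your pipeline.py may assemble the URL differently.

```python
import os
from urllib.parse import quote_plus


def build_postgres_url(env=None):
    """Build a SQLAlchemy-style PostgreSQL URL from environment variables."""
    env = os.environ if env is None else env
    user = env["POSTGRES_USER"]
    # Escape characters such as "@" that would otherwise break the URL.
    password = quote_plus(env["POSTGRES_PASSWORD"])
    database = env["POSTGRES_DB"]
    # Inside Docker, the database is reached by its service name.
    host = env.get("POSTGRES_HOST", "postgres")
    # POSTGRES_PORT is a hypothetical variable; 5432 is the usual default.
    port = env.get("POSTGRES_PORT", "5432")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"


example_env = {
    "POSTGRES_USER": "airflow",
    "POSTGRES_PASSWORD": "p@ss",
    "POSTGRES_DB": "la_meter_parking_db_airflow",
}
print(build_postgres_url(example_env))
```

The resulting string can be handed to create_engine. Running outside Docker, you would set POSTGRES_HOST to localhost instead.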
And make sure APP_TOKEN is in your docker-compose.yml environment variables:
x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    ...
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
    AIRFLOW__API_AUTH__JWT_SECRET: ${AIRFLOW__API_AUTH__JWT_SECRET:-airflow_jwt_secret}
    AIRFLOW__API_AUTH__JWT_ISSUER: ${AIRFLOW__API_AUTH__JWT_ISSUER:-airflow}
    ...
    APP_TOKEN: ${APP_TOKEN}
    POSTGRES_USER: ${POSTGRES_USER}
    POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    POSTGRES_DB: ${POSTGRES_DB}
    ...
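A variable that is missing from your .env file can end up as an empty value inside the container, which tends to surface later as a confusing API or database error. A small fail-fast check at the top of the task can help. This is an optional sketch, and the helper name missing_env_vars is hypothetical.

```python
import os

# The variables passed through docker-compose.yml in this guide.
REQUIRED_VARS = ("APP_TOKEN", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB")


def missing_env_vars(required=REQUIRED_VARS, env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


example_env = {
    "APP_TOKEN": "abc123",
    "POSTGRES_USER": "airflow",
    "POSTGRES_PASSWORD": "",  # empty counts as missing
    "POSTGRES_DB": "la_meter_parking_db_airflow",
}
print(missing_env_vars(env=example_env))
```

Inside the task, you could raise an error when the returned list is not empty, so the failure shows up clearly in the task logs.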
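- Instantiate the task inside the DAG function, then call the DAG function at module level so Airflow can register it
    # Instantiate the task(s) (still inside ladot_parking_ingestion_daily_sql)
    run_meter_occupancy_data = extract_and_load_meter_occupancy_data()

# Instantiate the DAG (module level, no indentation)
ladot_dag = ladot_parking_ingestion_daily_sql()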
- Let’s give it a go. Since we’ve updated the .env file, we’ll need to bring Docker down and then back up again
docker compose down
docker compose build --no-cache
docker compose up
- Reload your local Airflow dashboard at localhost:8080. If necessary, enter your username / password: airflow / airflow.
If you don’t see your DAG in the dashboard, you can manually rescan for it in a different terminal tab:
cd orchestration/
docker compose exec airflow-scheduler airflow dags reserialize
You should see your DAG ladot_parking_ingestion_daily_sql listed on your dashboard, with the trigger toggle turned off (this is the default). Start your DAG manually by clicking the > play button.
- Airflow commands:
You can run this command to show a list of dags in service
# List all the dags
docker compose exec airflow-scheduler airflow dags list
If your dag hasn’t shown up yet, run this command to have your dag instantly show up in your dashboard without waiting.
# Force an immediate rescan (force the running scheduler to process newly fixed code in your dag file)
docker compose exec airflow-scheduler airflow dags reserialize
DAG in Airflow Dashboard
Success here means you can see your DAG within your localhost:8080/dags Airflow Dashboard (select the All tab).
Running your DAG
You’ll notice your DAG’s auto-trigger toggle is in the “off” position. You can turn it on for it to run on the next scheduled time. But since we are simply running an example exercise, you can manually trigger the dag to test that it works. Click on the > play button to get it going. Select “Single Run” and have it start immediately.
Success here will quite literally show a green “success” checkmark ✅. You should also be able to see the print() outputs in your DAG’s task logs within Airflow dashboard.
Let’s check the database. In a new terminal tab:
ls
cd orchestration # if necessary
# make sure you're in the /your-project/orchestration directory first
docker compose exec postgres psql -U airflow -d la_meter_parking_db_airflow
# You should be in psql now, with something like `your_db_airflow=#`
SELECT * FROM meter_occupancy LIMIT 10;
Great! Airflow and our DAG both work. Next, we take it into AWS.
