Data Workflow Part 2: Data Ingestion running DAGs inside Airflow

by Yin Yin Chan 🤓 June 2026

TL;DR

We now have Airflow running locally based on our guide, Data Workflow Part 1: Workflow Orchestration with Apache Airflow 3.2.x.

In Part 2 of our Data Workflow series, we will be combining a few concepts from earlier exercises to get our first DAG up and running.

DAGs

Prerequisite skills

Docker Compose, Python & packages, API usage, AWS basics, Databases, Debugging

A DAG (Directed Acyclic Graph) is the code representation of a data pipeline workflow. It is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
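To make "directed" and "acyclic" concrete, here is a minimal sketch using Python's standard-library graphlib. It is not how Airflow stores or schedules DAGs, and the task names (extract, transform, load) are hypothetical. It only shows that dependencies imply a run order, and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid DAG can always be flattened into an order that respects dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())

# Making "extract" depend on "load" closes a loop, so no valid order exists.
cyclic = {
    "extract": {"load"},
    "transform": {"extract"},
    "load": {"transform"},
}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True

print(run_order, has_cycle)
```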

Create your first DAG file

ls
cd dags/
# You need to be in your dags directory 
# e.g. /your-project-repo/orchestration/dags

touch ladot_parking_ingestion_daily_sql.py
  1. Import the Airflow decorators (@dag and @task) along with datetime and timedelta for the schedule settings
# In ladot_parking_ingestion_daily_sql.py

from datetime import datetime, timedelta
# Airflow 3 also exposes these as `from airflow.sdk import dag, task`
from airflow.decorators import dag, task
  2. Decorate your main wrapper function to define the pipeline settings. I’ve added comments describing the options we need.
@dag(
    dag_id="ladot_parking_ingestion_daily_sql",
    start_date=datetime(2026, 6, 8), # set to a past date to allow immediate execution
    catchup=False, # don't backfill missed runs
    schedule=timedelta(days=1), # use timedelta for daily schedule precision
    default_args={
        "owner": "airflow",
        "depends_on_past": False, # if yesterday's run fails, it won't block today's run
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
)
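To make the schedule and retry settings above more concrete, here is a rough sketch in plain Python. It is not Airflow's scheduler or retry machinery; schedule_points, run_with_retries, on_wait, and flaky are hypothetical names used only for illustration. The first helper lists the daily points a timedelta(days=1) interval produces from start_date, and the second shows that retries=1 means at most two attempts in total.

```python
from datetime import datetime, timedelta


def schedule_points(start_date, interval, now):
    """Return every point start_date + k * interval that is not after now."""
    points = []
    current = start_date
    while current <= now:
        points.append(current)
        current += interval
    return points


def run_with_retries(func, retries, on_wait=lambda: None):
    """Call func up to retries + 1 times; re-raise the last error if all fail.

    on_wait stands in for sleeping retry_delay between attempts.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except Exception:
            if attempts > retries:
                raise
            on_wait()


points = schedule_points(
    start_date=datetime(2026, 6, 8),
    interval=timedelta(days=1),
    now=datetime(2026, 6, 11, 9, 30),  # hypothetical "current time"
)
# Rough reading of catchup=False: older points are not backfilled.
latest_point = points[-1]

calls = []


def flaky():
    """Fail on the first call, succeed on the second."""
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError("transient failure")
    return "loaded"


result, attempts = run_with_retries(flaky, retries=1)
print(len(points), latest_point, result, attempts)
```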
  3. Write out your main pipeline ingestion logic. To start, copy the raw Python from pipeline.py in the earlier pipeline exercise and paste it into the new pipeline function
@dag(
    ...
)
def ladot_parking_ingestion_daily_sql():
    @task(task_id="ingest_meter_occupancy_data")
    def extract_and_load_meter_occupancy_data():
        # paste the code from pipeline.py here
        app_token = os.getenv("APP_TOKEN")
        headers = {
            ...
        }
        ...
        df_chunk.to_sql(
            ...
        )
        ...

Import our pipeline tools at the top of the file. Notice that I removed tqdm from this script to reduce complications during this Airflow test run, and also removed the tqdm call around line 79.

# In ladot_parking_ingestion_daily_sql.py

import requests
import os
import pandas as pd
from io import StringIO
from sqlalchemy import create_engine
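The body of the task is elided above, so the following is only a sketch of the general pattern those imports suggest: read CSV text in chunks with pandas and append each chunk to a table with to_sql. It is not the contents of pipeline.py. The sample data, column names, and the load_csv_in_chunks helper are hypothetical, and an in-memory SQLite connection stands in for the PostgreSQL engine so the example runs without a database server.

```python
import sqlite3
from io import StringIO

import pandas as pd

# Hypothetical sample standing in for the CSV text an API call would return.
SAMPLE_CSV = """space_id,occupancy_state
A1,OCCUPIED
A2,VACANT
A3,OCCUPIED
B1,VACANT
B2,OCCUPIED
"""


def load_csv_in_chunks(csv_text, con, table_name, chunksize=2):
    """Append csv_text to table_name chunk by chunk; return rows written."""
    rows_written = 0
    for df_chunk in pd.read_csv(StringIO(csv_text), chunksize=chunksize):
        # if_exists="append" creates the table on the first chunk, then adds to it.
        df_chunk.to_sql(table_name, con=con, if_exists="append", index=False)
        rows_written += len(df_chunk)
    return rows_written


con = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL engine
total = load_csv_in_chunks(SAMPLE_CSV, con, "meter_occupancy")
row_count = con.execute("SELECT COUNT(*) FROM meter_occupancy").fetchone()[0]
print(total, row_count)
```

Loading in chunks keeps memory use bounded when the source file is large, at the cost of several smaller writes.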

The purpose of this exercise is to get our DAG running in Airflow and catch any related errors along the way, so we’re going to keep it simple and ingest data into our local PostgreSQL.

Note that we’ll need to default our database host to postgres, the name of the database service in Docker Compose, since the DAG will be running inside Docker.

# Line ~63 in ladot_parking_ingestion_daily_sql.py

psql_host = os.getenv("POSTGRES_HOST", "postgres")
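As a sketch of how that default might feed into a connection string, the helper below assembles a PostgreSQL URL from environment variables. The build_postgres_url function and the POSTGRES_PORT variable are assumptions for illustration; the other variable names come from the docker-compose.yml shown in this guide. The resulting string is the kind of URL that could be passed to create_engine, though that call is left out here.

```python
import os
from urllib.parse import quote_plus


def build_postgres_url(env=os.environ):
    """Build a PostgreSQL URL, falling back to the Compose service name as host."""
    user = quote_plus(env["POSTGRES_USER"])
    password = quote_plus(env["POSTGRES_PASSWORD"])  # escape characters like "@"
    database = env["POSTGRES_DB"]
    host = env.get("POSTGRES_HOST", "postgres")  # default used inside Docker
    port = env.get("POSTGRES_PORT", "5432")      # hypothetical variable
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"


# Hypothetical values; no POSTGRES_HOST is set, so the default applies.
example_env = {
    "POSTGRES_USER": "airflow",
    "POSTGRES_PASSWORD": "p@ss",
    "POSTGRES_DB": "demo",
}
url = build_postgres_url(example_env)
print(url)
```

Running the same script outside Docker would then only need POSTGRES_HOST set to something like localhost, without touching the code.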

And make sure APP_TOKEN is in your docker-compose.yml environment variables:

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    ...
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
    AIRFLOW__API_AUTH__JWT_SECRET: ${AIRFLOW__API_AUTH__JWT_SECRET:-airflow_jwt_secret}
    AIRFLOW__API_AUTH__JWT_ISSUER: ${AIRFLOW__API_AUTH__JWT_ISSUER:-airflow}
    ...
    APP_TOKEN: ${APP_TOKEN}
    POSTGRES_USER: ${POSTGRES_USER}
    POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    POSTGRES_DB: ${POSTGRES_DB}
    ...
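Docker Compose substitutes an empty string when a ${VAR} reference is unset, so a missing APP_TOKEN can slip through quietly. One possible safeguard, sketched below, is to check for unset or empty variables at the start of the task. The missing_env_vars helper and the REQUIRED_VARS tuple are hypothetical names, not part of Airflow.

```python
import os

# Variable names taken from the docker-compose.yml environment block.
REQUIRED_VARS = ("APP_TOKEN", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB")


def missing_env_vars(names, env=os.environ):
    """Return the names that are unset or empty in env, in the order given."""
    return [name for name in names if not env.get(name)]


# Hypothetical environment: APP_TOKEN is empty and POSTGRES_PASSWORD is absent.
example_env = {"APP_TOKEN": "", "POSTGRES_USER": "airflow", "POSTGRES_DB": "demo"}
missing = missing_env_vars(REQUIRED_VARS, example_env)
print(missing)
```

Inside a task, raising an error when the returned list is non-empty would make the failure show up clearly in the task log.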
  4. Instantiate the task inside the DAG function, then call the DAG function at the bottom of the file
    # Instantiate the task(s)
    run_meter_occupancy_data = extract_and_load_meter_occupancy_data()

# Instantiate DAG
ladot_dag = ladot_parking_ingestion_daily_sql()
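Why that final call matters can be illustrated with a toy decorator in plain Python. This is an analogy only: toy_dag and REGISTERED_DAGS are hypothetical and are not Airflow's implementation. The point is that decorating a function merely defines it, and nothing gets registered until the function is actually called.

```python
from functools import wraps

REGISTERED_DAGS = {}  # toy stand-in for what a DAG parser would discover


def toy_dag(dag_id):
    """Toy decorator: the wrapped function only registers a DAG when called."""
    def decorator(func):
        @wraps(func)
        def factory():
            task_names = func()  # run the body to collect its task names
            REGISTERED_DAGS[dag_id] = task_names
            return dag_id
        return factory
    return decorator


@toy_dag(dag_id="ladot_parking_ingestion_daily_sql")
def ladot_parking_ingestion_daily_sql():
    return ["ingest_meter_occupancy_data"]


registered_before_call = dict(REGISTERED_DAGS)   # still empty at this point
ladot_dag = ladot_parking_ingestion_daily_sql()  # the call does the registering
print(registered_before_call, REGISTERED_DAGS)
```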
  5. Let’s give it a go. Since we’ve updated the .env file, we’ll need to bring Docker down and then back up again
docker compose down
docker compose build --no-cache
docker compose up
  6. Reload your local Airflow dashboard at localhost:8080. If prompted, enter your username / password: airflow / airflow.

If you don’t see your DAG in the dashboard, you can manually rescan for it in a different terminal tab:

cd orchestration/
docker compose exec airflow-scheduler airflow dags reserialize

You should see your DAG ladot_parking_ingestion_daily_sql listed on your dashboard, with its trigger toggle off (this is the default). Start your DAG manually by clicking the > play button.

  7. Airflow commands:

You can run this command to show a list of DAGs in service:

# List all the dags
docker compose exec airflow-scheduler airflow dags list

If your DAG hasn’t shown up yet, run this command to make it appear in your dashboard without waiting for the next scan.

# Force an immediate rescan (force the running scheduler to process newly fixed code in your dag file)
docker compose exec airflow-scheduler airflow dags reserialize

DAG in Airflow Dashboard

Success here means you can see your DAG within your localhost:8080/dags Airflow Dashboard (select the All tab).

Running your DAG

You’ll notice your DAG’s auto-trigger toggle is in the “off” position. You can turn it on so that the DAG runs at its next scheduled time. But since we are simply running an example exercise, you can manually trigger the DAG to test that it works. Click on the > play button to get it going. Select “Single Run” and have it start immediately.

Success here will quite literally show a green “success” checkmark ✅. You should also be able to see the print() outputs in your DAG’s task logs in the Airflow dashboard.

Let’s check the database. In a new terminal tab:

ls
cd orchestration # if necessary
# make sure you're in the /your-project/orchestration directory first

docker compose exec postgres psql -U airflow -d la_meter_parking_db_airflow

# You should be in psql now, with something like `your_db_airflow=#`

SELECT * FROM meter_occupancy LIMIT 10;

Great! Airflow and our DAG both work. Next, we take it into AWS.