This is Yin

Yin Yin Chan

Data Workflow Part 3: From Airflow & DAGs to Cloud Data Lake with AWS

by Yin Yin Chan 🤓 June 2026

TL;DR

Airflow and our first DAG are now running properly, thanks to our guide, Data Workflow Part 2: Data Ingestion running DAGs inside Airflow.

Taking the next step in Part 3 of our Data Workflow series, we extract data from the Socrata API, convert to Parquet, and ship it off to AWS S3.

We'll go full circle as we deploy Terraform and adapt code from our barebones data pipeline.

Have a question?

Ask away! I always do my best to help, and if I can't, I'm sure we can find the answer together.

Email me

Cloud Data Lake with AWS

Prerequisite skills

Docker Compose, Python & packages, API usage, AWS basics, Databases, Debugging

To summarize what we’ve done so far: we set up Apache Airflow in Docker and successfully ran our first task, ingesting data into PostgreSQL on Docker.

Now we take it one step further and move from our PostgreSQL setup to our Data Lake on AWS. We’ll implement this with the first dataset, then add similar tasks for the remaining 2 datasets.

Setting Up

We need to make sure we have the following essentials to move data onto AWS:

  1. Python libraries boto3 and pyarrow. boto3 is the official AWS SDK for Python. pyarrow will have what we need to compress our data into high-performance Parquet files. First, let’s update our requirements.txt file.
# In orchestration/requirements.txt

# AWS Provider & SDK
apache-airflow-providers-amazon
boto3

# Data Compression & Serialization
pyarrow

# Core Database Driver required by SQLAlchemy to connect to Postgres 18
psycopg2-binary
  2. Make sure your .env and docker-compose.yml include all the AWS credentials you’ll need:
# In .env
AWS_ACCESS_KEY_ID=aws_access_key_id_example
AWS_SECRET_ACCESS_KEY=aws_secret_access_key_example
AWS_DEFAULT_REGION=us-west-1
AWS_S3_BUCKET=ladot-meter-parking-de-project-aws-data-lake # or your-project-name-aws-data-lake
# In docker-compose.yml
  environment:
    &airflow-common-env
    ...
    AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
    AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
    AWS_S3_BUCKET: ${AWS_S3_BUCKET}

With that update, we’ll need to rebuild our Docker image. In your terminal:

ls
# make sure you're in your ./orchestration directory

docker compose down
docker compose build --no-cache
docker compose up
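
Once the containers are back up, it can help to confirm that the AWS variables actually reached the Airflow containers. Here is a minimal sketch of such a check. It is not part of the original project: the helper name missing_env_vars is hypothetical, while the variable names are the ones we set in .env above.

```python
import os

# The four variables we added to .env and docker-compose.yml
REQUIRED_AWS_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
    "AWS_S3_BUCKET",
)


def missing_env_vars(required=REQUIRED_AWS_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# An empty list means every variable is visible to this Python process
print("Missing variables:", missing_env_vars())
```

Running it with Python inside one of the Airflow containers (rather than on your host machine) tells you what the DAG itself will see.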

Reload your local Airflow dashboard at localhost:8080. If necessary, enter your username / password: airflow / airflow.

If you don’t see your DAG in the dashboard, you can manually rescan for it in a different terminal tab:

cd orchestration/
docker compose exec airflow-scheduler airflow dags reserialize

Modify DAG for AWS S3 Ingestion

In the earlier sections, we were dealing with a DAG that saved our data into a local PostgreSQL setup on Docker. For this section, we’ll work with the ladot_parking_meter_ingestion_daily.py file.

ls
cd dags/
# You need to be in your dags directory 
# e.g. /your-project-repo/orchestration/dags

touch ladot_parking_meter_ingestion_daily.py

If you need a side-by-side of the before and after, diff between my versions of ladot_parking_ingestion_daily_sql.py and ladot_parking_meter_ingestion_daily.py.

Working with ladot_parking_meter_ingestion_daily.py, instead of saving our data down to the local PostgreSQL, we’ll compress our data into Parquet and push to S3.

First, let’s make sure we have the imports we need. You’ll notice we removed create_engine and added BytesIO and boto3.

# At the top of ladot_parking_meter_ingestion_daily.py

from airflow.decorators import dag, task
from datetime import datetime  # used below to timestamp the S3 file name
import requests
import os
import pandas as pd
import boto3
from io import StringIO, BytesIO

Then, we update our @task and modify our to_sql() code block to compress the data into Parquet and send it off to S3:

@dag(
    ...
)
...
# In ladot_parking_meter_ingestion_daily.py
# Our previous sql-version Line 32 ~ Line 88 are replaced with:

    #1. set up variables to insert into request
    app_token = os.getenv("APP_TOKEN")
    s3_bucket = os.getenv("AWS_S3_BUCKET") # new
    
    current_date = datetime.now().strftime('%Y%m%d_%H%M%S') # new
    file_name = f"meter_occupancy/run_{current_date}.parquet" # new

    headers = {
        "X-App-Token": app_token,
        "Content-Type": "application/json",
    }
    payload_csv = {
        "query": "SELECT *",
        "orderingSpecifier": "discard"
    }
    meter_occupancy_csv = "https://data.lacity.org/api/v3/views/e7h6-4a3e/export.csv"

    #2. retrieve data from API
    meter_occupancy_response = requests.post(meter_occupancy_csv, headers=headers, json=payload_csv, timeout=100)
    meter_occupancy_response.raise_for_status() # Check if the request was successful

    #3. convert to dataframe
    df = pd.read_csv(StringIO(meter_occupancy_response.text)) # changed, we don't need to iterate or chunk anymore

    #4. write to parquet in memory
    parquet_buffer = BytesIO() # new
    df.to_parquet(parquet_buffer, index=False, engine="pyarrow", compression="snappy") # new

    #5. stream parquet file to S3
    s3_client = boto3.client("s3")  # new, the entire for loop is removed, and we send the data in one shot
    try:
        response =  s3_client.put_object(
            Bucket=s3_bucket,
            Key=file_name,
            Body=parquet_buffer.getvalue()
        )

        print(f"Successfully uploaded {file_name} to S3 bucket {s3_bucket}")
    except Exception as e:
        print(f"Error uploading {file_name} to S3 bucket {s3_bucket}: {e}")
        raise
...
ladot_dag = ladot_parking_meter_ingestion_daily()
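
The S3 object key above is built from the run time. As a small sketch, the same naming logic could be pulled into a helper so it can be checked without touching AWS. The name build_s3_key is hypothetical, and passing a UTC time is an assumption on my part (the DAG code above calls datetime.now(), which uses the container's local time).

```python
from datetime import datetime, timezone


def build_s3_key(dataset_name: str, run_time: datetime) -> str:
    """Build a key like meter_occupancy/run_20260611_201807.parquet."""
    stamp = run_time.strftime("%Y%m%d_%H%M%S")
    return f"{dataset_name}/run_{stamp}.parquet"


# Same format string as the DAG, but with an explicit UTC timestamp
print(build_s3_key("meter_occupancy", datetime.now(timezone.utc)))
```

Because the timestamp is fixed-width and ordered from year down to second, sorting the keys alphabetically also sorts them chronologically.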

With the @task updated, we need Airflow to pick up the changes:

docker compose exec airflow-scheduler airflow dags reserialize

Together with Terraform

We’re moving step-by-step so it’s easier to catch errors and breaks in the process.

Now, we’re ready to put another piece together: let’s set up our AWS infrastructure with Terraform.

In a different terminal tab:

cd terraform/
terraform init

# set your secret keys. replace the below values with your own
export AWS_ACCESS_KEY_ID="your_access_key_id_from_csv"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key_from_csv"
export AWS_DEFAULT_REGION="us-west-1"

terraform plan
# check that the plan looks as it should

terraform apply
# Enter 'yes' after the prompt

Log in to your AWS dashboard to make sure your S3 bucket name created through Terraform matches the one you set in your orchestration/.env for key AWS_S3_BUCKET.

Back in your Airflow dashboard, manually run your DAG ladot_parking_meter_ingestion_daily. Click the > play icon, select “Single Run” and then click “Trigger”.

Your DAG should show a green “success” checkmark ✅. Go into your AWS dashboard again and look for your S3 bucket. If you click into it, you should see your meter_occupancy directory.

Inside that directory should be the Parquet file, named after the pattern f"meter_occupancy/run_{current_date}.parquet".

If you see it there, it’s a success!
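
If you'd rather check from code than from the AWS dashboard, here is a rough sketch that lists the keys under a prefix. The helper name list_keys is hypothetical. It takes the S3 client as an argument and relies on boto3's list_objects_v2 call, following the continuation token when the listing spans more than one page.

```python
def list_keys(s3_client, bucket: str, prefix: str) -> list[str]:
    """Collect every object key in `bucket` that starts with `prefix`."""
    keys = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        page = s3_client.list_objects_v2(**kwargs)
        # "Contents" is absent when nothing matches the prefix
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
        if not page.get("IsTruncated"):
            return keys
        # More results remain: ask for the next page
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


# Example usage (needs boto3 and AWS credentials in the environment):
#   import os, boto3
#   print(list_keys(boto3.client("s3"), os.environ["AWS_S3_BUCKET"], "meter_occupancy/"))
```

An empty list would mean the upload did not land where we expected, which is a hint to double check the bucket name in .env.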

Extract & Load All Datasets

Great! Our DAG-to-AWS works well. Let’s get it set up for the other 2 LADOT datasets we are going to work with.

We’re going to create our main ladot_parking_ingestion_daily.py file to orchestrate all 3 datasets from LADOT using the Socrata Open Data API, since these datasets are specifically powered by Socrata.

ls
cd dags/
# Again, you need to be in your dags directory 
# e.g. /your-project-repo/orchestration/dags

touch ladot_parking_ingestion_daily.py

Using Socrata API

Here, we’re going to refactor our ingestion function (from pipeline/pipeline-sodapy.py which worked for 1 dataset) into an IngestionEngine class with a method to handle the extract and load.

This way we can create 3 instances (and maybe more if we find another LADOT dataset useful to us) without rewriting large blocks of the same code. By using the object-oriented approach in this specific case, we:

  • have zero code duplication, making it much easier to maintain and
  • make our code scalable so that adding new datasets into our pipeline only requires a few lines of code using configurations.

We don’t need to change the imports from the above section. Let’s highlight a few of the important parts of ladot_parking_ingestion_daily.py.

The reason we picked “LADOT Parking Meter Occupancy” to try out initially is that it only had ~4,213 rows of data. Now that we’re creating a Python class to also run ingestion for the 2 other datasets, one of which exceeds 25 million rows, we’ll need to process in chunks.

  1. Let’s make sure we have the additional packages we’ll need:
...
from sodapy import Socrata
from itertools import count
import json

Since itertools and json are part of the Python standard library, we only need to add sodapy to our requirements.txt.

...
# To connect to the Socrata Open Data API
sodapy

Because of this change, we’ll need to rebuild Docker:

docker compose down
docker compose build --no-cache
docker compose up

You’ll notice in this exercise, we had to move away from the def chunked_iterable function, which was chunking after we called the Socrata API for the entire dataset. This was causing HTTP 500 response errors for our call to the 25-million-row dataset. We needed to make an adjustment to our code so that we’re chunking before calling the Socrata API.

# The class declaration
class IngestionEngine:
    def __init__(self, dataset_name: str, dataset_id: str):
        """We set our attribute definitions"""
        ...
        self.socrata_client = Socrata("data.lacity.org", self.app_token, timeout=120) # we need to increase the timeout in the case of extracting millions of rows.

    # This is our main data ingestion method and where we break out the data extraction into manageable chunks
    def extract_and_load_data(self, chunk_size: int = 20000):
        ...
        try:
            for i in count(start=0): # iterating through infinity (and conditionally breaking)
                offset = i * chunk_size

                chunked = self.socrata_client.get( # we are calling the Socrata API in chunks defined by chunk_size
                    self.dataset_id, 
                    limit=chunk_size,
                    offset=offset, # each time the loop runs, we make sure to start the extraction from where we left off previously
                    order=":id"
                )

                if not chunked:
                    break  # an empty response means we've read every row, so the loop doesn't run forever

                print(f"Fetched rows {offset} to {offset + len(chunked)} in {self.dataset_name}")

                self._upload_chunk_to_s3(chunked, current_date, i) # we take the extraction and load it into S3

        except Exception as e:
            print(f"Error interruption while ingesting {self.dataset_name}: {e}")
            raise
        ...
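
To see the limit/offset loop on its own, here is a minimal sketch that swaps the Socrata client for a fake in-memory dataset. The names iter_chunks, fake_fetch and FAKE_ROWS are hypothetical stand-ins; only the count()-based loop and the empty-page break mirror the method above.

```python
from itertools import count
from typing import Callable, Iterator


def iter_chunks(fetch_page: Callable[[int, int], list], chunk_size: int) -> Iterator[list]:
    """Request pages of `chunk_size` rows until the source returns an empty page."""
    for i in count(start=0):
        offset = i * chunk_size  # start where the previous page ended
        page = fetch_page(chunk_size, offset)
        if not page:
            break  # nothing left to read
        yield page


# Stand-in for the remote dataset: 45 fake rows
FAKE_ROWS = [{"id": n} for n in range(45)]


def fake_fetch(limit: int, offset: int) -> list:
    """Pretend API call that returns at most `limit` rows starting at `offset`."""
    return FAKE_ROWS[offset:offset + limit]


chunk_sizes = [len(chunk) for chunk in iter_chunks(fake_fetch, chunk_size=20)]
print(chunk_sizes)  # [20, 20, 5]
```

With the default chunk_size of 20,000, a dataset of 25 million rows works out to 1,250 requests that return data plus one final empty request that ends the loop.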

This mirrors what we did in the standalone pipeline, which extracted with the Socrata API but integrated with neither Airflow nor AWS: we take each chunk, convert it to a DataFrame, and flatten any dictionaries and lists into JSON strings. The difference is that we now write each chunk to Parquet and upload it to S3 instead of inserting it into the db.

def _upload_chunk_to_s3(self, records: list, timestamp: str, chunk_idx: int):
    s3_filename = f"{self.dataset_name}/run_{timestamp}_chunk_{chunk_idx:05d}.parquet"

    #3. convert to dataframe with datatypes that won't break the Parquet write
    df_chunk = pd.DataFrame.from_records(records)
    # DataFrame.map needs pandas 2.1+; older versions call it applymap
    df_chunk = df_chunk.map(
        lambda x: json.dumps(x) if isinstance(x, (dict, list)) else x # convert any nested dictionaries or lists to JSON strings so each column holds simple values
    )

    #4. write to parquet in memory
    parquet_buffer = BytesIO()
    df_chunk.to_parquet(parquet_buffer, index=False, engine="pyarrow", compression="snappy")

    #5. stream parquet file to S3
    try:
        response = self.s3_client.put_object(
            Bucket=self.s3_bucket,
            Key=s3_filename,
            Body=parquet_buffer.getvalue()
        )
        print(f"Successfully uploaded {s3_filename} to S3 bucket {self.s3_bucket}")
    except Exception as e:
        print(f"Error uploading {s3_filename} to S3 bucket {self.s3_bucket}: {e}")
        raise
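
The flattening step can be tried without pandas. The sketch below applies the same json.dumps rule to a single record. The helper name flatten_record and the sample field names are made up for illustration and are not taken from the LADOT datasets.

```python
import json


def flatten_record(record: dict) -> dict:
    """Turn nested dicts and lists into JSON strings; leave other values alone."""
    return {
        key: json.dumps(value) if isinstance(value, (dict, list)) else value
        for key, value in record.items()
    }


# Hypothetical record with one nested dict and one list
sample = {
    "space_id": "AB123",
    "location": {"lat": "34.05", "lon": "-118.24"},
    "tags": ["a", "b"],
}

flat = flatten_record(sample)
print(flat["location"])  # {"lat": "34.05", "lon": "-118.24"}
print(type(flat["location"]).__name__)  # str
```

The nested value survives as text, so it can be parsed back with json.loads later if we need the structure again.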

You can reference the entire IngestionEngine class in my ladot_parking_ingestion_daily.py file.

For each dataset, we create its own @task and an individual instance for each.

    @task(task_id="ingest_meter_occupancy_data")
    def extract_and_load_meter_occupancy_data():
        """
        For the LADOT Parking Meter Occupancy dataset
        """
        engine = IngestionEngine(
            dataset_name="meter_occupancy",
            dataset_id="e7h6-4a3e"
        )
        engine.extract_and_load_data()

    @task(task_id="ingest_parking_citations_data")
    def extract_and_load_parking_citations_data():
        """
        For the Parking Citations dataset
        """
        engine = IngestionEngine(
            dataset_name="parking_citations",
            dataset_id="4f5p-udkv"
        )
        engine.extract_and_load_data()

    @task(task_id="ingest_parking_inventory_policies_data")
    def extract_and_load_parking_inventory_policies_data():
        """
        For the LADOT Metered Parking Inventory & Policies dataset
        """
        engine = IngestionEngine(
            dataset_name="parking_inventory_policies",
            dataset_id="s49e-q6j2"
        )
        engine.extract_and_load_data()

    # We need to instantiate each task
    run_meter_occupancy_data = extract_and_load_meter_occupancy_data()
    run_parking_citations_data = extract_and_load_parking_citations_data()
    run_parking_inventory_policies_data = extract_and_load_parking_inventory_policies_data()

    # this is Airflow's way of knowing to run these tasks one after the other
    run_meter_occupancy_data >> run_parking_inventory_policies_data >> run_parking_citations_data

Notice this line: run_meter_occupancy_data >> run_parking_inventory_policies_data >> run_parking_citations_data. We tell Airflow to run each task one after the other so the Socrata client doesn’t reject our connection for running so many calls at the same time.
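
Since the three tasks differ only in dataset_name and dataset_id, those pairs could also live in a small configuration list. This is a sketch of one possible layout, not how my file is organized: DatasetConfig and DATASETS are hypothetical names, while the dataset names, IDs and task_id pattern are the ones used in the tasks above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetConfig:
    dataset_name: str
    dataset_id: str

    @property
    def task_id(self) -> str:
        """Follow the same task_id pattern as the @task decorators above."""
        return f"ingest_{self.dataset_name}_data"


# Listed in the same order as the >> chain, smallest job first
DATASETS = (
    DatasetConfig("meter_occupancy", "e7h6-4a3e"),
    DatasetConfig("parking_inventory_policies", "s49e-q6j2"),
    DatasetConfig("parking_citations", "4f5p-udkv"),
)

for config in DATASETS:
    print(config.task_id, config.dataset_id)
```

Adding a fourth dataset would then mean adding one line to DATASETS. Wiring the list into Airflow tasks is left out here.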

Now we need Airflow to pick up our task updates:

docker compose exec airflow-scheduler airflow dags reserialize

In your Airflow dashboard, when you go to Dags localhost:8080/dags, you should see ladot_parking_ingestion_daily listed. Click into that DAG and then click on “Tasks”. You’ll see our 3 tasks on that list.

Let’s run them. Click on the > play button to trigger a manual run. “Single Run” > “Trigger”.

A couple measures of success here:

  1. In your Airflow dashboard, go to Dags localhost:8080/dags, click into this DAG we’re working with: ladot_parking_ingestion_daily.
    • Click on “Runs”
    • Click into the most recent run “manual__2026-…+00:00”
    • Each task should be listed on this next screen. How many “✅ Successes” do you have?
    • If just one or two, give it some time. The 25-million-row dataset will take a while.
    • Once you see 3x “✅ Successes”, you did it!
  2. In your AWS S3 dashboard, go to “General purpose buckets”
    • Click on “ladot-meter-parking-de-project-aws-data-lake” (or the data lake name you created)
    • You should see 3x directories listed here, each of your dataset_name (e.g. 📁 meter_occupancy/)
    • Click into one whose task was “Success” in Airflow.
    • Do you see the parquet file there? (e.g. run_20260611_201807_chunk_00000.parquet)
    • If so, you’re good to go and ready for AWS Glue!