Data Workflow Part 3: From Airflow & DAGs to Cloud Data Lake with AWS
by Yin Yin Chan · June 2026
TL;DR
We now have Airflow and our first DAG running properly with our guide, Data Workflow Part 2: Data Ingestion running DAGs inside Airflow.
Taking the next step in Part 3 of our Data Workflow series, we extract data from the Socrata API, convert to Parquet, and ship it off to AWS S3.
We'll go full circle as we deploy Terraform and adapt code from our barebones data pipeline.
Have a question?
Ask away! I always do my best to help, and if I can't, I'm sure we can find the answer together.
Cloud Data Lake with AWS
Prerequisite skills
To summarize what we've done so far: we've set up Apache Airflow in Docker and successfully run our first task, ingesting data into PostgreSQL on Docker.
Now we need to take it one step further and transition from our PostgreSQL setup to our Data Lake on AWS. We'll implement this with the first dataset and then apply similar tasks to our last 2 datasets.
Setting Up
We need to make sure we have the following essentials to move data onto AWS:
- Python libraries
boto3 and pyarrow. boto3 is the official AWS SDK for Python. pyarrow will have what we need to compress our data into high-performance Parquet files. First, let's update our requirements.txt file.
# In orchestration/requirements.txt
# AWS Provider & SDK
apache-airflow-providers-amazon
boto3
# Data Compression & Serialization
pyarrow
# Core Database Driver required by SQLAlchemy to connect to Postgres 18
psycopg2-binary
- Make sure your .env and docker-compose.yml include all the AWS credentials you'll need:
# In .env
AWS_ACCESS_KEY_ID=aws_access_key_id_example
AWS_SECRET_ACCESS_KEY=aws_secret_access_key_example
AWS_DEFAULT_REGION=us-west-1
AWS_S3_BUCKET=ladot-meter-parking-de-project-aws-data-lake # or your-project-name-aws-data-lake
# In docker-compose.yml
environment:
  &airflow-common-env
  ...
  AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
  AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
  AWS_S3_BUCKET: ${AWS_S3_BUCKET}
With that update, we'll need to rebuild our Docker image. In your terminal:
ls
# make sure you're in your ./orchestration directory
docker compose down
docker compose build --no-cache
docker compose up
Reload your local Airflow dashboard at localhost:8080. If necessary, enter your username / password: airflow / airflow.
If you don't see your DAG in the dashboard, you can manually rescan for it in a different terminal tab:
cd orchestration/
docker compose exec airflow-scheduler airflow dags reserialize
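If the DAG later fails with a credentials error, it can help to confirm that the values from .env actually reached the container. Here is a minimal sketch, assuming the four variable names used above. The helper name missing_env_vars is hypothetical and not part of the project:

```python
import os

# The variable names set in .env and docker-compose.yml above.
REQUIRED_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
    "AWS_S3_BUCKET",
)


def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print(f"Missing environment variables: {', '.join(missing)}")
    else:
        print("All AWS environment variables are set.")
```

Running a script like this with python inside the airflow-scheduler container (via docker compose exec) should tell you whether the rebuild picked up your .env values. It only checks that the variables exist, not that the keys are valid.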
Modify DAG for AWS S3 Ingestion
In the earlier sections, we were dealing with a DAG that saved our data into a local PostgreSQL setup on Docker. For this section, we'll work with the ladot_parking_meter_ingestion_daily.py file.
ls
cd dags/
# You need to be in your dags directory
# e.g. /your-project-repo/orchestration/dags
touch ladot_parking_meter_ingestion_daily.py
If you need a side-by-side of the before and after, diff between my versions of ladot_parking_ingestion_daily_sql.py and ladot_parking_meter_ingestion_daily.py.
Working with ladot_parking_meter_ingestion_daily.py, instead of saving our data down to the local PostgreSQL, we'll compress our data into Parquet and push it to S3.
First, let's make sure we have the imports we need. You'll notice we removed create_engine and added BytesIO and boto3.
# At the top of ladot_parking_meter_ingestion_daily.py
from airflow.decorators import dag, task
from datetime import datetime  # needed for the datetime.now() run timestamp in the task
import requests
import os
import pandas as pd
import boto3
from io import StringIO, BytesIO
Then, we update our @task and replace the to_sql() code block with code that compresses the data into Parquet and sends it off to S3:
@dag(
...
)
...
# In ladot_parking_meter_ingestion_daily.py
# Our previous sql-version Line 32 ~ Line 88 are replaced with:
# 1. set up variables to insert into request
app_token = os.getenv("APP_TOKEN")
s3_bucket = os.getenv("AWS_S3_BUCKET")  # new
current_date = datetime.now().strftime('%Y%m%d_%H%M%S')  # new
file_name = f"meter_occupancy/run_{current_date}.parquet"  # new
headers = {
    "X-App-Token": app_token,
    "Content-Type": "application/json",
}
payload_csv = {
    "query": "SELECT *",
    "orderingSpecifier": "discard"
}
meter_occupancy_csv = "https://data.lacity.org/api/v3/views/e7h6-4a3e/export.csv"
# 2. retrieve data from API
meter_occupancy_response = requests.post(meter_occupancy_csv, headers=headers, json=payload_csv, timeout=100)
meter_occupancy_response.raise_for_status()  # Check if the request was successful
# 3. convert to dataframe
df = pd.read_csv(StringIO(meter_occupancy_response.text))  # changed, we don't need to iterate or chunk anymore
# 4. write to parquet in memory
parquet_buffer = BytesIO()  # new
df.to_parquet(parquet_buffer, index=False, engine="pyarrow", compression="snappy")  # new
# 5. stream parquet file to S3
s3_client = boto3.client("s3")  # new, the entire for loop is removed, and we send the data in one shot
try:
    response = s3_client.put_object(
        Bucket=s3_bucket,
        Key=file_name,
        Body=parquet_buffer.getvalue()
    )
    print(f"Successfully uploaded {file_name} to S3 bucket {s3_bucket}")
except Exception as e:
    print(f"Error uploading {file_name} to S3 bucket {s3_bucket}: {e}")
    raise
...
ladot_dag = ladot_parking_meter_ingestion_daily()
With the @task updated, we need Airflow to pick up the changes:
docker compose exec airflow-scheduler airflow dags reserialize
Together with Terraform
We're moving step-by-step so it's easier to catch errors and breaks in the process.
Now, we're ready to put another piece together: let's set up our AWS infrastructure with Terraform.
In a different terminal tab:
cd terraform/
terraform init
# set your secret keys. replace the below values with your own
export AWS_ACCESS_KEY_ID="your_access_key_id_from_csv"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key_from_csv"
export AWS_DEFAULT_REGION="us-west-1"
terraform plan
# check that the plan looks as it should
terraform apply
# Enter 'yes' after the prompt
Log in to your AWS dashboard to make sure your S3 bucket name created through Terraform matches the one you set in your orchestration/.env for key AWS_S3_BUCKET.
Back in your Airflow dashboard, manually run your DAG ladot_parking_meter_ingestion_daily. Click the > play icon, select "Single Run" and then click "Trigger".
Your DAG should show a green "success" checkmark. Go into your AWS dashboard again and look for your S3 bucket. If you click into it, you should see a meter_occupancy/ folder.
Inside that folder should be the Parquet file, with a name following the pattern meter_occupancy/run_{current_date}.parquet.
If you see it there, it's a success!
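The object key is what gives the bucket its folder-like layout: the S3 console displays everything before a `/` in the key as a folder. Below is a small sketch of how the key from the task is put together. The function name build_s3_key is hypothetical, and the fixed timestamp is only there to make the result predictable:

```python
from datetime import datetime


def build_s3_key(dataset_name: str, run_time: datetime) -> str:
    """Build the object key using the same pattern as file_name in the task."""
    stamp = run_time.strftime("%Y%m%d_%H%M%S")
    return f"{dataset_name}/run_{stamp}.parquet"


# A fixed example time; the DAG itself uses datetime.now().
example_key = build_s3_key("meter_occupancy", datetime(2026, 6, 11, 20, 18, 7))
print(example_key)  # meter_occupancy/run_20260611_201807.parquet
```

Because the timestamp goes down to the second, each manual run should produce a new file rather than overwrite the previous one.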
Extract & Load All Datasets
Great! Our DAG-to-AWS works well. Let's get it set up for the other 2 LADOT datasets we are going to work with.
We're going to create our main ladot_parking_ingestion_daily.py file to orchestrate all 3 datasets from LADOT using the Socrata Open Data API, since these datasets are specifically powered by Socrata.
ls
cd dags/
# Again, you need to be in your dags directory
# e.g. /your-project-repo/orchestration/dags
touch ladot_parking_ingestion_daily.py
Using Socrata API
Here, we're going to refactor our ingestion function (from pipeline/pipeline-sodapy.py, which worked for 1 dataset) into an IngestionEngine class with a method to handle the extract and load.
This way we can create 3 instances (and maybe more if we find another LADOT dataset useful to us) without rewriting large blocks of the same code. By using the object-oriented approach in this specific case, we:
- have zero code duplication, making it much easier to maintain and
- make our code scalable so that adding new datasets into our pipeline only requires a few lines of code using configurations.
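To make the second point concrete, here is a rough sketch of what "configuration" could look like. The DatasetConfig dataclass and the DATASETS list are hypothetical names for illustration and are not the structure used in ladot_parking_ingestion_daily.py. The dataset IDs are the three LADOT IDs used in this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetConfig:
    """Hypothetical container for the two values that differ per dataset."""
    dataset_name: str  # used as the S3 folder (prefix)
    dataset_id: str    # the Socrata dataset identifier


DATASETS = [
    DatasetConfig("meter_occupancy", "e7h6-4a3e"),
    DatasetConfig("parking_inventory_policies", "s49e-q6j2"),
    DatasetConfig("parking_citations", "4f5p-udkv"),
]

# Adding a fourth dataset would mean adding one line to DATASETS.
for config in DATASETS:
    print(f"{config.dataset_name} -> {config.dataset_id}")
```

Everything else (the API client, the chunking, the upload) stays in one place, which is the main reason a class pays off here.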
We don't need to change the imports from the above section.
Let's highlight a few of the important parts of ladot_parking_ingestion_daily.py.
We picked "LADOT Parking Meter Occupancy" to try out first because it only has ~4,213 rows of data. Now that we're creating a Python class to also run ingestion for the 2 other datasets, one of which exceeds 25 million rows, we'll need to process in chunks.
- Let's make sure we have the additional packages we'll need:
...
from sodapy import Socrata
from itertools import count
import json
Since itertools and json are part of the Python standard library, we only need to add sodapy to our requirements.txt:
...
# To connect to the Socrata Open Data API
sodapy
Because of this change, we'll need to rebuild Docker:
docker compose down
docker compose build --no-cache
docker compose up
You'll notice in this exercise that we had to move away from the chunked_iterable function, which chunked the data only after we had called the Socrata API for the entire dataset. That approach produced HTTP 500 errors on our call to the 25-million-row dataset. We needed to adjust our code so that we chunk before calling the Socrata API, requesting one page of rows at a time.
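To get a feel for what chunking before the call means in practice, here is a back-of-the-envelope calculation. It assumes a round figure of 25 million rows (the real dataset is larger) and the default chunk_size of 20,000 from extract_and_load_data. The function name requests_needed is hypothetical.

```python
import math


def requests_needed(total_rows: int, chunk_size: int) -> int:
    """Number of non-empty pages needed to cover total_rows."""
    return math.ceil(total_rows / chunk_size)


pages = requests_needed(25_000_000, 20_000)
print(pages)  # 1250
# Offset at which the last non-empty page starts:
print((pages - 1) * 20_000)  # 24980000
```

So instead of one enormous response, the API serves on the order of a thousand small ones, plus one final request that returns no rows and tells the loop to stop.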
# The class declaration
class IngestionEngine:
    def __init__(self, dataset_name: str, dataset_id: str):
        """We set our attribute definitions"""
        ...
        self.socrata_client = Socrata("data.lacity.org", self.app_token, timeout=120)  # we need to increase the timeout in the case of extracting millions of rows.

    # This is our main data ingestion method and where we break out the data extraction into manageable chunks
    def extract_and_load_data(self, chunk_size: int = 20000):
        ...
        try:
            for i in count(start=0):  # iterating through infinity (and conditionally breaking)
                offset = i * chunk_size
                chunked = self.socrata_client.get(  # we are calling the Socrata API in chunks defined by chunk_size
                    self.dataset_id,
                    limit=chunk_size,
                    offset=offset,  # each time the loop runs, we make sure to start the extraction from where we left off previously
                    order=":id"
                )
                print(f"Fetched rows {offset} to {offset + chunk_size} in {self.dataset_name}")
                if not chunked:
                    break  # make sure we have a break so the loop doesn't run forever
                self._upload_chunk_to_s3(chunked, current_date, i)  # we take the extraction and load it into S3
        except Exception as e:
            print(f"Error interruption while ingesting {self.dataset_name}: {e}")
            raise
...
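The same limit/offset loop can be tried out without touching the API. This sketch swaps the Socrata client for a hypothetical fake_get function over an in-memory list, so only the paging logic is shown. iter_chunks and fake_get are illustrative names, not part of sodapy or the project code.

```python
from itertools import count

# Stand-in data: 45 fake rows instead of a real Socrata dataset.
FAKE_ROWS = [{"id": n} for n in range(45)]


def fake_get(limit: int, offset: int) -> list:
    """Hypothetical replacement for the API call: returns one page of rows."""
    return FAKE_ROWS[offset:offset + limit]


def iter_chunks(fetch, chunk_size: int):
    """Yield (chunk_index, rows) pages until the source returns no rows."""
    for i in count(start=0):
        rows = fetch(limit=chunk_size, offset=i * chunk_size)
        if not rows:
            break  # an empty page means we have read everything
        yield i, rows


for idx, rows in iter_chunks(fake_get, chunk_size=20):
    print(f"chunk {idx}: {len(rows)} rows")
# chunk 0: 20 rows
# chunk 1: 20 rows
# chunk 2: 5 rows
```

The last page is usually shorter than chunk_size, and the loop only stops on the request after that, when the page comes back empty.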
Similar to what we did in the standalone pipeline when extracting with the Socrata API (which integrated with neither Airflow nor AWS), we take our chunk, convert it to a DataFrame, and flatten nested dictionaries and lists into JSON strings. The difference is that we now write the chunk to Parquet and upload it to S3 instead of inserting it into the db.
    def _upload_chunk_to_s3(self, records: list, timestamp: str, chunk_idx: int):
        s3_filename = f"{self.dataset_name}/run_{timestamp}_chunk_{chunk_idx:05d}.parquet"
        # 3. convert to dataframe with datatypes that won't break the Parquet write
        df_chunk = pd.DataFrame.from_records(records)
        # DataFrame.map requires pandas 2.1 or newer (older versions call it applymap)
        df_chunk = df_chunk.map(
            lambda x: json.dumps(x) if isinstance(x, (dict, list)) else x  # convert any nested dictionaries or lists to JSON strings so each column holds a simple type
        )
        # 4. write to parquet in memory
        parquet_buffer = BytesIO()
        df_chunk.to_parquet(parquet_buffer, index=False, engine="pyarrow", compression="snappy")
        # 5. stream parquet file to S3
        try:
            response = self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_filename,
                Body=parquet_buffer.getvalue()
            )
            print(f"Successfully uploaded {s3_filename} to S3 bucket {self.s3_bucket}")
        except Exception as e:
            print(f"Error uploading {s3_filename} to S3 bucket {self.s3_bucket}: {e}")
            raise
You can reference the entire IngestionEngine class in my ladot_parking_ingestion_daily.py file.
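The flattening step can be seen in isolation with plain Python. This is a minimal sketch, assuming a made-up record shaped loosely like an API row with one nested field. flatten_record is a hypothetical helper; the class itself applies the same lambda through DataFrame.map.

```python
import json


def flatten_record(record: dict) -> dict:
    """Turn nested dict/list values into JSON strings; leave the rest alone."""
    return {
        key: json.dumps(value) if isinstance(value, (dict, list)) else value
        for key, value in record.items()
    }


# A made-up row for illustration only.
row = {"spaceid": "A1", "latlng": {"latitude": "34.05", "longitude": "-118.24"}}
flat = flatten_record(row)
print(flat["latlng"])  # {"latitude": "34.05", "longitude": "-118.24"}
print(type(flat["latlng"]).__name__)  # str
```

Nothing is lost by doing this: json.loads on the string gives back the original nested value whenever a later step needs it.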
For each dataset, we create a separate @task, and each task builds its own IngestionEngine instance.
@task(task_id="ingest_meter_occupancy_data")
def extract_and_load_meter_occupancy_data():
    """
    For the LADOT Parking Meter Occupancy dataset
    """
    engine = IngestionEngine(
        dataset_name="meter_occupancy",
        dataset_id="e7h6-4a3e"
    )
    engine.extract_and_load_data()

@task(task_id="ingest_parking_citations_data")
def extract_and_load_parking_citations_data():
    """
    For the Parking Citations dataset
    """
    engine = IngestionEngine(
        dataset_name="parking_citations",
        dataset_id="4f5p-udkv"
    )
    engine.extract_and_load_data()

@task(task_id="ingest_parking_inventory_policies_data")
def extract_and_load_parking_inventory_policies_data():
    """
    For the LADOT Metered Parking Inventory & Policies dataset
    """
    engine = IngestionEngine(
        dataset_name="parking_inventory_policies",
        dataset_id="s49e-q6j2"
    )
    engine.extract_and_load_data()

# We need to instantiate each task
run_meter_occupancy_data = extract_and_load_meter_occupancy_data()
run_parking_citations_data = extract_and_load_parking_citations_data()
run_parking_inventory_policies_data = extract_and_load_parking_inventory_policies_data()
# this is Airflow's way of knowing to run these tasks one after the other
run_meter_occupancy_data >> run_parking_inventory_policies_data >> run_parking_citations_data
Notice this line: run_meter_occupancy_data >> run_parking_inventory_policies_data >> run_parking_citations_data. We tell Airflow to run the tasks one after the other so the Socrata API doesn't reject our connection for making so many calls at the same time.
Next, we need Airflow to pick up our task updates:
docker compose exec airflow-scheduler airflow dags reserialize
In your Airflow dashboard, when you go to Dags at localhost:8080/dags, you should see ladot_parking_ingestion_daily listed. Click into that DAG and then click on "Tasks". You'll see our 3 tasks on that list.
Let's run them. Click on the > play button to trigger a manual run: "Single Run" > "Trigger".
A couple of measures of success here:
- In your Airflow dashboard, go to Dags at localhost:8080/dags and click into the DAG we're working with: ladot_parking_ingestion_daily.
  - Click on "Runs"
  - Click into the most recent run, "manual__2026-…+00:00"
  - Each task should be listed on this next screen. How many "Success" states do you have?
  - If just one or two, give it some time. The 25-million-row dataset will take a while.
  - Once you see 3 "Success" states, you did it!
- In your AWS S3 dashboard, go to "General purpose buckets"
  - Click on "ladot-meter-parking-de-project-aws-data-lake" (or the data lake name you created)
  - You should see 3 directories listed here, one for each dataset_name (e.g. meter_occupancy/)
  - Click into one whose task was "Success" in Airflow.
  - Do you see the Parquet files there? (e.g. run_20260611_201807_chunk_00000.parquet)
  - If so, you're good to go and ready for AWS Glue!
