Goal
# Flow identity: the flow id and the namespace it lives in.
id: postgres_taxi
namespace: zoomcamp

# Inputs chosen when the flow is executed. Each one is a SELECT with a fixed
# list of values and a default. Year and month values are quoted so they stay
# strings (e.g. '01' keeps its leading zero).
inputs:
# Select between yellow and green taxis.
- id: taxi
  # displayName changes the prompt shown when selecting the input.
  displayName: "Select taxi type"
  type: SELECT
  values: ['yellow', 'green']
  defaults: 'yellow'
- id: year
  displayName: "Select year"
  type: SELECT
  values: ['2019', '2020']
  defaults: '2019'
- id: month
  displayName: 'Select month'
  type: SELECT
  values: ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
  defaults: '01'
# Flow-level variables built from the selected inputs.
variables:
  # Name of the file we are downloading. This only works because the source
  # file names are standardized, e.g. yellow_tripdata_2019-01.csv (published
  # gzipped as yellow_tripdata_2019-01.csv.gz).
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  # Staging table where each month's data is loaded initially.
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  # Final table that holds all the data in one place.
  table: "public.{{inputs.taxi}}_tripdata"
  # The data itself: the output file of the extract task, looked up by the
  # same file name that `file` produces.
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"
tasks:
# Adds labels (file and taxi) for better traceability of workflow runs.
# The property key is lowercase `labels`; YAML keys are case-sensitive.
- id: set_label
  type: io.kestra.plugin.core.execution.Labels
  labels:
    file: "{{render(vars.file)}}"
    taxi: "{{inputs.taxi}}"
# Downloads the gzipped CSV for the selected taxi type and month, unzips it,
# and exposes the resulting *.csv as an output file of this task.
# For context, this is the URL of one of the green taxi months:
# https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  outputFiles:
  - "*.csv"
  taskRunner:
    type: io.kestra.plugin.core.runner.Process
  commands:
  # The URL must be bare: angle brackets would be read by the shell as
  # redirections and would break the {{inputs.taxi}} expression.
  - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
# docker-compose.yaml: a single Postgres container with a named volume so the
# database files persist across container restarts.
# NOTE(review): recent Docker Compose releases treat `version` as obsolete;
# kept here for older docker-compose tooling - confirm whether it can go.
version: "3.8"
services:
  postgres:
    image: postgres
    container_name: postgres-db
    environment:
      # Tutorial credentials; the Kestra flow's Postgres connection settings
      # must use the same user, password and database name.
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4
      POSTGRES_DB: postgres-zoomcamp
    ports:
    # Quoted so the host:container mapping is always read as a string.
    - "5432:5432"
    volumes:
    - postgres-data:/var/lib/postgresql/data
# Top-level declaration of the named volume mounted by the service above.
volumes:
  postgres-data: {}
# Full flow: identity (flow id + namespace) followed by the execution inputs.
id: postgres_taxi
namespace: zoomcamp

# Taxi type, year and month are picked at execution time; the variables of
# this flow combine them into the file and table names.
inputs:
- id: taxi
  displayName: "Select taxi type"
  type: SELECT
  values: ['yellow', 'green']
  defaults: 'yellow'
- id: year
  displayName: "Select year"
  type: SELECT
  values: ['2019', '2020']
  defaults: '2019'
- id: month
  displayName: 'Select month'
  type: SELECT
  values: ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
  defaults: '01'
# Names derived from the inputs: the monthly CSV file, the staging and final
# tables for the selected taxi type, and the extract task's output file.
variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"
tasks:
# Labels the execution with the file name and taxi type. The property key is
# lowercase `labels`; YAML keys are case-sensitive.
- id: set_label
  type: io.kestra.plugin.core.execution.Labels
  labels:
    file: "{{render(vars.file)}}"
    taxi: "{{inputs.taxi}}"
# Downloads the gzipped CSV for the selected taxi type and month, unzips it,
# and exposes the resulting *.csv as an output file of this task.
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  outputFiles:
  - "*.csv"
  taskRunner:
    type: io.kestra.plugin.core.runner.Process
  commands:
  # The URL must be bare: angle brackets would be read by the shell as
  # redirections and would break the {{inputs.taxi}} expression.
  - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
# Creates the final table if it does not exist yet.
# Note that unique_row_id and filename are two extra columns that are not in
# the source data. unique_row_id is generated from an MD5 hash of the data in
# that row, to help prevent adding duplicates when re-running the same data.
# NOTE(review): the table name comes from inputs.taxi, but this task is not
# conditioned on the taxi type, so selecting 'yellow' would create
# public.yellow_tripdata with this green_* task's columns - confirm that a
# green-only guard is added around it.
- id: green_create_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
        unique_row_id text,
        filename text,
        VendorID text,
        lpep_pickup_datetime timestamp,
        lpep_dropoff_datetime timestamp,
        store_and_fwd_flag text,
        RatecodeID text,
        PULocationID text,
        DOLocationID text,
        passenger_count integer,
        trip_distance double precision,
        fare_amount double precision,
        extra double precision,
        mta_tax double precision,
        tip_amount double precision,
        tolls_amount double precision,
        ehail_fee double precision,
        improvement_surcharge double precision,
        total_amount double precision,
        payment_type integer,
        trip_type integer,
        congestion_surcharge double precision
    );
# Creates the staging table if it does not exist yet, with the same columns
# as the final table.
# The staging table will be truncated to remove all its data, then populated
# with new data every time we add another month.
# NOTE(review): the table name comes from inputs.taxi, but this task is not
# conditioned on the taxi type, so selecting 'yellow' would create
# public.yellow_tripdata_staging with this green_* task's columns - confirm
# that a green-only guard is added around it.
- id: green_create_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
        unique_row_id text,
        filename text,
        VendorID text,
        lpep_pickup_datetime timestamp,
        lpep_dropoff_datetime timestamp,
        store_and_fwd_flag text,
        RatecodeID text,
        PULocationID text,
        DOLocationID text,
        passenger_count integer,
        trip_distance double precision,
        fare_amount double precision,
        extra double precision,
        mta_tax double precision,
        tip_amount double precision,
        tolls_amount double precision,
        ehail_fee double precision,
        improvement_surcharge double precision,
        total_amount double precision,
        payment_type integer,
        trip_type integer,
        congestion_surcharge double precision
    );
# The SQL tasks require a connection URL and a user/password.
# pluginDefaults sets the same URL and credentials for all Postgres tasks
# (there are several), so we don't have to copy them over and over.
pluginDefaults:
- type: io.kestra.plugin.jdbc.postgresql
  values:
    url: jdbc:postgresql://host.docker.internal:5432/postgres-zoomcamp
    # The user and password match what we have in our docker-compose.yaml
    # file. These are tutorial credentials; do not commit real secrets here.
    username: kestra
    password: k3str4