Goal

Extraction

id: postgres_taxi
namespace: zoomcamp

# select between yellow and green taxis

inputs: 
	- id: taxi
		displayName: "Select taxi type" # changes prompt for selecting input 
		type: SELECT
		values: ['yellow', 'green']
		defaults: 'yellow'
		
	- id: year
		displayName: "Select year"
		type: SELECT
		values: ['2019', '2020']
		defaults: '2019'
		
	- id: month
		displayName: 'Select month'
		type: SELECT
		values: ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
		defaults: '01'
		
variables:
  # file we are downloading - note this only works if the file name is standardized as such. e.g. yellow_tripdata_2019-01.csv.gz
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  # where we load monthly data initially
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  # has all the data in one table
  table: "public.{{inputs.taxi}}_tripdata"
  # the data itself which comes from the extract task
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"

tasks:
	- id: set_label # Adds labels (file and taxi) for better traceability of workflow runs.
		type: io.kestra.plugin.core.execution.Labels
		Labels:
			file: "{{render(vars.file)}}"
			taxi: "{{inputs.taxi}}"
	
	- id: extract
		type: io.kestra.plugin.scripts.shell.Commands
		outputFiles:
			- "*.csv"
		taskRunner:
			type: io.kestra.plugin.core.runner.Process
		commands:
			- wget -qO- <https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}>}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
		
		# for context, this is an URL of one of the green taxi months: <https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz>
		
		
		

Creating the Postgres DB for this ETL

version: "3.8"
services:
  postgres:
    image: postgres
    container_name: postgres-db
    environment:
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4
      POSTGRES_DB: postgres-zoomcamp
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data
volumes:
  postgres-data:

We can access this DB through PGAdmin by using it locally (it won’t work by accessing localhost:5432)

image.png

Now, add the green taxi table to Postgres using a queries task

id: postgres_taxi
namespace: zoomcamp

inputs: 
	- id: taxi
		displayName: "Select taxi type"
		type: SELECT
		values: ['yellow', 'green']
		defaults: 'yellow'
		
	- id: year
		displayName: "Select year"
		type: SELECT
		values: ['2019', '2020']
		defaults: '2019'
		
	- id: month
		displayName: 'Select month'
		type: SELECT
		values: ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
		defaults: '01'
		
variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"

tasks:
	- id: set_label
		type: io.kestra.plugin.core.execution.Labels
		Labels:
			file: "{{render(vars.file)}}"
			taxi: "{{inputs.taxi}}"
	
	- id: extract
		type: io.kestra.plugin.scripts.shell.Commands
		outputFiles:
			- "*.csv"
		taskRunner:
			type: io.kestra.plugin.core.runner.Process
		commands:
			- wget -qO- <https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}>}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
		
		- id: green_create_table
		        type: io.kestra.plugin.jdbc.postgresql.Queries
		        sql: |
		          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
		              unique_row_id          text,
		              filename               text,
		              VendorID               text,
		              lpep_pickup_datetime   timestamp,
		              lpep_dropoff_datetime  timestamp,
		              store_and_fwd_flag     text,
		              RatecodeID             text,
		              PULocationID           text,
		              DOLocationID           text,
		              passenger_count        integer,
		              trip_distance          double precision,
		              fare_amount            double precision,
		              extra                  double precision,
		              mta_tax                double precision,
		              tip_amount             double precision,
		              tolls_amount           double precision,
		              ehail_fee              double precision,
		              improvement_surcharge  double precision,
		              total_amount           double precision,
		              payment_type           integer,
		              trip_type              integer,
		              congestion_surcharge   double precision
		          );
		
		# note that unique_row_id and filename are two extra columns 
		# unique_row_id is generated based on MD5 hash based on data in that row to help prevent adding duplicates when re-running the same data
		
		- id: green_create_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              lpep_pickup_datetime   timestamp,
              lpep_dropoff_datetime  timestamp,
              store_and_fwd_flag     text,
              RatecodeID             text,
              PULocationID           text,
              DOLocationID           text,
              passenger_count        integer,
              trip_distance          double precision,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              ehail_fee              double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              payment_type           integer,
              trip_type              integer,
              congestion_surcharge   double precision
          );
		
		# the staging table will be truncated to remove all the data, then it'll be populated with new data everytime we add another month 
		
	pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://host.docker.internal:5432/postgres-zoomcamp # 
      username: kestra # the user and pass match what we have in our docker-compose.yaml file
      password: k3str4
  
 # note that the sql task requires a URL and user/pass. 
 # we will use plugins to set all Postgres tasks (which are multiple) to the same creds and URL so we don't have to copy it over and over.