Goal: extract the data from CSV, but instead of loading it into Postgres, load it into Google Cloud Storage (our data lake). Then we'll use BigQuery to create tables on top of those files so we can process the data and run queries (a rough sketch of that flow is at the end of this section).
Advantage: we can leverage the cloud to work on much larger datasets than we could handle locally.
What the GCP plugins need from us: a service account (credentials), a project ID, a location/region, and a bucket name.
We're going to store these in Kestra's KV Store (think environment variables): we can read and update the values from a workflow, but they don't appear in the workflow definition itself, which is good for hiding sensitive info.
id: 04_gcp_kv
namespace: zoomcamp

tasks:
  - id: gcp_creds
    type: io.kestra.plugin.core.kv.Set
    key: GCP_CREDS
    kvType: JSON
    value: |
      {
        "type": "service_account",
        "project_id": "kestra-project-449307",
        ...
      }

  - id: gcp_project_id
    type: io.kestra.plugin.core.kv.Set
    key: GCP_PROJECT_ID
    kvType: STRING
    value: kestra-sandbox # TODO replace with your project id

  - id: gcp_location
    type: io.kestra.plugin.core.kv.Set
    key: GCP_LOCATION
    kvType: STRING
    value: northamerica-northeast2

  - id: gcp_bucket_name
    type: io.kestra.plugin.core.kv.Set
    key: GCP_BUCKET_NAME
    kvType: STRING
    value: ag-de-zoomcamp-bucket # TODO make sure it's globally unique!

  - id: gcp_dataset
    type: io.kestra.plugin.core.kv.Set
    key: GCP_DATASET
    kvType: STRING
    value: zoomcamp
Execute this flow once to add these key-value pairs to the KV Store. At this point we've added GCP_CREDS, GCP_PROJECT_ID, GCP_LOCATION, GCP_BUCKET_NAME, and GCP_DATASET.
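Once stored, any flow in the same namespace can read these values back with the kv() function. A minimal sketch to verify they were saved (the flow id and the Log task are just for illustration, not part of the course flows):

id: kv_check
namespace: zoomcamp

tasks:
  - id: print_values
    type: io.kestra.plugin.core.log.Log
    # kv() looks up the key in this flow's namespace at runtime
    message: "project: {{ kv('GCP_PROJECT_ID') }}, bucket: {{ kv('GCP_BUCKET_NAME') }}"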
Next, we'll use a setup flow that reads the KV Store and creates our bucket and BigQuery dataset.
id: 05_gcp_setup
namespace: zoomcamp

tasks:
  - id: create_gcs_bucket
    type: io.kestra.plugin.gcp.gcs.CreateBucket
    ifExists: SKIP
    storageClass: REGIONAL
    name: "{{kv('GCP_BUCKET_NAME')}}"

  - id: create_bq_dataset
    type: io.kestra.plugin.gcp.bigquery.CreateDataset
    name: "{{kv('GCP_DATASET')}}"
    ifExists: SKIP

# pluginDefaults apply these values to every io.kestra.plugin.gcp.* task in the flow,
# so we don't repeat the credentials, project, and location settings in each task
pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      serviceAccount: "{{kv('GCP_CREDS')}}"
      projectId: "{{kv('GCP_PROJECT_ID')}}"
      location: "{{kv('GCP_LOCATION')}}"
      bucket: "{{kv('GCP_BUCKET_NAME')}}"
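With the bucket and dataset in place, the extract-and-load flow described in the goal follows the same pattern: download the CSV, upload it to GCS, then have BigQuery create a table over the file in the bucket. This is only a rough sketch under assumed names (the flow id, file name, and URL are placeholders; the real course flow adds inputs, gzip handling, and table merges on top of this):

id: 06_gcp_taxi_sketch
namespace: zoomcamp

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://example.com/yellow_tripdata_2019-01.csv # placeholder URL

  - id: upload_to_gcs
    type: io.kestra.plugin.gcp.gcs.Upload
    # take the downloaded file from Kestra's internal storage and push it to the data lake
    from: "{{ outputs.extract.uri }}"
    to: "gs://{{ kv('GCP_BUCKET_NAME') }}/yellow_tripdata_2019-01.csv"

  - id: bq_external_table
    type: io.kestra.plugin.gcp.bigquery.Query
    # expose the file in GCS as a BigQuery external table, no data copy needed
    sql: |
      CREATE OR REPLACE EXTERNAL TABLE `{{ kv('GCP_PROJECT_ID') }}.{{ kv('GCP_DATASET') }}.yellow_tripdata_ext`
      OPTIONS (
        format = 'CSV',
        uris = ['gs://{{ kv('GCP_BUCKET_NAME') }}/yellow_tripdata_2019-01.csv']
      );

pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      serviceAccount: "{{ kv('GCP_CREDS') }}"
      projectId: "{{ kv('GCP_PROJECT_ID') }}"
      location: "{{ kv('GCP_LOCATION') }}"
      bucket: "{{ kv('GCP_BUCKET_NAME') }}"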