Source
```yaml
id: azure-blob-to-bigquery
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 0 # 0 = no limit: process all detected files in parallel
    values: "{{ trigger.blobs | jq('.[].uri') }}"
    tasks:
      - id: upload_from_file
        type: io.kestra.plugin.gcp.bigquery.Load
        destinationTable: gcpProject.dataset.table
        from: "{{ taskrun.value }}"
        writeDisposition: WRITE_APPEND
        projectId: yourGcpProject
        serviceAccount: "{{ secret('GCP_CREDS') }}"
        ignoreUnknownValues: true
        autodetect: true
        format: CSV
        csvOptions:
          allowJaggedRows: true
          encoding: UTF-8
          fieldDelimiter: ","

  - id: dbt_cloud_job
    type: io.kestra.plugin.dbt.cloud.TriggerRun
    accountId: "{{ secret('DBT_CLOUD_ACCOUNT_ID') }}"
    token: "{{ secret('DBT_CLOUD_API_TOKEN') }}"
    jobId: "366381"
    wait: true # block until the dbt Cloud job finishes

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT30S # poll the container every 30 seconds
    endpoint: https://kestra.blob.core.windows.net
    connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
    container: stage
    prefix: marketplace/
    action: MOVE # archive processed files so they are not ingested twice
    moveTo:
      container: stage
      name: archive/marketplace/
```
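The `values` expression iterates over the URIs of the blobs detected by the trigger: `trigger.blobs` is a list of blob objects, and the `jq('.[].uri')` filter extracts each blob's `uri`. As a rough sketch (the exact paths are hypothetical; after the trigger fires, each URI points to the downloaded file in Kestra's internal storage), the expression renders to something like:

```yaml
# Hypothetical rendering of the `values` expression for two uploaded files:
values:
  - "kestra:///company/team/azure-blob-to-bigquery/executions/abc123/orders_1.csv"
  - "kestra:///company/team/azure-blob-to-bigquery/executions/abc123/orders_2.csv"
# ForEach then runs one upload_from_file task per URI, all in parallel
# because concurrencyLimit is 0.
```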
About this blueprint
Tags: Trigger, Ingest, Azure, BigQuery, dbt
When a new file arrives in Azure Blob Storage, this flow processes it and loads it into BigQuery. If multiple files are uploaded to the Azure Blob Storage container, the flow processes them in parallel. The flow then triggers a dbt Cloud job and waits for it to complete. Kestra retrieves the dbt Cloud job's artifacts and dynamically parses them in the Gantt view, so you can inspect how long each dbt model and dbt test took to run.

This flow assumes that sensitive values such as the Azure connection string, the Google service account, and the dbt Cloud API token are stored as secrets. For testing, you can paste those values directly into the flow instead.
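In the open-source edition of Kestra, one way to provide these secrets is through base64-encoded environment variables prefixed with `SECRET_`, which `{{ secret('...') }}` reads at runtime. A minimal sketch (the service layout and placeholder values are assumptions):

```yaml
# docker-compose.yml excerpt: each SECRET_* value must be base64-encoded
services:
  kestra:
    image: kestra/kestra:latest
    environment:
      SECRET_AZURE_CONNECTION_STRING: "<base64-encoded connection string>"
      SECRET_GCP_CREDS: "<base64-encoded service account JSON>"
      SECRET_DBT_CLOUD_ACCOUNT_ID: "<base64-encoded account ID>"
      SECRET_DBT_CLOUD_API_TOKEN: "<base64-encoded API token>"
```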