Source
yaml
id: snowflake-query-trigger
namespace: company.team
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
values: "{{ trigger.rows }}"
tasks:
- id: automated_process
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- echo "{{ json(taskrun.value) }}"
- echo "Welcome to Kestra {{ json(taskrun.value).FIRST_NAME }} {{
json(taskrun.value).LAST_NAME }}"
- id: update
type: io.kestra.plugin.jdbc.snowflake.Query
description: Update rows to avoid double trigger
sql: |
UPDATE KESTRA.PUBLIC.EMPLOYEES
SET UPDATE_TIMESTAMP = SYSDATE()
WHERE START_DATE = CURRENT_DATE();
pluginDefaults:
- type: io.kestra.plugin.jdbc.snowflake.Trigger
values:
url: jdbc:snowflake://your_account_id.snowflakecomputing.com?warehouse=DEMO
username: your_username
password: "{{ secret('SNOWFLAKE_PASSWORD') }}"
- type: io.kestra.plugin.jdbc.snowflake.Query
values:
url: jdbc:snowflake://your_account_id.snowflakecomputing.com?warehouse=DEMO
username: your_username
password: "{{ secret('SNOWFLAKE_PASSWORD') }}"
triggers:
- id: wait
type: io.kestra.plugin.jdbc.snowflake.Trigger
sql: SELECT * FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE() and
UPDATE_TIMESTAMP IS NULL;
interval: PT1M
fetchType: FETCH
About this blueprint
Trigger Snowflake Data
This workflow will be executed only when the query returns any results. The flow checks if there are any new rows that match a specific criteria (e.g. you may query a table storing new events that haven't been processed yet). If so, the flow performs some actions as defined in the tasks e.g.:
- loading that data to another table - sending a message about the new event - triggering some automated process - updating or deleting the event from that original table to avoid that it will trigger another (duplicated) run. Make sure that you delete, update or move the rows that triggered the flow. This avoids duplicated runs. Here, we simply use the same WHERE clause in the SQL Event Trigger and in the UPDATE/DELETE statement:
- Trigger:
SELECT * FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE()
- Update:
UPDATE KESTRA.PUBLIC.EMPLOYEES SET UPDATE_TIMESTAMP = SYSDATE() WHERE START_DATE = CURRENT_DATE();
- Delete:
DELETE FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE()