Blueprints

Event-driven workflow based on a Snowflake query condition

Source

yaml
id: snowflake-query-trigger
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.rows }}"
    tasks:
      - id: automated_process
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - echo "{{ json(taskrun.value) }}"
          - echo "Welcome to Kestra {{ json(taskrun.value).FIRST_NAME }} {{
            json(taskrun.value).LAST_NAME }}"

  - id: update
    type: io.kestra.plugin.jdbc.snowflake.Query
    description: Update rows to avoid double trigger
    sql: |
      UPDATE KESTRA.PUBLIC.EMPLOYEES
      SET UPDATE_TIMESTAMP = SYSDATE()
      WHERE START_DATE = CURRENT_DATE();

pluginDefaults:
  - type: io.kestra.plugin.jdbc.snowflake.Trigger
    values:
      url: jdbc:snowflake://your_account_id.snowflakecomputing.com?warehouse=DEMO
      username: your_username
      password: "{{ secret('SNOWFLAKE_PASSWORD') }}"

  - type: io.kestra.plugin.jdbc.snowflake.Query
    values:
      url: jdbc:snowflake://your_account_id.snowflakecomputing.com?warehouse=DEMO
      username: your_username
      password: "{{ secret('SNOWFLAKE_PASSWORD') }}"

triggers:
  - id: wait
    type: io.kestra.plugin.jdbc.snowflake.Trigger
    sql: SELECT * FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE() and
      UPDATE_TIMESTAMP IS NULL;
    interval: PT1M
    fetchType: FETCH

About this blueprint

Trigger Snowflake Data

This workflow will be executed only when the query returns any results. The flow checks if there are any new rows that match a specific criteria (e.g. you may query a table storing new events that haven't been processed yet). If so, the flow performs some actions as defined in the tasks e.g.:

  • loading that data to another table - sending a message about the new event - triggering some automated process - updating or deleting the event from that original table to avoid that it will trigger another (duplicated) run. Make sure that you delete, update or move the rows that triggered the flow. This avoids duplicated runs. Here, we simply use the same WHERE clause in the SQL Event Trigger and in the UPDATE/DELETE statement:
  • Trigger: SELECT * FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE()
  • Update: UPDATE KESTRA.PUBLIC.EMPLOYEES SET UPDATE_TIMESTAMP = SYSDATE() WHERE START_DATE = CURRENT_DATE();
  • Delete: DELETE FROM KESTRA.PUBLIC.EMPLOYEES WHERE START_DATE = CURRENT_DATE()

For Each

Commands

Process

Query

Trigger

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra