Blueprints

Snowflake ETL: load files to internal stage, copy from stage to a table and run analytical SQL queries

Source

yaml
id: snowflake
namespace: company.team

tasks:
  - id: create_database
    type: io.kestra.plugin.jdbc.snowflake.Query
    sql: CREATE OR REPLACE DATABASE kestra;

  - id: create_table
    type: io.kestra.plugin.jdbc.snowflake.Query
    sql: |
      CREATE OR REPLACE TABLE KESTRA.PUBLIC.EMPLOYEES (
        first_name STRING ,
        last_name STRING ,
        email STRING ,
        streetaddress STRING ,
        city STRING ,
        start_date DATE
        );

  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/employees/employees00.csv

  - id: load_to_internal_stage
    type: io.kestra.plugin.jdbc.snowflake.Upload
    from: "{{ outputs.extract.uri }}"
    fileName: employees00.csv
    prefix: raw
    stageName: "@kestra.public.%employees"
    compress: true

  - id: load_from_stage_to_table
    type: io.kestra.plugin.jdbc.snowflake.Query
    sql: >
      COPY INTO KESTRA.PUBLIC.EMPLOYEES FROM @kestra.public.%employees
      FILE_FORMAT = (type = csv field_optionally_enclosed_by='"' skip_header =
      1) PATTERN = '.*employees0[0-9].csv.gz' ON_ERROR = 'skip_file';

  - id: analyze
    type: io.kestra.plugin.jdbc.snowflake.Query
    description: Growth of new hires per month
    sql: >
      SELECT year(START_DATE) as year, monthname(START_DATE) as month, count(*)
      as nr_employees FROM kestra.public.EMPLOYEES GROUP BY year(START_DATE),
      monthname(START_DATE) ORDER BY nr_employees desc;
    fetchType: STORE

  - id: csv_report
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.analyze.uri }}"

pluginDefaults:
  - type: io.kestra.plugin.jdbc.snowflake.Query
    values:
      url: jdbc:snowflake://accountID.snowflakecomputing.com?warehouse=COMPUTE_WH
      username: yourSnowflakeUser
      password: "{{ secret('SNOWFLAKE_PASSWORD') }}"

  - type: io.kestra.plugin.jdbc.snowflake.Upload
    values:
      url: jdbc:snowflake://accountID.snowflakecomputing.com?warehouse=COMPUTE_WH
      username: yourSnowflakeUser
      password: "{{ secret('SNOWFLAKE_PASSWORD') }}"

About this blueprint

Ingest Snowflake Data SQL

This flow is an end-to-end tutorial for Snowflake. It creates a new database and a table. It then extracts data from an external source, and loads that data as a CSV file into Snowflake's internal stage. The CSV file uploaded to stage is then loaded into the table. Finally, we do some analytics by aggregating (imaginary) new hires at Kestra over time. The final result is fetched into the Kestra's internal storage and converted to a CSV file that you can download from the Outputs tab on the Execution's page.

Query

Download

Upload

Ion To Csv

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra