Blueprints
Snowflake ETL: load files to internal stage, copy from stage to a table and run analytical SQL queries
Source
yaml
id: snowflake
namespace: company.team
tasks:
- id: create_database
type: io.kestra.plugin.jdbc.snowflake.Query
sql: CREATE OR REPLACE DATABASE kestra;
- id: create_table
type: io.kestra.plugin.jdbc.snowflake.Query
sql: |
CREATE OR REPLACE TABLE KESTRA.PUBLIC.EMPLOYEES (
first_name STRING ,
last_name STRING ,
email STRING ,
streetaddress STRING ,
city STRING ,
start_date DATE
);
- id: extract
type: io.kestra.plugin.core.http.Download
uri: https://huggingface.co/datasets/kestra/datasets/raw/main/employees/employees00.csv
- id: load_to_internal_stage
type: io.kestra.plugin.jdbc.snowflake.Upload
from: "{{ outputs.extract.uri }}"
fileName: employees00.csv
prefix: raw
stageName: "@kestra.public.%employees"
compress: true
- id: load_from_stage_to_table
type: io.kestra.plugin.jdbc.snowflake.Query
sql: >
COPY INTO KESTRA.PUBLIC.EMPLOYEES FROM @kestra.public.%employees
FILE_FORMAT = (type = csv field_optionally_enclosed_by='"' skip_header =
1) PATTERN = '.*employees0[0-9].csv.gz' ON_ERROR = 'skip_file';
- id: analyze
type: io.kestra.plugin.jdbc.snowflake.Query
description: Growth of new hires per month
sql: >
SELECT year(START_DATE) as year, monthname(START_DATE) as month, count(*)
as nr_employees FROM kestra.public.EMPLOYEES GROUP BY year(START_DATE),
monthname(START_DATE) ORDER BY nr_employees desc;
fetchType: STORE
- id: csv_report
type: io.kestra.plugin.serdes.csv.IonToCsv
from: "{{ outputs.analyze.uri }}"
pluginDefaults:
- type: io.kestra.plugin.jdbc.snowflake.Query
values:
url: jdbc:snowflake://accountID.snowflakecomputing.com?warehouse=COMPUTE_WH
username: yourSnowflakeUser
password: "{{ secret('SNOWFLAKE_PASSWORD') }}"
- type: io.kestra.plugin.jdbc.snowflake.Upload
values:
url: jdbc:snowflake://accountID.snowflakecomputing.com?warehouse=COMPUTE_WH
username: yourSnowflakeUser
password: "{{ secret('SNOWFLAKE_PASSWORD') }}"
About this blueprint
Ingest Snowflake Data SQL
This flow is an end-to-end tutorial for Snowflake. It creates a new database and a table. It then extracts data from an external source, and loads that data as a CSV file into Snowflake's internal stage. The CSV file uploaded to stage is then loaded into the table. Finally, we do some analytics by aggregating (imaginary) new hires at Kestra over time. The final result is fetched into the Kestra's internal storage and converted to a CSV file that you can download from the Outputs tab on the Execution's page.