## Source
```yaml
id: sensitive-data
namespace: company.team

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: transform
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      data.csv: "{{ outputs.extract.uri }}"
    sql: >
      CREATE TABLE orders_pii AS
      SELECT order_id,
             hash(customer_name) as customer_name_hash,
             md5(customer_email) as customer_email_hash,
             product_id,
             price,
             quantity,
             total
      FROM read_csv_auto('{{ workingDir }}/data.csv');

      COPY (SELECT * FROM orders_pii) TO '{{ outputFiles.csv }}' (HEADER, DELIMITER ',');
    outputFiles:
      - "*.csv"

  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    from: "{{ outputs.transform.outputFiles.csv }}"
    serviceAccount: "{{ secret('GCP_CREDS') }}"
    projectId: yourGcpProject
    destinationTable: yourGcpProject.stage.orders_pii
    format: CSV
    autodetect: true
    csvOptions:
      fieldDelimiter: ","
```
## About this blueprint

Tags: BigQuery, Data, SQL, DuckDB
This flow has three tasks: `extract`, `transform`, and `load`.

- The `extract` task here is a simple HTTP Download task, but you can replace it with any task or custom script that extracts data (see the sketch after this list).
- The `transform` task reads the output of the `extract` task and transforms it using DuckDB. Specifically, it hashes sensitive values so that they are masked before being loaded into the final destination.
- The `load` task loads the extracted and transformed data into BigQuery.
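For example, the `extract` step could be swapped for a script task. The snippet below is a minimal sketch using Kestra's Python script plugin; the inline script and its stub data are hypothetical placeholders for your own extraction logic:

```yaml
- id: extract
  type: io.kestra.plugin.scripts.python.Script
  outputFiles:
    - orders.csv
  script: |
    # Hypothetical stub: replace with a call to your own API or database
    import csv

    rows = [
        ["order_id", "customer_name", "customer_email", "product_id", "price", "quantity", "total"],
        [1, "Ada Lovelace", "ada@example.com", 42, 9.99, 2, 19.98],
    ]
    with open("orders.csv", "w", newline="") as f:
        csv.writer(f).writerows(rows)
```

With a variant like this, the downstream `transform` task would read `{{ outputs.extract.outputFiles['orders.csv'] }}` rather than `{{ outputs.extract.uri }}`.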
If you use MotherDuck, store the MotherDuck service token as a Kestra Secret, then add the `url` property to point the `transform` task to your MotherDuck database:
```yaml
- id: transform
  type: io.kestra.plugin.jdbc.duckdb.Query
  url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
```
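Both tasks above read credentials through the `secret()` function. In the open-source edition of Kestra, secrets are typically supplied as base64-encoded environment variables prefixed with `SECRET_`. The excerpt below is a minimal sketch of a Docker Compose setup under that assumption; the values are placeholders, not real credentials:

```yaml
# docker-compose.yml (excerpt) - assumed setup, values are placeholders
services:
  kestra:
    image: kestra/kestra:latest
    environment:
      # {{ secret('MOTHERDUCK_TOKEN') }} resolves to the base64-decoded value
      SECRET_MOTHERDUCK_TOKEN: "bXktdG9rZW4="  # base64 of "my-token"
      # base64-encoded GCP service account JSON, read by {{ secret('GCP_CREDS') }}
      SECRET_GCP_CREDS: "PHlvdXItc2VydmljZS1hY2NvdW50LWpzb24+"
```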