Data storage and processing
Manage data processed by tasks.
Kestra's primary purpose is to orchestrate data processing via tasks, so data is central to each flow's execution.
Depending on the task, data can be stored inside the execution context or inside Kestra's internal storage. You can also manually store data inside Kestra's KV store by using dedicated tasks.
Some tasks give you the choice of where to store the data, usually through a fetchType property or the three fetch/fetchOne/store properties.
For example, using the DynamoDB Query task:
id: query
type: io.kestra.plugin.aws.dynamodb.Query
tableName: persons
keyConditionExpression: id = :id
expressionAttributeValues:
  :id: "1"
fetchType: FETCH
The fetchType property can have four values:
- FETCH_ONE: will fetch the first row and set it in a task output attribute (the row attribute for DynamoDB); the data will be stored inside the execution context.
- FETCH: will fetch all rows and set them in a task output attribute (the rows attribute for DynamoDB); the data will be stored inside the execution context.
- STORE: will store all rows inside Kestra's internal storage. The internal storage will return a URI, usually set in the task output attribute uri, that can be used to retrieve the file from the internal storage.
- NONE: will do nothing.
The three fetch/fetchOne/store properties do the same but use three separate task properties instead of a single one.
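As a minimal sketch of the three-property style: the BigQuery Query task used later on this page sets store: true. The snippet below assumes the same task accepts fetchOne in the same way and exposes the result as a row output attribute. The SQL is a placeholder.

```yaml
- id: query
  type: io.kestra.plugin.gcp.bigquery.Query
  # Placeholder query, for illustration only.
  sql: SELECT 1 AS value
  # Assumed equivalent of fetchType: FETCH_ONE.
  # Use fetch: true or store: true for the other two behaviors.
  fetchOne: true
```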
Storing data
Storing data inside the flow execution context
Data can be stored as variables inside the flow execution context. This can be convenient for sharing data between tasks.
To do so, tasks store data as output attributes that are then available inside the flow via Pebble expressions like {{outputs.taskName.attributeName}}.
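A minimal sketch of passing a value between two tasks through the execution context. It assumes the core Return task exposes its rendered format as a value output attribute; the task ids are illustrative.

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.core.debug.Return
    format: "hello"
  - id: consume
    type: io.kestra.plugin.core.log.Log
    # Reads the output attribute of the previous task from the execution context.
    message: "{{ outputs.produce.value }}"
```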
Be careful: when the data is large, it increases the size of the flow execution context, which can slow down execution and increase the size of the execution storage inside Kestra's repository.
Depending on the Kestra internal queue and repository implementation, there can be a hard limit on the size of the flow execution context, as it is stored as a single row/message. Usually, this limit is around 1MB, so it is important to avoid storing large amounts of data inside the flow execution context.
Storing data inside the internal storage
Kestra has an internal storage that can store data of any size. By default, the internal storage uses the host filesystem, but plugins exist to use other implementations like Amazon S3, Google Cloud Storage, or Microsoft Azure Blob Storage. See internal storage configuration.
When using the internal storage, data is, by default, stored using Amazon Ion format.
Tasks that can store data inside the internal storage usually have an output attribute named uri that can be used to access this file in following tasks.
The following example uses the DynamoDB Query task to query a table and the FTP Upload task to send the retrieved rows to an external FTP server.
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE
  - id: upload
    type: io.kestra.plugin.fs.ftp.Upload
    host: localhost
    port: 80
    from: "{{ outputs.query.uri }}"
    to: "/upload/file.ion"
If you need to access data from the internal storage, you can use the read() function to read the file's content as a string.
Dedicated tasks allow managing the files stored inside the internal storage:
- Concat: concat multiple files.
- Delete: delete a file.
- Size: get the size of a file.
- Split: split a file into multiple files depending on the size of the file or the number of rows.
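As a sketch of one of these tasks, the snippet below splits a stored file into chunks by row count. It assumes the Split task takes a from URI and a rows property; the upstream task id query and the chunk size are illustrative.

```yaml
- id: split
  type: io.kestra.plugin.core.storage.Split
  # URI of a file previously written to the internal storage.
  from: "{{ outputs.query.uri }}"
  # Assumed property: maximum number of rows per resulting file.
  rows: 1000
```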
The internal storage should be the main method for storing and carrying large data from task to task. For example, if you know that an HTTP request will return a heavy payload, consider using the HTTP Download task along with a Serdes task instead of carrying the raw data in the flow execution context.
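A minimal sketch of that pattern, assuming the core HTTP Download task writes the response to the internal storage and returns its uri, and that the Serdes plugin provides a JsonToIon task named like the CSV tasks used elsewhere on this page. The URL is hypothetical.

```yaml
tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    # Hypothetical endpoint returning a large JSON payload.
    uri: https://example.com/large-payload.json
  - id: to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    # Only the file URI travels through the execution context, not the payload.
    from: "{{ outputs.download.uri }}"
```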
Storing data inside the KV store
Dedicated tasks can store data inside Kestra's KV store. The KV store transparently uses Kestra's internal storage as its backend store.
The KV store allows storing data that will be shared by all executions of the same namespace. You can think of it as a key/value store dedicated to a namespace.
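The following tasks are available:
- Set: stores a value under a key.
- Get: retrieves the value stored under a key.
- Delete: deletes a key and its value.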
Example:
tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: name
    value: John Doe
  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: name
In the next example, the flow will Set, Get and Delete the data:
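A minimal sketch of such a flow, assuming the Delete task takes the same key property as Set and Get, and that Get exposes the stored value as a value output attribute. The flow id and the log task are illustrative.

```yaml
id: kv_store_example
namespace: company.team
tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: name
    value: John Doe
  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: name
  - id: log_data
    type: io.kestra.plugin.core.log.Log
    # Assumed output attribute of the Get task.
    message: "{{ outputs.get_data.value }}"
  - id: delete_data
    type: io.kestra.plugin.core.kv.Delete
    key: name
```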
Processing data
For basic data processing, you can leverage Kestra's Pebble templating engine.
For more complex data transformations, Kestra offers various data processing plugins, including transform tasks and custom scripts.
Converting files
Files from the internal storage can be converted from/to the Ion format to/from another format using the Serdes plugin.
The following formats are currently available: Avro, CSV, JSON, XML, and Parquet.
Each format offers two tasks: one that reads an Ion serialized data file and writes it in the target format (e.g. IonToCsv), and one that reads a file in that format and writes it as an Ion serialized data file (e.g. CsvToIon).
For example, to convert an Ion file to CSV, then back to Ion:
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE
  - id: convertToCsv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"
  - id: convertBackToIon
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{outputs.convertToCsv.uri}}"
Processing data using scripts
Kestra can launch scripts written in Python, R, Node.js, Shell and PowerShell. Depending on the runner, they can run directly in a local process on the host or inside Docker containers.
Those script tasks are available in the Scripts Plugin. Here is documentation for each of them:
- The Python task will run a Python script in a Docker container or in a local process.
- The Node task will run a Node.js script in a Docker container or in a local process.
- The R task will run an R script in a Docker container or in a local process.
- The Shell task will execute a single Shell command, or a list of commands that you provide.
- The PowerShell task will execute a single PowerShell command, or a list of commands that you provide.
The following example will query the BigQuery public dataset with Wikipedia page views to find the top 10 pages, convert it to CSV, and use the CSV file inside a Python task for further transformations using Pandas.
id: wikipedia-top-ten-python-panda
namespace: company.team
description: analyze top 10 Wikipedia pages
tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
    projectId: geller
    serviceAccount: "{{envs.gcp_creds}}"
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
    tasks:
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          from kestra import Kestra
          df = pd.read_csv("data.csv")
          views = df['views'].sum()
          Kestra.outputs({'views': int(views)})
Kestra offers several plugins for ingesting and transforming data — check the Plugin list for more details.
Make sure to also check:
- The Script documentation for a detailed overview of how to work with Python, R, Node.js, Shell and PowerShell scripts and how to integrate them with Git and Docker.
- The Blueprints catalog: search for the relevant language (e.g. Python, R, Rust) or use case (ETL, Git, dbt, etc.) to find relevant examples.
Processing data using file transform
Kestra can process data row by row using file transform tasks. The transformation will be done with a small script written in Python, JavaScript, or Groovy.
- The Jython FileTransform task allows transforming rows with Python.
- The Nashorn FileTransform task allows transforming rows with JavaScript.
- The Groovy FileTransform task allows transforming rows with Groovy.
The following example will query the BigQuery public dataset for Wikipedia pages, convert it row by row with the Nashorn FileTransform, and write it in a CSV file.
id: wikipedia-top-ten-file-transform
namespace: company.team
description: A flow that loads wikipedia top 10 EN pages
tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
  - id: file-transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{outputs['query-top-ten'].uri}}"
    script: |
      logger.info('row: {}', row)
      if (row['title'] === 'Main_Page' || row['title'] === 'Special:Search' || row['title'] === '-') {
        // remove un-needed row
        row = null
      } else {
        // add a 'time' column
        row['time'] = String(row['date']).substring(11)
        // modify the 'date' column to only keep the date part
        row['date'] = String(row['date']).substring(0, 10)
      }
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs['file-transform'].uri}}"
The script can access a logger to log messages. Each row is available in a row variable, where each column is accessible using the dictionary notation row['columnName'].
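The same row-level logic can be sketched with the Jython FileTransform task. This assumes the Jython task exposes the same row variable and that setting row to None drops the row, mirroring the Nashorn example. The task id is illustrative.

```yaml
- id: file-transform-python
  type: io.kestra.plugin.scripts.jython.FileTransform
  from: "{{outputs['query-top-ten'].uri}}"
  script: |
    # Assumption: `row` holds the current row, as in the Nashorn task.
    if row['title'] in ('Main_Page', 'Special:Search', '-'):
        # drop rows that are not needed
        row = None
    else:
        # split the 'date' column into a date part and a 'time' part
        value = str(row['date'])
        row['time'] = value[11:]
        row['date'] = value[:10]
```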
Purging data
The PurgeExecution task can purge all the files stored inside the internal storage by a flow execution. It can be used at the end of a flow to purge all its generated files.
tasks:
  - id: "purge-execution"
    type: "io.kestra.plugin.core.storage.PurgeExecution"
The execution context itself will not be available after the end of the execution and will be automatically deleted from Kestra's repository after a retention period (by default, seven days) that can be changed; see configurations.
The Purge task can also be used to purge the storage files, logs, and execution records of previous executions. For example, this flow runs every day and purges everything older than one month:
id: purge
namespace: company.team
tasks:
  - id: "purge"
    type: "io.kestra.plugin.core.storage.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
FAQ
Internal Storage FAQ
How to read a file from the internal storage as a string?
The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI. Note that when using inputs, outputs or trigger variables, you don't need any extra quotation marks. Here is how you can use such variables along with the 'read' function:
- {{ read(inputs.file) }} for a FILE-type input variable named file
- {{ read(outputs.mytaskid.uri) }} for an output uri from a task named mytaskid
- {{ read(trigger.uri) }} for a uri of many triggers incl. Kafka, AWS SQS, GCP PubSub, etc.
- {{ read(trigger.objects | jq('.[].uri')) }} for a uri of a trigger that returns a list of detected objects, e.g. AWS S3, GCP GCS, etc.
Note that the read function can only read files within the same execution. If you try to read a file from a previous execution, you will get an Unauthorized error.
How to read a Namespace File as a string?
So far, you've seen how to read a file from the internal storage as a string. However, you can use the same read() function to read a Namespace File as a string. This is especially useful when you want to execute a Python script or a long SQL query stored in a dedicated SQL file.
The read() function takes the absolute path to the file you want to read. The path must point to a file stored in the same namespace as the flow you are executing.
Here is a simple example showing how you can read a file named hello.py stored in the scripts directory of the company.team namespace:
id: hello
namespace: company.team
tasks:
  - id: my_python_script
    type: io.kestra.plugin.scripts.python.Script
    script: "{{ read('scripts/hello.py') }}"
The same syntax applies to SQL queries, custom scripts, and many more. Check the Namespace Files documentation for more details.
How to read a file from the internal storage as a JSON object?
You can use the Pebble function {{ fromJson(myvar) }} and the {{ myvar | toJson }} filter to process JSON data.
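A minimal sketch combining read() and fromJson() to extract one field from a JSON file. It assumes the file holds a single JSON object. The input id payload and the field name are hypothetical.

```yaml
id: json_example
namespace: company.team
inputs:
  - id: payload
    type: FILE
tasks:
  - id: log_field
    type: io.kestra.plugin.core.log.Log
    # read() returns the file content as a string; fromJson() parses it.
    # `name` is a hypothetical field of the JSON object.
    message: "{{ fromJson(read(inputs.payload)).name }}"
```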