Data storage and processing

Manage data processed by tasks.

Kestra's primary purpose is to orchestrate data processing via tasks, so data is central to each flow's execution.

Depending on the task, data can be stored inside the execution context or inside Kestra's internal storage. You can also manually store data inside Kestra's KV store by using dedicated tasks.

Some tasks give you the choice of where you want to store the data, usually using a fetchType property or the three fetch/fetchOne/store properties.

For example, using the DynamoDB Query task:

```yaml
id: query
type: io.kestra.plugin.aws.dynamodb.Query
tableName: persons
keyConditionExpression: id = :id
expressionAttributeValues:
  :id: "1"
fetchType: FETCH
```

The fetchType property can have four values:

  • FETCH_ONE: will fetch the first row and set it in a task output attribute (the row attribute for DynamoDB); the data will be stored inside the execution context.
  • FETCH: will fetch all rows and set them in a task output attribute (the rows attribute for DynamoDB); the data will be stored inside the execution context.
  • STORE: will store all rows inside Kestra's internal storage. The internal storage will return a URI usually set in the task output attribute uri and that can be used to retrieve the file from the internal storage.
  • NONE: will do nothing.

The three fetch/fetchOne/store properties do the same thing, but through three separate boolean task properties instead of a single fetchType property.
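For example, the BigQuery Query task shown later in this guide uses this style; here is a minimal sketch (the table name is illustrative):

```yaml
tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: SELECT title, views FROM `my_dataset.pageviews` LIMIT 10
    # store: true writes all rows to the internal storage and exposes a uri output;
    # fetch: true or fetchOne: true would keep the rows in the execution context instead
    store: true
```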

Storing data

Storing data inside the flow execution context

Data can be stored as variables inside the flow execution context. This can be convenient for sharing data between tasks.

To do so, tasks store data as output attributes that are then available inside the flow via Pebble expressions like {{outputs.taskName.attributeName}}.

Be careful: when the data is large, storing it this way increases the size of the flow execution context, which can slow down execution and inflate the size of the execution stored inside Kestra's repository.
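As a minimal sketch, the Return task below stores a value in the execution context as an output, and the Log task reads it back with a Pebble expression:

```yaml
tasks:
  - id: produce
    type: io.kestra.plugin.core.debug.Return
    format: Hello World

  - id: consume
    type: io.kestra.plugin.core.log.Log
    # the Return task exposes its result in the `value` output attribute
    message: "{{ outputs.produce.value }}"
```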

Storing data inside the internal storage

Kestra has an internal storage that can store data of any size. By default, the internal storage uses the host filesystem, but plugins exist for other implementations such as Amazon S3, Google Cloud Storage, or Microsoft Azure Blob Storage. See internal storage configuration.

When using the internal storage, data is, by default, stored using Amazon Ion format.
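For illustration, a record serialized in Ion text format looks similar to JSON, with optional unquoted field names (the values below are made up):

```
{id: "1", firstname: "John", lastname: "Doe"}
```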

Tasks that can store data inside the internal storage usually have an output attribute named uri that can be used to access the file in subsequent tasks.

The following example uses the DynamoDB Query task to query a table and the FTP Upload task to send the retrieved rows to an external FTP server.

```yaml
tasks:
- id: query
  type: io.kestra.plugin.aws.dynamodb.Query
  tableName: persons
  keyConditionExpression: id = :id
  expressionAttributeValues:
    :id: "1"
  fetchType: STORE

- id: upload
  type: io.kestra.plugin.fs.ftp.Upload
  host: localhost
  port: 21
  from: "{{ outputs.query.uri }}"
  to: "/upload/file.ion"
```

If you need to access data from the internal storage, you can use the read() function to read the file's content as a string.
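For example, a minimal sketch of a task that logs the content of the file stored by the query task above:

```yaml
  - id: log_content
    type: io.kestra.plugin.core.log.Log
    # read() loads the stored file's content as a string within the same execution
    message: "{{ read(outputs.query.uri) }}"
```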

Dedicated tasks allow you to manage the files stored inside the internal storage (see the sketch after this list):

  • Concat: concatenate multiple files.
  • Delete: delete a file.
  • Size: get the size of a file.
  • Split: split a file into multiple files based on file size or number of rows.
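For example, a minimal sketch splitting a file produced by a previous task into chunks of 100 rows each (the upstream query task is assumed from the earlier examples):

```yaml
  - id: split
    type: io.kestra.plugin.core.storage.Split
    from: "{{ outputs.query.uri }}"
    # produces multiple files of at most 100 rows each
    rows: 100
```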

Storing data inside the KV store

Dedicated tasks can store data inside Kestra's KV store. The KV store transparently uses Kestra's internal storage as its backend store.

The KV store allows storing data that will be shared by all executions of the same namespace. You can think of it as a key/value store dedicated to a namespace.

The following tasks are available:

  • Set: set the value for a key.
  • Get: get the value for a given key.
  • Delete: delete a key/value pair.

Example:

```yaml
tasks:
- id: set_data
  type: io.kestra.plugin.core.kv.Set
  key: name
  value: John Doe

- id: get_data
  type: io.kestra.plugin.core.kv.Get
  key: name
```

In the next example, the flow will set, get, and then delete the data.
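A minimal sketch of such a flow (the namespace and key name are illustrative):

```yaml
id: kv_lifecycle
namespace: company.team

tasks:
- id: set_data
  type: io.kestra.plugin.core.kv.Set
  key: name
  value: John Doe

- id: get_data
  type: io.kestra.plugin.core.kv.Get
  key: name

- id: delete_data
  type: io.kestra.plugin.core.kv.Delete
  key: name
```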

Processing data

For basic data processing, you can leverage Kestra's Pebble templating engine.

For more complex data transformations, Kestra offers various data processing plugins, including transform tasks and custom scripts.
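For instance, a minimal sketch that reshapes a task's output with Pebble filters before logging it (the query task and its rows output are assumed from the DynamoDB FETCH example above):

```yaml
  - id: transform
    type: io.kestra.plugin.core.log.Log
    # jq extracts the id field from each row; first picks a single value
    message: "{{ outputs.query.rows | jq('.[].id') | first }}"
```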

Converting files

Files from the internal storage can be converted between the Ion format and other formats using the Serdes plugin.

The following formats are currently available: Avro, CSV, JSON, XML, and Parquet.

Each format offers a pair of tasks: one reads an Ion-serialized file and writes it in the target format, and the other reads a file in that format and writes it back as an Ion-serialized file.

For example, to convert an Ion file to CSV, then back to Ion:

```yaml
tasks:
- id: query
  type: io.kestra.plugin.aws.dynamodb.Query
  tableName: persons
  keyConditionExpression: id = :id
  expressionAttributeValues:
    :id: "1"
  fetchType: STORE

- id: convertToCsv
  type: io.kestra.plugin.serdes.csv.IonToCsv
  from: "{{outputs.query.uri}}"

- id: convertBackToIon
  type: io.kestra.plugin.serdes.csv.CsvToIon
  from: "{{outputs.convertToCsv.uri}}"
```

Processing data using scripts

Kestra can launch scripts written in Python, R, Node.js, Shell, and PowerShell. Depending on the runner, they run either directly in a local process on the host or inside Docker containers.

Those script tasks are available in the Scripts Plugin. Here is documentation for each of them:

  • The Python task will run a Python script in a Docker container or in a local process.
  • The Node task will run a Node.js script in a Docker container or in a local process.
  • The R task will run an R script in a Docker container or in a local process.
  • The Shell task will execute a single Shell command, or a list of commands that you provide.
  • The PowerShell task will execute a single PowerShell command, or a list of commands that you provide.

The following example will query the BigQuery public dataset with Wikipedia page views to find the top 10 pages, convert it to CSV, and use the CSV file inside a Python task for further transformations using Pandas.

```yaml
id: wikipedia-top-ten-python-panda
namespace: company.team
description: analyze top 10 Wikipedia pages

tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
    projectId: geller
    serviceAccount: "{{envs.gcp_creds}}"

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"

  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
    tasks:
    - id: pandas
      type: io.kestra.plugin.scripts.python.Script
      warningOnStdErr: false
      containerImage: ghcr.io/kestra-io/pydata:latest
      script: |
        import pandas as pd
        from kestra import Kestra

        df = pd.read_csv("data.csv")
        views = df['views'].sum()
        Kestra.outputs({'views': int(views)})
```

Kestra offers several plugins for ingesting and transforming data — check the Plugin list for more details.

Make sure to also check:

  1. The Script documentation for a detailed overview of how to work with Python, R, Node.js, Shell and PowerShell scripts and how to integrate them with Git and Docker.
  2. The Blueprints catalog — simply search for the relevant language (e.g. Python, R, Rust) or use case (ETL, Git, dbt, etc.) to find the relevant examples.

Processing data using file transform

Kestra can process data row by row using file transform tasks. The transformation will be done with a small script written in Python, JavaScript, or Groovy.

The following example will query the BigQuery public dataset for Wikipedia pages, convert it row by row with the Nashorn FileTransform, and write it in a CSV file.

```yaml
id: wikipedia-top-ten-file-transform
namespace: company.team
description: A flow that loads wikipedia top 10 EN pages
tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true

  - id: file-transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{outputs['query-top-ten'].uri}}"
    script: |
      logger.info('row: {}', row)

      if (row['title'] === 'Main_Page' || row['title'] === 'Special:Search' || row['title'] === '-') {
        // remove un-needed row
        row = null
      } else {
        // add a 'time' column
        row['time'] = String(row['date']).substring(11)
        // modify the 'date' column to only keep the date part
        row['date'] = String(row['date']).substring(0, 10)
      }

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs['file-transform'].uri}}"
```

Purging data

The PurgeExecution task can purge all the files stored inside the internal storage by a flow execution. It can be used at the end of a flow to purge all of its generated files.

```yaml
tasks:
  - id: "purge-execution"
    type: "io.kestra.plugin.core.storage.PurgeExecution"
```

The execution context itself will not be available after the end of the execution and will be automatically deleted from Kestra's repository after a retention period (by default, seven days) that can be changed; see configurations.

Also, the Purge task can be used to purge storage files, logs, and executions from previous executions. For example, this flow will purge everything older than one month, every day at midnight:

```yaml
id: purge
namespace: company.team

tasks:
  - id: "purge"
    type: "io.kestra.plugin.core.storage.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
```

FAQ

Internal Storage FAQ

How to read a file from the internal storage as a string?

The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI. Note that when using inputs, outputs or trigger variables, you don't need any extra quotation marks. Here is how you can use such variables along with the 'read' function:

  • {{ read(inputs.file) }} for a FILE-type input variable named file
  • {{ read(outputs.mytaskid.uri) }} for an output uri from a task named mytaskid
  • {{ read(trigger.uri) }} for a uri of many triggers including Kafka, AWS SQS, GCP PubSub, etc.
  • {{ read(trigger.objects | jq('.[].uri')) }} for a uri of a trigger that returns a list of detected objects, e.g. AWS S3, GCP GCS, etc.

Note that the read function can only read files within the same execution. If you try to read a file from a previous execution, you will get an Unauthorized error.

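As an example, here is a minimal sketch of a flow with a FILE-type input whose content is read as a string (the input name is illustrative):

```yaml
id: read_file_input
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: log_content
    type: io.kestra.plugin.core.log.Log
    # the FILE-type input is available as an internal storage URI
    message: "{{ read(inputs.file) }}"
```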

How to read a Namespace File as a string?

So far, you've seen how to read a file from the internal storage as a string. However, you can use the same read() function to read a Namespace File as a string. This is especially useful when you want to execute a Python script or a long SQL query stored in a dedicated SQL file.

The read() function takes the absolute path to the file you want to read. The path must point to a file stored in the same namespace as the flow you are executing.

Here is a simple example showing how you can read a file named hello.py stored in the scripts directory of the company.team namespace:

```yaml
id: hello
namespace: company.team

tasks:
  - id: my_python_script
    type: io.kestra.plugin.scripts.python.Script
    script: "{{ read('scripts/hello.py') }}"
```

The same syntax applies to SQL queries, custom scripts, and many more. Check the Namespace Files documentation for more details.


How to read a file from the internal storage as a JSON object?

You can use the Pebble function {{ fromJson(myvar) }} to parse a JSON string into an object and the {{ myvar | toJson }} filter to serialize a variable to a JSON string.

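For example, a minimal sketch parsing an inline JSON string and accessing one of its properties (the data is illustrative):

```yaml
  - id: log_name
    type: io.kestra.plugin.core.log.Log
    # fromJson parses the string into an object so its properties can be accessed
    message: "{{ fromJson('{\"name\": \"John\"}').name }}"
```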
