ForEachItem

yaml
type: "io.kestra.plugin.core.flow.ForEachItem"

Execute a subflow for each batch of items

The items value must be a Kestra internal storage URI, for example an output file from a previous task or a flow input of FILE type. Two special variables are available to pass as inputs to the subflow:

  • taskrun.items, which is the URI of the internal storage file containing the batch of items to process
  • taskrun.iteration, which is the iteration (batch) number
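
As a minimal sketch of how both variables can be forwarded, the fragment below passes them to the subflow as inputs. The input names file and batch_number, the flow id my_subflow, and the upstream task id extract are hypothetical; the subflow would need to declare inputs with matching ids.

```yaml
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.extract.uri }}" # assumes an upstream task named extract that stores a file
  batch:
    rows: 1
  namespace: company.team
  flowId: my_subflow # hypothetical subflow
  inputs:
    file: "{{ taskrun.items }}" # URI of the file holding this batch
    batch_number: "{{ taskrun.iteration }}" # iteration (batch) number
```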

Examples

Execute a subflow for each batch of items. The parent flow orders_parallel uses the ForEachItem task to start one execution of the orders subflow for each batch of items.

yaml
id: orders
namespace: company.team

inputs:
  - id: order
    type: STRING

tasks:
  - id: read_file
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - cat "{{ inputs.order }}"

  - id: read_file_content
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.order) }}"

yaml
id: orders_parallel
namespace: company.team

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: orders
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}" # special variable holding the URI of the file that contains the batch's items

Execute a subflow for each JSON item fetched from a REST API. The subflow mysubflow is called from the parent flow iterate_over_json using the ForEachItem task; this creates one subflow execution for each JSON object.

Note how we first need to convert the JSON array to JSON-L format using the IonToJson task with newLine set to true. This is because the items attribute of the ForEachItem task expects a file where each line represents a single item. Suitable file types include Amazon ION (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For other formats, you can use the conversion tasks available in the io.kestra.plugin.serdes module.

In this example, the subflow mysubflow expects a JSON object as input. The JsonToIon task first reads the JSON array downloaded from the REST API and converts it to ION. Then, the IonToJson task converts that ION file to JSON-L format, suitable for the ForEachItem task.

yaml
id: mysubflow
namespace: company.team

inputs:
  - id: json
    type: JSON

tasks:
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.json }}"

yaml
id: iterate_over_json
namespace: company.team

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: "https://api.restful-api.dev/objects"
    contentType: application/json
    method: GET
    failOnEmptyResponse: true
    timeout: PT15S

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: mysubflow
    wait: true
    transmitFailed: true
    inputs:
      json: "{{ json(read(taskrun.items)) }}"

This example shows how to combine the ForEach and ForEachItem tasks to process files from an S3 bucket. The ForEach task iterates over the files detected by the S3 trigger, and the ForEachItem task splits each file into batches. The process_batch subflow is then called with the data input set to the URI of the batch to process.

yaml
id: process_batch
namespace: company.team

inputs:
  - id: data
    type: FILE

tasks:
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.data) }}"

yaml
id: process_files
namespace: company.team

tasks:
  - id: loop_over_files
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].uri') }}"
    tasks:
      - id: subflow_per_batch
        type: io.kestra.plugin.core.flow.ForEachItem
        items: "{{ trigger.uris[parent.taskrun.value] }}"
        batch:
          rows: 1
        flowId: process_batch
        namespace: company.team
        wait: true
        transmitFailed: true
        inputs:
          data: "{{ taskrun.items }}"

triggers:
  - id: s3
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT1S"
    accessKeyId: "<access-key>"
    secretKeyId: "<secret-key>"
    region: "us-east-1"
    bucket: "my_bucket"
    prefix: "sub-dir"
    action: NONE

Properties

batch

  • Type: ForEachItem-Batch
  • Dynamic:
  • Required: ✔️
  • Default: { "rows": "1", "separator": "\n" }

How to split the items into batches.
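
As an illustration, the fragment below groups items 100 lines at a time instead of one by one. It reuses the extract task and the orders subflow from the first example above; the value 100 is arbitrary.

```yaml
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.extract.uri }}"
  batch:
    rows: 100 # each subflow execution receives a file with up to 100 lines
  namespace: company.team
  flowId: orders
  inputs:
    order: "{{ taskrun.items }}"
```

Larger batches mean fewer subflow executions, at the cost of each subflow having to handle several items per file.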

flowId

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The identifier of the subflow to be executed

items

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI. This can be either the output of a previous task, for example {{ outputs.extract.uri }} as in the first example above, or a FILE type input parameter, referenced as {{ inputs.<input_id> }}.

This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION files (commonly produced by Query tasks), newline-separated JSON files, and CSV files formatted with one row per line and without a header. For files in other formats, such as Excel, Avro, Parquet, XML, regular (non-newline-delimited) JSON, or CSV files that do not meet these conditions, it is recommended to first convert them to the ION format using the conversion tasks available in the io.kestra.plugin.serdes module.
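
The examples above all take items from a previous task's output. A minimal sketch of the other case, a FILE type input, could look like the flow below. The flow id orders_from_upload and the input id file are hypothetical; the orders subflow is the one from the first example.

```yaml
id: orders_from_upload # hypothetical flow id
namespace: company.team

inputs:
  - id: file # hypothetical input id
    type: FILE

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}" # internal storage URI of the provided file
    batch:
      rows: 1
    namespace: company.team
    flowId: orders
    inputs:
      order: "{{ taskrun.items }}"
```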

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

The namespace of the subflow to be executed

errors

  • Type: array
  • SubType: Task
  • Dynamic:
  • Required:

List of tasks to run if any task fails on this FlowableTask.
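
A minimal sketch of an errors block, assuming a simple Log task is enough to report the failure. The task id log_failure and the message text are hypothetical; the rest reuses the first example above.

```yaml
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.extract.uri }}"
  batch:
    rows: 1
  namespace: company.team
  flowId: orders
  inputs:
    order: "{{ taskrun.items }}"
  errors:
    - id: log_failure # hypothetical task id
      type: io.kestra.plugin.core.log.Log
      message: "ForEachItem task failed"
```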

inheritLabels

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: false

Whether the subflow should inherit labels from the execution that triggered it.

By default, labels are not passed to the subflow execution. If you set this option to true, the child flow execution will inherit all labels from the parent execution.

inputs

  • Type: object
  • Dynamic: ✔️
  • Required:

The inputs to pass to the subflow to be executed

labels

  • Type:
    • array
    • object
  • Dynamic: ✔️
  • Required:

The labels to pass to the subflow to be executed.
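
The two label-related properties can be combined. The fragment below is a sketch that inherits the parent execution's labels and adds one more, using the object form of labels; the label key source and its value are hypothetical, and the rest reuses the first example above.

```yaml
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.extract.uri }}"
  batch:
    rows: 1
  namespace: company.team
  flowId: orders
  inheritLabels: true # pass the parent execution's labels to each subflow execution
  labels:
    source: orders_parallel # hypothetical extra label
  inputs:
    order: "{{ taskrun.items }}"
```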

revision

  • Type: integer
  • Dynamic:
  • Required:

The revision of the subflow to be executed

By default, the latest revision of the subflow is executed.

scheduleDate

  • Type:
    • string
    • string
  • Dynamic: ✔️
  • Required:

transmitFailed

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: true

Whether to fail the current execution if the subflow execution fails or is killed.

Note that this option works only if wait is set to true.

wait

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: true

Whether to wait for the subflow executions to finish before continuing the current execution.
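
For a fire-and-forget setup, wait can be turned off, as in the sketch below, which reuses the first example above. Since transmitFailed only works when wait is true, it is left out here.

```yaml
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ outputs.extract.uri }}"
  batch:
    rows: 1
  namespace: company.team
  flowId: orders
  wait: false # continue the parent execution without waiting for the subflows
  inputs:
    order: "{{ taskrun.items }}"
```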

Definitions

io.kestra.plugin.core.flow.ForEachItem-Batch

  • bytes
    • Type: string
    • Dynamic: ✔️
    • Required:
  • partitions
    • Type:
      • integer
      • string
    • Dynamic: ✔️
    • Required:
  • rows
    • Type:
      • integer
      • string
    • Dynamic: ✔️
    • Required:
  • separator
    • Type: string
    • Dynamic: ✔️
    • Required:
    • Default: \n
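
The definition above lists bytes and partitions without describing them. The fragment below is only a sketch of plausible usage, assuming that bytes accepts a size string such as 10MB and that partitions is the number of files to split the input into. Neither assumption is confirmed by this page, so check the plugin reference before relying on it.

```yaml
# Assumed usage: cap each batch file at roughly 10 MB
batch:
  bytes: "10MB" # assumed size-string format
  separator: "\n" # line separator, same as the default

# Assumed alternative: split the input into a fixed number of files
# batch:
#   partitions: 10
```

How these options interact with rows is not stated here, so it is safer to set only one splitting criterion at a time.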