type: "io.kestra.plugin.core.flow.ForEachItem"
Execute a subflow for each batch of items
The items value must be a Kestra internal storage URI, e.g. an output file from a previous task, or a file from an input of FILE type.
Two special variables are available to pass as inputs to the subflow:
- taskrun.items: the URI of the internal storage file containing the batch of items to process
- taskrun.iteration: the iteration or batch number
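For illustration, here is a minimal sketch of passing both special variables to a subflow (the task id, subflow id, and input names below are hypothetical):

  - id: each_batch
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}" # internal storage URI produced by a previous task
    namespace: company.team
    flowId: process_batch
    inputs:
      file: "{{ taskrun.items }}" # URI of the file containing this batch of items
      batch_number: "{{ taskrun.iteration }}" # number of the current batch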
Examples
Execute a subflow for each batch of items. The subflow orders is called from the parent flow orders_parallel using the ForEachItem task in order to start one subflow execution for each batch of items.
id: orders
namespace: company.team

inputs:
  - id: order
    type: STRING

tasks:
  - id: read_file
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - cat "{{ inputs.order }}"

  - id: read_file_content
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.order) }}"

id: orders_parallel
namespace: company.team

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: orders
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}" # special variable that contains the items of the batch
Execute a subflow for each JSON item fetched from a REST API. The subflow mysubflow is called from the parent flow iterate_over_json using the ForEachItem task; this creates one subflow execution for each JSON object.

Note how we first need to convert the JSON array to JSON-L format. This is because the items attribute of the ForEachItem task expects a file where each line represents a single item. Suitable file types include Amazon ION (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For other formats, you can use the conversion tasks available in the io.kestra.plugin.serdes module.

In this example, the subflow mysubflow expects a JSON object as input. The JsonToIon task first reads the JSON array from the REST API response and converts it to ION. Then, the IonToJson task converts that ION file to JSON-L format (one JSON object per line), suitable for the ForEachItem task.
id: mysubflow
namespace: company.team

inputs:
  - id: json
    type: JSON

tasks:
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.json }}"

id: iterate_over_json
namespace: company.team

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: "https://api.restful-api.dev/objects"
    contentType: application/json
    method: GET
    failOnEmptyResponse: true
    timeout: PT15S

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: mysubflow
    wait: true
    transmitFailed: true
    inputs:
      json: "{{ json(read(taskrun.items)) }}"
This example shows how to use the combination of ForEach and ForEachItem tasks to process files from an S3 bucket. The ForEach task iterates over the files from the S3 trigger, and the ForEachItem task is used to split each file into batches. The process_batch subflow is then called with the data input parameter set to the URI of the batch to process.
id: process_batch
namespace: company.team

inputs:
  - id: data
    type: FILE

tasks:
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.data) }}"

id: process_files
namespace: company.team

tasks:
  - id: loop_over_files
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].uri') }}"
    tasks:
      - id: subflow_per_batch
        type: io.kestra.plugin.core.flow.ForEachItem
        items: "{{ trigger.uris[parent.taskrun.value] }}"
        batch:
          rows: 1
        flowId: process_batch
        namespace: company.team
        wait: true
        transmitFailed: true
        inputs:
          data: "{{ taskrun.items }}"

triggers:
  - id: s3
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT1S"
    accessKeyId: "<access-key>"
    secretKeyId: "<secret-key>"
    region: "us-east-1"
    bucket: "my_bucket"
    prefix: "sub-dir"
    action: NONE
Properties
batch
- Type: ForEachItem-Batch
- Dynamic: ❌
- Required: ✔️
- Default: { "rows": "1", "separator": "\n" }
How to split the items into batches.
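As an illustration of this property, a batch can be sized in several ways; the values below are made up, typically only one of rows, bytes, or partitions is set, and the exact size-string format accepted by bytes is an assumption here:

# split by number of rows
batch:
  rows: 100 # at most 100 items per batch
# or split by approximate size
batch:
  bytes: "1MB"
# or split into a fixed number of batches
batch:
  partitions: 10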
flowId
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length: 1
The identifier of the subflow to be executed
items
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length: 1
The items to be split into batches and processed. Make sure to set it to a Kestra internal storage URI. This can be either the output of a previous task, e.g. {{ outputs.<task_id>.uri }}, or an input parameter of FILE type, e.g. {{ inputs.<input_id> }}. This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format using the conversion tasks available in the io.kestra.plugin.serdes module, which transform files from their original format to ION.
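As an illustration of pointing items at a FILE input rather than a task output, here is a minimal sketch; the flow id, input id, and subflow id are hypothetical:

id: iterate_over_file
namespace: company.team

inputs:
  - id: csv_file
    type: FILE

tasks:
  - id: each_row
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.csv_file }}" # FILE inputs resolve to an internal storage URI
    batch:
      rows: 1
    namespace: company.team
    flowId: process_batch # hypothetical subflow with a FILE input named 'data'
    wait: true
    inputs:
      data: "{{ taskrun.items }}"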
namespace
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length: 1
The namespace of the subflow to be executed
errors
- Type: array
- SubType: Task
- Dynamic: ❌
- Required: ❌
List of tasks to run if any tasks failed on this FlowableTask.
inheritLabels
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default: false
Whether the subflow should inherit labels from this execution that triggered it.
By default, labels are not passed to the subflow execution. If you set this option to true, the child flow execution will inherit all labels from the parent execution.
inputs
- Type: object
- Dynamic: ✔️
- Required: ❌
The inputs to pass to the subflow to be executed
labels
- Type: array or object
- Dynamic: ✔️
- Required: ❌
The labels to pass to the subflow to be executed.
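For illustration, a sketch of the map form of labels (the label keys and values are made up; per the type above, an array of key/value pairs is also accepted):

labels:
  environment: staging
  source: for-each-item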
revision
- Type: integer
- Dynamic: ❌
- Required: ❌
The revision of the subflow to be executed
By default, the last, i.e. the most recent, revision of the subflow is executed.
scheduleDate
- Type: string
- Dynamic: ✔️
- Required: ❌
transmitFailed
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default: true
Whether to fail the current execution if the subflow execution fails or is killed.
Note that this option works only if wait is set to true.
wait
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default: true
Whether to wait for the subflow executions to finish before continuing the current execution.
Definitions
io.kestra.plugin.core.flow.ForEachItem-Batch
bytes
- Type: string
- Dynamic: ✔️
- Required: ❌
partitions
- Type: integer or string
- Dynamic: ✔️
- Required: ❌
rows
- Type: integer or string
- Dynamic: ✔️
- Required: ❌
separator
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default: \n
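Putting the Batch fields together, here is a short sketch of a batch block on a ForEachItem task; the values are illustrative, and normally only one of rows, bytes, or partitions would be set:

  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 50
      separator: "\n" # default separator; each line of the items file is one item
    namespace: company.team
    flowId: orders
    inputs:
      order: "{{ taskrun.items }}"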