Flowable Tasks
Control your orchestration logic.
Flowable tasks control the orchestration logic: they run tasks or subflows in parallel, create loops, and handle conditional branching and grouping.
Flowable tasks don't run heavy operations; those are handled by workers.
Flowable tasks use expressions from the execution context to define the next tasks to run. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
Sequential
This task runs its child tasks one after another. It is used to group tasks.
id: sequential
namespace: company.team
tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"
      - id: last
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
You can access the output of a sibling task using the syntax {{ outputs.sibling.value }}.
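As a minimal sketch of the sibling-output syntax, the flow below has a second task read the output of the first. The flow id and the task ids first and second are hypothetical names chosen for illustration; the Return task is the same one used in the example above.

```yaml
id: sequential_sibling_output   # hypothetical flow id
namespace: company.team
tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "hello"
      - id: second
        type: io.kestra.plugin.core.debug.Return
        # Reads the value returned by the sibling task "first"
        format: "{{ outputs.first.value }} world"
```

Because the tasks run in order, first has already produced its output by the time second is rendered.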
Parallel
This task processes tasks in parallel. It makes it convenient to process many tasks at once.
id: parallel
namespace: company.team
tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"
      - id: last
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
You cannot access the output of a sibling task because the tasks run in parallel.
Switch
This task processes a set of tasks conditionally depending on a contextual variable's value.
In the following example, an input will be used to decide which task to run next.
id: switch
namespace: company.team
inputs:
  - id: param
    type: BOOLEAN
tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"
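A Switch can also branch on the output of a previous task rather than on an input. The sketch below is illustrative only: the flow id, the task ids, and the value "a" are hypothetical, and it assumes the Switch task's defaults property for the branch that runs when no case matches.

```yaml
id: switch_on_output   # hypothetical flow id
namespace: company.team
tasks:
  - id: pick
    type: io.kestra.plugin.core.debug.Return
    format: "a"
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    # Branch on the value produced by the previous task
    value: "{{ outputs.pick.value }}"
    cases:
      a:
        - id: case_a
          type: io.kestra.plugin.core.log.Log
          message: "Picked a"
    # Assumed fallback branch, used when no case matches
    defaults:
      - id: fallback
        type: io.kestra.plugin.core.log.Log
        message: "No case matched"
```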
If
This task processes a set of tasks conditionally depending on a condition.
The condition must coerce to a boolean. Boolean coercion treats 0, -0, null and '' as false; all other values coerce to true.
The else branch is optional.
In the following example, an input will be used to decide which task to run next.
id: if_condition
namespace: company.team
inputs:
  - id: param
    type: BOOLEAN
tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"
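To illustrate coercion and the optional else branch, here is a small sketch with a non-boolean condition. The flow id, the count input and its default are hypothetical. Under the coercion rules described above, a count of 0 should skip the then branch and any other number should run it; this is a sketch and has not been run.

```yaml
id: if_without_else   # hypothetical flow id
namespace: company.team
inputs:
  - id: count
    type: INT
    defaults: 0
tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    # An integer is coerced to a boolean: 0 is false, other values are true
    condition: "{{ inputs.count }}"
    then:
      - id: when_non_zero
        type: io.kestra.plugin.core.log.Log
        message: "count is {{ inputs.count }}"
    # No else branch: nothing runs when the condition is false
```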
ForEach
This task executes a group of tasks for each value in the list.
In the following example, the list of values is static, but it can also be generated from a previous task's output, which starts an arbitrary number of subtasks.
id: foreach_example
namespace: company.team
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["value 1", "value 2", "value 3"]
    tasks:
      - id: before_if
        type: io.kestra.plugin.core.debug.Return
        format: "Before if {{ taskrun.value }}"
      - id: if
        type: io.kestra.plugin.core.flow.If
        condition: '{{ taskrun.value == "value 2" }}'
        then:
          - id: after_if
            type: io.kestra.plugin.core.debug.Return
            format: "After if {{ parent.taskrun.value }}"
You can access the output of a sibling task using the syntax {{ outputs.sibling[taskrun.value].value }}.
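The following sketch shows that syntax inside a loop: within each iteration, the second task looks up the output the first task produced for the same value. The flow id and the task ids first and second are hypothetical.

```yaml
id: foreach_sibling_output   # hypothetical flow id
namespace: company.team
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["value 1", "value 2"]
    tasks:
      - id: first
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"
      - id: second
        type: io.kestra.plugin.core.debug.Return
        # Outputs of looped tasks are keyed by the iteration value
        format: "{{ outputs.first[taskrun.value].value }}"
```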
This example shows how to run tasks in parallel for each value in the list. All child tasks of the parallel task will run in parallel. However, due to the concurrencyLimit property set to 2, only two parallel task groups will run at any given time.
id: parallel_tasks_example
namespace: company.team
tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    concurrencyLimit: 2
    tasks:
      - id: parallel
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: log
            type: io.kestra.plugin.core.log.Log
            message: Processing {{ parent.taskrun.value }}
          - id: shell
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - sleep {{ parent.taskrun.value }}
For processing items, or forwarding processing to a subflow, ForEachItem is better suited.
ForEachItem
This task allows you to iterate over a list of items and run a subflow for each item, or for each batch containing multiple items.
Syntax:
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ inputs.file }}" # could be also an output variable {{ outputs.extract.uri }}
  inputs:
    file: "{{ taskrun.items }}" # items of the batch
  batch:
    rows: 4
  namespace: company.team
  flowId: subflow
  revision: 1 # optional (default: latest)
  wait: true # wait for the subflow execution
  transmitFailed: true # fail the task run if the subflow execution fails
  labels: # optional labels to pass to the subflow to be executed
    key: value
This will execute the subflow company.team.subflow for each batch of items.
To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
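For reference, the subflow called above might look like the sketch below. It declares the file input with type FILE, as described, and logs the batch content. The task id log_batch is hypothetical, and reading the file with the read() function is one possible way to consume the batch, not the only one.

```yaml
id: subflow
namespace: company.team
inputs:
  # Receives the URI of the internal storage file holding one batch
  - id: file
    type: FILE
tasks:
  - id: log_batch   # hypothetical task id
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"
```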
The next example shows how you can access the outputs from each subflow executed. The ForEachItem task automatically merges the URIs of the outputs from each subflow into a single file. The URI of this file is available through the subflowOutputs output.
id: for_each_item
namespace: company.team
tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true
    flowId: my_subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"
  - id: for_each_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.for_each_item_merge.subflowOutputs }}" # Log the URI of the file containing the URIs of the outputs from each subflow
ForEach vs ForEachItem
Both ForEach and ForEachItem are similar, but there are specific use cases that suit one over the other:
- ForEach generates a lot of Task Runs, which can impact performance.
- ForEachItem generates separate executions using Subflows for the group of tasks. This scales better for larger datasets.
Read more about performance optimization here.
AllowFailure
This task will allow child tasks to fail. If any child task fails:
- The failed AllowFailure task will be marked with status WARNING.
- All child tasks inside the AllowFailure will be stopped immediately.
- The Execution will continue for all other tasks.
- At the end, the execution as a whole will also be marked with status WARNING.
In the following example:
- allow_failure will be labelled as WARNING.
- ko will be labelled as FAILED.
- next will not be run.
- end will be run and labelled SUCCESS.
id: each
namespace: company.team
tasks:
  - id: allow_failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.core.execution.Fail
      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
Fail
This task will fail the flow; it can be used with or without conditions.
Without conditions, it can be used, for example, to fail on some switch value.
id: fail_on_switch
namespace: company.team
inputs:
  - id: param
    type: STRING
    required: true
tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default
With conditions, it can be used, for example, to validate inputs.
id: fail_on_condition
namespace: company.team
inputs:
  - id: param
    type: STRING
    required: true
tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{ inputs.param == 'fail' }}"
  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"
Subflow
This task will trigger another flow. This allows you to decouple the first flow from the second and monitor each flow individually.
You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).
id: subflow
namespace: company.team
tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12
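Since the inputs must be declared in the subflow, the called flow could look roughly like the sketch below. The input types FILE and INT are assumptions inferred from the values passed above, and the task id log_inputs is hypothetical.

```yaml
id: my-subflow
namespace: company.team
inputs:
  # Both inputs passed by the parent flow are declared here (types assumed)
  - id: file
    type: FILE
  - id: store
    type: INT
tasks:
  - id: log_inputs   # hypothetical task id
    type: io.kestra.plugin.core.log.Log
    message: "store={{ inputs.store }}, file={{ inputs.file }}"
```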
Worker
The Worker task is deprecated in favor of the WorkingDirectory task. The next section explains how you can use the WorkingDirectory task to allow multiple tasks to share a file system during the flow's Execution.
WorkingDirectory
By default, Kestra will launch each task in a new working directory, possibly on different workers if multiple ones exist.
The example below will run all tasks nested under the WorkingDirectory task sequentially. All those tasks will be executed in the same working directory, allowing the reuse of the previous tasks' output files in the downstream tasks. In order to share a working directory, all tasks nested under the WorkingDirectory task will be launched on the same worker.
This task can be particularly useful for compute-intensive file system operations.
id: working_dir_flow
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'
      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'
This task can also cache files inside the working directory, for example, to cache script dependencies like the node_modules of a node Script task.
id: node_with_cache
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.node.Script
        beforeCommands:
          - npm install colors
        script: |
          const colors = require("colors");
          console.log(colors.red("Hello"));
This task can also fetch files from namespace files and make them available to all child tasks.
id: node_with_cache
namespace: company.team
tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
        - dir1/*.*
      exclude:
        - dir2/*.*
    tasks:
      - id: shell
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - cat dir1/file1.txt
Pause
Kestra flows run until all tasks complete, but sometimes you need to:
- Add a manual validation before continuing the execution.
- Wait for some duration before continuing the execution.
For this, you can use the Pause task.
In the following example, the validation task will pause until its state is changed manually, and the wait task will wait for 5 minutes.
id: pause
namespace: company.team
tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'
  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'
A Pause task without delay will wait indefinitely until the task state is changed to Running. To do this, go to the Gantt tab of the Execution page, click on the task, select Change status in the contextual menu, and select Mark as RUNNING in the form. This will make the task run until its end.
DAG
This task allows defining dependencies between tasks by creating a directed acyclic graph (DAG). Instead of an explicit DAG structure, this task allows you to only define upstream dependencies for each task using the dependsOn property. This way, you can set dependencies more implicitly for each task, and Kestra will figure out the overall flow structure.
id: dag
namespace: company.team
tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3
Template (deprecated)
Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.
The following example uses the Template task to use a template.
id: template
namespace: company.team
tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template