Control your orchestration logic.

Flowable tasks control the orchestration logic: they group tasks, run tasks or subflows in parallel, create loops, and handle conditional branching.

Flowable tasks don't run heavy operations; those are handled by workers.

Flowable Tasks use expressions from the execution context to define the next tasks to run. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
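
As a rough sketch of that pattern, the flow below routes on the output of a previous task. The flow id, the task ids, and the high/low values are made up for illustration. It assumes that the Return task exposes its rendered string as value and that the Switch task accepts a defaults branch for unmatched values.

yaml
id: switch_on_output
namespace: company.team

tasks:
  # Hypothetical upstream task; its output is read below as outputs.priority.value
  - id: priority
    type: io.kestra.plugin.core.debug.Return
    format: "high"

  - id: route
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.priority.value }}"
    cases:
      high:
        - id: handle_high
          type: io.kestra.plugin.core.log.Log
          message: "High priority path"
      low:
        - id: handle_low
          type: io.kestra.plugin.core.log.Log
          message: "Low priority path"
    # Assumed fallback branch for any value not listed under cases
    defaults:
      - id: handle_other
        type: io.kestra.plugin.core.log.Log
        message: "Unrecognized priority"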

Sequential

This task runs its child tasks one after another. It is used to group tasks.

yaml
id: sequential
namespace: company.team

tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

Parallel

This task processes tasks in parallel. It makes it convenient to process many tasks at once.

yaml
id: parallel
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
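
Sequential and Parallel can also be combined. The following is a minimal sketch, with illustrative ids and messages, in which two Sequential groups run side by side while the steps inside each group keep their order.

yaml
id: parallel_groups
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      # Each Sequential group is one branch of the Parallel task
      - id: group_a
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: a1
            type: io.kestra.plugin.core.log.Log
            message: "Group A, step 1"
          - id: a2
            type: io.kestra.plugin.core.log.Log
            message: "Group A, step 2"

      - id: group_b
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: b1
            type: io.kestra.plugin.core.log.Log
            message: "Group B, step 1"
          - id: b2
            type: io.kestra.plugin.core.log.Log
            message: "Group B, step 2"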

Switch

This task processes a set of tasks conditionally depending on a contextual variable's value.

In the following example, an input will be used to decide which task to run next.

yaml
id: switch
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"

If

This task processes a set of tasks conditionally depending on a condition. The condition must coerce to a boolean: 0, -0, null, and '' coerce to false, and all other values coerce to true. The else branch is optional.

In the following example, an input will be used to decide which task to run next.

yaml
id: if_condition
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"
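
The coercion rule and the optional else branch can be sketched with a non-boolean input. The flow id, the count input, and its default are illustrative assumptions. With the default of 0, the condition should coerce to false, so no child task runs.

yaml
id: if_coercion
namespace: company.team

inputs:
  - id: count
    type: INT
    defaults: 0

tasks:
  - id: has_items
    type: io.kestra.plugin.core.flow.If
    # Renders to the input value; 0 is expected to coerce to false
    condition: "{{ inputs.count }}"
    then:
      - id: process
        type: io.kestra.plugin.core.log.Log
        message: "Processing {{ inputs.count }} items"
    # No else branch: nothing runs when the condition is false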

ForEach

This task will execute a group of tasks for each value in the list.

In the following example, the list of values is static, but it can also be generated from a previous task's output and start an arbitrary number of subtasks.

yaml
id: foreach_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["value 1", "value 2", "value 3"]
    tasks:
      - id: before_if
        type: io.kestra.plugin.core.debug.Return
        format: "Before if {{ taskrun.value }}"
      - id: if
        type: io.kestra.plugin.core.flow.If
        condition: '{{ taskrun.value == "value 2" }}'
        then:
          - id: after_if
            type: io.kestra.plugin.core.debug.Return
            format: "After if {{ parent.taskrun.value }}"
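
For values generated by a previous task, a minimal sketch could look like the flow below. It assumes that the values property accepts an expression that renders to a JSON array string; the flow id and task ids are illustrative.

yaml
id: foreach_dynamic
namespace: company.team

tasks:
  # Stand-in for any task whose output is a JSON array
  - id: list_values
    type: io.kestra.plugin.core.debug.Return
    format: '["value 1", "value 2", "value 3"]'

  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ outputs.list_values.value }}"
    tasks:
      - id: log_value
        type: io.kestra.plugin.core.log.Log
        message: "Got {{ taskrun.value }}"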

This example shows how to run tasks in parallel for each value in the list. All child tasks of the parallel task will run in parallel. However, due to the concurrencyLimit property set to 2, only two parallel task groups will run at any given time.

yaml
id: parallel_tasks_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    concurrencyLimit: 2
    tasks:
      - id: parallel
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
        - id: log
          type: io.kestra.plugin.core.log.Log
          message: Processing {{ parent.taskrun.value }}
        - id: shell
          type: io.kestra.plugin.scripts.shell.Commands
          commands:
            - sleep {{ parent.taskrun.value }}

For processing items, or forwarding processing to a subflow, ForEachItem is better suited.

ForEachItem

This task allows you to iterate over a list of items and run a subflow for each item, or for each batch containing multiple items.

Syntax:

yaml
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}" # could be also an output variable {{ outputs.extract.uri }}
    inputs:
      file: "{{ taskrun.items }}" # items of the batch
    batch:
      rows: 4
    namespace: company.team
    flowId: subflow
    revision: 1 # optional (default: latest)
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    labels: # optional labels to pass to the subflow to be executed
      key: value

This will execute the subflow company.team.subflow for each batch of items. To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
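
On the receiving side, the subflow has to declare that input. The sketch below shows what company.team.subflow might look like; the log_batch task and the use of the read() function to print the batch content are illustrative assumptions.

yaml
id: subflow
namespace: company.team

inputs:
  # Receives the internal storage URI of one batch of items
  - id: file
    type: FILE

tasks:
  - id: log_batch
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"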

The next example shows you how you can access the outputs from each subflow executed. The ForEachItem automatically merges the URIs of the outputs from each subflow into a single file. The URI of this file is available through the subflowOutputs output.

yaml
id: for_each_item
namespace: company.team

tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true
    flowId: my_subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"

  - id: for_each_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.for_each_item_merge.subflowOutputs }}" # Log the URI of the file containing the URIs of the outputs from each subflow

ForEach vs ForEachItem

Both ForEach and ForEachItem are similar, but there are specific use cases that suit one over the other:

  • ForEach generates a lot of Task Runs which can impact performance.
  • ForEachItem generates separate executions using Subflows for the group of tasks. This scales better for larger datasets.

Read more about performance optimization here.


AllowFailure

This task allows its child tasks to fail. If any child task fails:

  • The AllowFailure task will be marked with the status WARNING.
  • All remaining child tasks inside the AllowFailure task will be stopped immediately.
  • The execution will continue for all other tasks.
  • At the end, the execution as a whole will also be marked with the status WARNING.

In the following example:

  • allow_failure will be labelled as WARNING.
  • ko will be labelled as FAILED.
  • next will not be run.
  • end will be run and labelled SUCCESS.

yaml
id: each
namespace: company.team

tasks:
  - id: allow_failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.core.execution.Fail
      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

Fail

This task will fail the flow; it can be used with or without conditions.

Without conditions, it can be used, for example, to fail on some switch value.

yaml
id: fail_on_switch
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default

With conditions, it can be used, for example, to validate inputs.

yaml
id: fail_on_condition
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{ inputs.param == 'fail' }}"
  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"

Subflow

This task will trigger another flow. This allows you to decouple the first flow from the second and monitor each flow individually.

You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).

yaml
id: subflow
namespace: company.team

tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12
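
The inputs passed above must exist in the child flow. A minimal sketch of my-subflow follows, assuming file is a FILE input and store is an INT input; the input types and the log task are illustrative.

yaml
id: my-subflow
namespace: company.team

inputs:
  # Names must match the keys passed by the parent Subflow task
  - id: file
    type: FILE
  - id: store
    type: INT

tasks:
  - id: log_inputs
    type: io.kestra.plugin.core.log.Log
    message: "Received store {{ inputs.store }} and file {{ inputs.file }}"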

Worker

The Worker task is deprecated in favor of the WorkingDirectory task. The next section explains how you can use the WorkingDirectory task to allow multiple tasks to share a file system during the flow's execution.

WorkingDirectory

By default, Kestra will launch each task in a new working directory, possibly on different workers if multiple ones exist.

The example below will run all tasks nested under the WorkingDirectory task sequentially. All those tasks will be executed in the same working directory, allowing the reuse of the previous tasks' output files in the downstream tasks. In order to share a working directory, all tasks nested under the WorkingDirectory task will be launched on the same worker.

This task can be particularly useful for compute-intensive file system operations.

yaml
id: working_dir_flow
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'

      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'

This task can also cache files inside the working directory, for example, to cache script dependencies such as the node_modules directory of a Node.js Script task.

yaml
id: node_with_cache
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.node.Script
        beforeCommands:
          - npm install colors
        script: |
          const colors = require("colors");
          console.log(colors.red("Hello"));

This task can also fetch files from namespace files and make them available to all child tasks.

yaml
id: working_dir_namespace_files
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
        - dir1/*.*
      exclude:
        - dir2/*.*
    tasks:
      - id: shell
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - cat dir1/file1.txt

Pause

Kestra flows run until all tasks complete, but sometimes you need to:

  • Add a manual validation before continuing the execution.
  • Wait for some duration before continuing the execution.

For this, you can use the Pause task.

In the following example, the validation task pauses the execution until it is manually resumed, and the wait task pauses the execution for 5 minutes.

yaml
id: pause
namespace: company.team

tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'

  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'

DAG

This task lets you define dependencies between tasks by creating a directed acyclic graph (DAG). Instead of describing an explicit DAG structure, you define only the upstream dependencies of each task using the dependsOn property, and Kestra works out the overall flow structure.

yaml
id: dag
namespace: company.team
tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3

Template (deprecated)

Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.

The following example calls a template with the Template task.

yaml
id: template
namespace: company.team

tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template
