Flowable Tasks​Flowable ​Tasks

Run tasks or subflows in parallel, create loops and conditional branching.

Add parallelism using Flowable tasks

One of the most common orchestration requirements is to execute independent processes in parallel. For example, you can process data for each partition in parallel. This can significantly speed up the processing time.

The flow below uses the ForEach flowable task to execute a list of tasks in parallel.

  1. The concurrencyLimit property with value 0 makes the list of tasks to execute in parallel.
  2. The values property defines the list of items to iterate over.
  3. The tasks property defines the list of tasks to execute for each item in the list. You can access the iteration value using the {{ taskrun.value }} variable.
id: python_partitions
namespace: company.team

description: Process partitions in parallel

  - id: getPartitions
    type: io.kestra.plugin.scripts.python.Script
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra
      partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
      Kestra.outputs({'partitions': partitions})

  - id: processPartitions
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 0
    values: '{{ outputs.getPartitions.vars.partitions }}'
      - id: partition
        type: io.kestra.plugin.scripts.python.Script
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra

          filename = '{{ taskrun.value }}'
          print(f"Reading and processing partition {filename}")
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})

To learn more about flowable tasks, check out the full flowable tasks documentation.

Was this page helpful?