Blueprints

Extract data, transform it, and load it in parallel to S3 and Postgres — in less than 7 seconds!

Source

yaml
id: api-json-to-postgres
namespace: company.team

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://gorest.co.in/public/v2/users

  - id: ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false

  - id: json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.ion.uri }}"

  - id: add_column
    type: io.kestra.plugin.scripts.jython.FileTransform
    from: "{{ outputs.json.uri }}"
    script: |
      from datetime import datetime
      logger.info('row: {}', row)
      row['inserted_at'] = datetime.utcnow()

  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: postgres
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: final_csv
            type: io.kestra.plugin.serdes.csv.IonToCsv
            from: "{{ outputs.add_column.uri }}"
            header: true

          - id: create_table
            type: io.kestra.plugin.jdbc.postgresql.Query
            url: jdbc:postgresql://host.docker.internal:5432/
            username: postgres
            password: qwerasdfyxcv1234
            sql: |
              CREATE TABLE IF NOT EXISTS public.raw_users
                (
                    id            int,
                    name          VARCHAR,
                    email         VARCHAR,
                    gender        VARCHAR,
                    status        VARCHAR,
                    inserted_at   timestamp
                );

          - id: load_data
            type: io.kestra.plugin.jdbc.postgresql.CopyIn
            url: jdbc:postgresql://host.docker.internal:5432/
            username: postgres
            password: qwerasdfyxcv1234
            format: CSV
            from: "{{ outputs.final_csv.uri }}"
            table: public.raw_users
            header: true

      - id: s3
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: final_json
            type: io.kestra.plugin.serdes.json.IonToJson
            from: "{{ outputs.add_column.uri }}"

          - id: json_to_s3
            type: io.kestra.plugin.aws.s3.Upload
            from: "{{ outputs.final_json.uri }}"
            key: users.json
            bucket: kestraio
            region: "{{ secret('AWS_DEFAULT_REGION') }}"
            accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
            secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"

About this blueprint

Tags: S3, Parallel, Ingest, Postgres, Outputs

EXTRACT: This workflow downloads data from an external API via an HTTP GET request and stores the result in Kestra's internal storage in ION format.

TRANSFORM: The extracted file is then serialized to JSON and transformed row by row to add a new inserted_at timestamp column.

LOAD: The final result is loaded in parallel into S3 (as a JSON file named users.json) and Postgres (as a CSV copied into the public.raw_users table).
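To make the transform and the parallel fan-out concrete, here is a minimal sketch in plain Python, outside of Kestra. It is not how Kestra implements these tasks: the helper names (add_inserted_at, rows_to_csv, rows_to_json), the sample record, and the use of a thread pool are all assumptions made for illustration, and the actual upload to S3 and the load into Postgres are left out. It only shows the shape of the data at each step: each row gains an inserted_at value, and then the CSV and JSON serializations run side by side.

```python
import csv
import io
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

# Column order mirrors the public.raw_users table created in the flow.
COLUMNS = ["id", "name", "email", "gender", "status", "inserted_at"]


def add_inserted_at(rows, now=None):
    """Return copies of the rows with an ISO-8601 'inserted_at' value added.

    'now' is a hypothetical hook for passing a fixed timestamp; when it is
    omitted, each row is stamped with the current UTC time.
    """
    result = []
    for row in rows:
        stamp = now or datetime.now(timezone.utc)
        result.append({**row, "inserted_at": stamp.isoformat()})
    return result


def rows_to_csv(rows, columns):
    """Serialize rows to CSV text with a header line (the Postgres branch)."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def rows_to_json(rows):
    """Serialize rows to a JSON array (the S3 branch)."""
    return json.dumps(rows)


# Hypothetical sample record; the real API response may differ.
sample = json.loads(
    '[{"id": 1, "name": "Ada", "email": "ada@example.com", '
    '"gender": "female", "status": "active"}]'
)
fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
transformed = add_inserted_at(sample, now=fixed_now)

# Run both serializations concurrently, one worker per branch.
with ThreadPoolExecutor(max_workers=2) as pool:
    csv_future = pool.submit(rows_to_csv, transformed, COLUMNS)
    json_future = pool.submit(rows_to_json, transformed)
    csv_text = csv_future.result()
    json_text = json_future.result()

print(csv_text)
print(json_text)
```

In the flow itself, the two branches are the Sequential tasks named postgres and s3, and the Parallel task plays the role that the thread pool plays in this sketch.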

Download

Json To Ion

Ion To Json

File Transform

Parallel

Sequential

Ion To Csv

Query

Copy In

Upload
