yaml
type: "io.kestra.plugin.kafka.Produce"

Send a message to a Kafka topic.

Examples

Read a CSV file, transform it and send it to Kafka.

yaml
id: send_message_to_kafka
namespace: company.team

inputs:
  - id: file
    type: FILE
    description: A CSV file with columns: id, username, tweet, and timestamp.

tasks:
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"

  - id: ion_to_avro_schema
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result

  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: test_kestra
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO

Properties

from

  • Type:
    • string
    • array
    • object
  • Dynamic: ✔️
  • Required: ✔️

The content of the message to be sent to Kafka.

Can be a Kestra internal storage URI, a map (i.e. a list of key-value pairs) or a list of maps. The following keys are supported: key, value, partition, timestamp, and headers.

keySerializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

The serializer used for the key.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka connection properties.

The bootstrap.servers property is a minimal required configuration to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. If you want to pass a truststore or a keystore, you must provide a base64 encoded string for ssl.keystore.location and ssl.truststore.location.

valueSerializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

The serializer used for the value.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

keyAvroSchema

  • Type: string
  • Dynamic: ✔️
  • Required:

Avro Schema if the key is set to AVRO type.

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:
  • Default: {}

Serializer configuration

Configuration that will be passed to serializer or deserializer. The avro.use.logical.type.converters is always passed when you have any values set to true.

topic

  • Type: string
  • Dynamic: ✔️
  • Required:

Kafka topic to which the message should be sent.

Could also be passed inside the from property using the key topic.

transactional

  • Type:
    • boolean
    • string
  • Dynamic: ✔️
  • Required:

valueAvroSchema

  • Type: string
  • Dynamic: ✔️
  • Required:

Avro Schema if the value is set to AVRO type.

Outputs

messagesCount

  • Type: integer
  • Required: