Produce
type: "io.kestra.plugin.kafka.Produce"
Send a message to a Kafka topic.
Examples
Read a CSV file, transform it and send it to Kafka.
id: send_message_to_kafka
namespace: company.team
inputs:
  - id: file
    type: FILE
    description: "A CSV file with columns: id, username, tweet, and timestamp."
tasks:
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ inputs.file }}"
  - id: ion_to_avro_schema
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csv_to_ion.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result
  - id: avro_to_kafka
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.ion_to_avro_schema.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    topic: test_kestra
    valueAvroSchema: |
      {"type":"record","name":"twitter_schema","namespace":"io.kestra.examples","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"}]}
    valueSerializer: AVRO
Properties
from
- Type: string, array, object
- Dynamic: ✔️
- Required: ✔️
The content of the message to be sent to Kafka.
Can be a Kestra internal storage URI, a map (i.e. a list of key-value pairs), or a list of maps. The following keys are supported: key, value, partition, timestamp, and headers.
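To illustrate the map form, the sketch below passes a single message inline instead of through an internal storage URI. The flow id, topic name, broker address, and message contents are placeholder assumptions; only the supported keys come from this page.

```yaml
id: produce_inline_message
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    # Inline map instead of an internal storage URI
    from:
      key: "order-1"
      value: "order created"
      headers:
        source: kestra
    keySerializer: STRING
    valueSerializer: STRING
    topic: test_kestra                     # placeholder topic
    properties:
      bootstrap.servers: localhost:9092    # placeholder broker address
```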
keySerializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default: STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
The serializer used for the key.
Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, and JSON.
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ✔️
Kafka connection properties.
The bootstrap.servers property is the minimum configuration required to connect to a Kafka topic. This property can contain any valid Consumer Configs or Producer Configs as key-value pairs. To pass a truststore or a keystore, you must provide a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
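As a rough sketch of the truststore and keystore case, the task below reads base64-encoded stores from Kestra secrets. The secret names, broker address, and topic are hypothetical. security.protocol and the two password keys are standard Kafka client settings that this page does not list, so they are assumptions about what a given cluster needs.

```yaml
id: produce_over_tls
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from:
      key: "order-1"
      value: "order created"
    keySerializer: STRING
    valueSerializer: STRING
    topic: test_kestra
    properties:
      bootstrap.servers: broker.example.com:9093    # hypothetical TLS listener
      security.protocol: SSL
      # Base64-encoded store contents, not file paths (secret names are hypothetical)
      ssl.truststore.location: "{{ secret('KAFKA_TRUSTSTORE_B64') }}"
      ssl.truststore.password: "{{ secret('KAFKA_TRUSTSTORE_PASSWORD') }}"
      ssl.keystore.location: "{{ secret('KAFKA_KEYSTORE_B64') }}"
      ssl.keystore.password: "{{ secret('KAFKA_KEYSTORE_PASSWORD') }}"
```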
valueSerializer
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default: STRING
- Possible Values:
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
The serializer used for the value.
Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, and JSON.
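The key and value serializers are chosen independently. The sketch below pairs a STRING key with a JSON value, assuming the JSON serializer accepts a nested map as the message value. The flow id, topic, broker address, and field names are placeholders.

```yaml
id: produce_json_value
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from:
      key: "user-42"
      # Assumption: a map value is serialized as a JSON document
      value:
        username: alice
        tweet: hello from kestra
    keySerializer: STRING
    valueSerializer: JSON
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
```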
keyAvroSchema
- Type: string
- Dynamic: ✔️
- Required: ❌
Avro schema to use when the key serializer is set to AVRO.
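A minimal sketch of an Avro-serialized key, assuming the key can be given as a map that matches a record schema. The schema name, its fields, and the registry URL are illustrative and not taken from a real deployment.

```yaml
id: produce_avro_key
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from:
      # Assumption: a map key is converted to an Avro record using keyAvroSchema
      key:
        id: 42
      value: "order created"
    keySerializer: AVRO
    keyAvroSchema: |
      {"type":"record","name":"order_key","namespace":"io.kestra.examples","fields":[{"name":"id","type":"int"}]}
    valueSerializer: STRING
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
```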
serdeProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
- Default: {}
Serializer configuration
Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed when you have any values set to true.
topic
- Type: string
- Dynamic: ✔️
- Required: ❌
Kafka topic to which the message should be sent.
It can also be passed inside the from property using the key topic.
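The sketch below sets the topic per message through the topic key inside from, using the list-of-maps form and omitting the task-level topic property. The topic names and message contents are placeholders.

```yaml
id: produce_to_multiple_topics
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    # Each map carries its own destination topic
    from:
      - topic: orders          # placeholder topic
        key: "order-1"
        value: "order created"
      - topic: audit           # placeholder topic
        key: "order-1"
        value: "order event recorded"
    keySerializer: STRING
    valueSerializer: STRING
    properties:
      bootstrap.servers: localhost:9092
```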
transactional
- Type: boolean, string
- Dynamic: ✔️
- Required: ❌
valueAvroSchema
- Type: string
- Dynamic: ✔️
- Required: ❌
Avro schema to use when the value serializer is set to AVRO.
Outputs
messagesCount
- Type: integer
- Required: ❌
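A downstream task can read this output through the usual outputs expression. The sketch below logs the count with the core Log task. The task ids, topic, and message contents are illustrative.

```yaml
id: produce_and_log_count
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from:
      key: "order-1"
      value: "order created"
    keySerializer: STRING
    valueSerializer: STRING
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092

  - id: log_count
    type: io.kestra.plugin.core.log.Log
    # messagesCount is the output documented above
    message: "Sent {{ outputs.produce.messagesCount }} message(s) to Kafka"
```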