yaml
type: "io.kestra.plugin.pulsar.Produce"

Produce message to a Pulsar topic.

Examples

Read a CSV file, transform it to the right format, and publish it to Pulsar topic.

yaml
    id: produce
    namespace: company.team

    inputs:
      - type: FILE
        id: file

    tasks:
      - id: csv_reader
        type: io.kestra.plugin.serdes.csv.CsvToIon
        from: "{{ inputs.file }}"

      - id: file_transform
        type: io.kestra.plugin.scripts.nashorn.FileTransform
        from: {{ outputs.csv_reader.uri }}"
        script: |
          var result = {
            "key": row.id,
            "value": {
              "username": row.username,
              "tweet": row.tweet
            },
            "eventTime": row.timestamp,
            "properties": {
              "key": "value"
            }
          };
          row = result

      - id: produce
        type: io.kestra.plugin.pulsar.Produce
        from: "{{ outputs.file_transform.uri }}"
        uri: pulsar://localhost:26650
        serializer: JSON
        topic: test_kestra

Properties

from

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Source of the sent message.

Can be a Kestra internal storage URI, a map or a list in the following format: key, value, eventTime, properties, deliverAt, deliverAfter and sequenceId.

serializer

  • Type: object
  • Dynamic:
  • Required: ✔️

topic

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic to send a message to.

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651

accessMode

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • Shared
    • Exclusive
    • ExclusiveWithFencing
    • WaitForExclusive

Configure the type of access mode that the producer requires on the topic.

Possible values are:

  • Shared: By default, multiple producers can publish to a topic.
  • Exclusive: Require exclusive access for producer. Fail immediately if there's already a producer connected.
  • WaitForExclusive: Producer creation is pending until it can acquire exclusive access.

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.

compressionType

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • NONE
    • LZ4
    • ZLIB
    • ZSTD
    • SNAPPY

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

  • NONE: No compression (Default).
  • LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib.
  • ZLIB: Standard ZLib compression.
  • ZSTD Compress with Zstandard codec. Since Pulsar 2.3.
  • SNAPPY Compress with Snappy codec. Since Pulsar 2.4.

encryptionKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Add public encryption key, used by producer to encrypt the data key.

producerName

  • Type: string
  • Dynamic: ✔️
  • Required:

Specify a name for the producer.

producerProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Add all the properties in the provided map to the producer.

schemaString

  • Type: string
  • Dynamic: ✔️
  • Required:

JSON string of the topic's schema

Required for connecting with topics with a defined schema and strict schema checking

schemaType

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: NONE
  • Possible Values:
    • NONE
    • AVRO
    • JSON

The schema type of the topic

Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.

tlsOptions

TLS authentication options.

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.

Outputs

messagesCount

  • Type: integer
  • Required:

Definitions

io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions

  • ca
    • Type: string
    • Dynamic:
    • Required:
  • cert
    • Type: string
    • Dynamic:
    • Required:
  • key
    • Type: string
    • Dynamic:
    • Required: