RealtimeTrigger
Consume messages in real time from Azure Event Hubs and create one execution per message.
To consume multiple messages received within a given time frame and process them as a batch, use io.kestra.plugin.azure.eventhubs.Trigger instead.
type: "io.kestra.plugin.azure.eventhubs.RealtimeTrigger"
Trigger a flow based on events received from Azure Event Hubs in real time.
id: azure_eventhubs_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: Hello there! I received {{ trigger.body }} from Azure EventHubs!

triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
Use the Azure Event Hubs RealtimeTrigger to push events into a StorageTable.
id: eventhubs_realtime_trigger
namespace: company.team

tasks:
  - id: insert_into_storagetable
    type: io.kestra.plugin.azure.storage.table.Bulk
    endpoint: https://yourstorageaccount.blob.core.windows.net
    connectionString: "{{ secret('STORAGETABLE_CONNECTION') }}"
    table: orders
    from:
      - partitionKey: order_id
        rowKey: "{{ trigger.body | jq('.order_id') | first }}"
        properties:
          customer_name: "{{ trigger.body | jq('.customer_name') | first }}"
          customer_email: "{{ trigger.body | jq('.customer_email') | first }}"
          product_id: "{{ trigger.body | jq('.product_id') | first }}"
          price: "{{ trigger.body | jq('.price') | first }}"
          quantity: "{{ trigger.body | jq('.quantity') | first }}"
          total: "{{ trigger.body | jq('.total') | first }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: orders
    namespace: kestra
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: $Default
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
The event hub to read from.
Namespace name of the event hub to connect to.
Default: STRING
Possible values: STRING, BINARY, ION, JSON
The deserializer to be used for deserializing the event value.
Default: {}
The config properties to be passed to the Deserializer.
Configs in key/value pairs.
Default: {}
The config properties to be used for configuring the BlobCheckpointStore.
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
Default: 5
The maximum number of retry attempts before considering a client operation to have failed.
Default: 500
The maximum permissible delay between retry attempts in milliseconds.
Connection string of the Storage Account.
Default: $Default
The consumer group.
Custom endpoint address when connecting to the Event Hubs service.
The ISO datetime to be used when PartitionStartingPosition is configured to INSTANT.
Configs in key/value pairs.
Default: EARLIEST
Possible values: EARLIEST, LATEST, INSTANT
The starting position.
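As a rough sketch of how the starting position and the ISO datetime might be combined, the fragment below starts consumption from a fixed point in time. It assumes the two properties are named `partitionStartingPosition` and `enqueueTime`; these names do not appear on this page and should be checked against the plugin schema. The timestamp is a placeholder.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Assumed property name; INSTANT is one of the listed starting positions.
    partitionStartingPosition: INSTANT
    # Assumed property name; placeholder ISO datetime, only relevant with INSTANT.
    enqueueTime: "2024-01-01T00:00:00Z"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
```

With EARLIEST or LATEST the datetime would presumably be left out, since it only applies to the INSTANT position.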
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED
List of execution states after which a trigger should be stopped (a.k.a. disabled).
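To illustrate the list of stop states, here is a minimal sketch of a trigger that is disabled after a failed or killed execution. The property name `stopAfter` is an assumption (it is not shown on this page), and the chosen states are only an example.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
    # Assumed property name; stop (disable) the trigger once an execution
    # ends in one of these states.
    stopAfter:
      - FAILED
      - KILLED
```

Listing only terminal error states like this keeps the trigger running through successful executions while preventing a failing flow from being started again for every incoming message.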