Kafka Sink
Overview
The Kafka sink writes events to a Kafka topic. It can extract the partition key from the record based on specific sources such as the stream ID, headers, or record key. It also supports basic authentication and resilience features to handle transient errors.
Quickstart
You can create the Kafka sink connector as follows. The first snippet is for PowerShell and the second is for Bash:
$JSON = @"
{
"settings": {
"instanceTypeName": "kafka-sink",
"topic": "customers",
"bootstrapServers": "localhost:9092",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream",
"authentication:username": "your-username",
"authentication:password": "your-password",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
}
}
"@
curl.exe -X POST `
-H "Content-Type: application/json" `
-d $JSON `
http://localhost:2113/connectors/kafka-sink-connector
JSON='{
"settings": {
"instanceTypeName": "kafka-sink",
"topic": "your-topic",
"bootstrapServers": "your-kafka-cluster-address:9092",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream",
"authentication:username": "your-username",
"authentication:password": "your-password",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
}
}'
curl -X POST \
-H "Content-Type: application/json" \
-d "$JSON" \
http://localhost:2113/connectors/kafka-sink-connector
After creating and starting the Kafka sink connector, every time an event is appended to the example-stream, the Kafka sink connector will send the record to the specified Kafka topic. You can find a list of available management API endpoints in the API Reference.
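For readers who script the management API from Python, the same request can be sketched with the standard library. This is a minimal illustration that mirrors the curl calls above; the helper name `build_create_request` is hypothetical, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_create_request(base_url, connector_id, settings):
    """Build (but do not send) the POST request that creates a connector."""
    body = json.dumps({"settings": settings}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{connector_id}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same settings as the Quickstart snippets.
settings = {
    "instanceTypeName": "kafka-sink",
    "topic": "customers",
    "bootstrapServers": "localhost:9092",
    "subscription:filter:scope": "stream",
    "subscription:filter:filterType": "streamId",
    "subscription:filter:expression": "example-stream",
    "authentication:username": "your-username",
    "authentication:password": "your-password",
    "authentication:securityProtocol": "SaslSsl",
    "waitForBrokerAck": "true",
}

request = build_create_request(
    "http://localhost:2113", "kafka-sink-connector", settings
)
# To actually send it against a running server:
# urllib.request.urlopen(request)
```

Keeping the request construction separate from the network call makes it easy to inspect the payload before sending it.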
Settings
Adjust these settings to specify the behavior and interaction of your Kafka sink connector with EventStoreDB, ensuring it operates according to your requirements and preferences.
Tips
The Kafka sink inherits a set of common settings that are used to configure the connector. The settings can be found in the Sink Options page.
The Kafka sink can be configured with the following options:
Name | Details |
---|---|
topic | required Type: string Description: The Kafka topic to produce records to. |
bootstrapServers | Type: string Description: Comma-separated list of Kafka broker addresses. Default: localhost:9092 |
defaultHeaders | Type: dict<string,string> Description: Headers included in all produced messages. Default: None |
authentication:securityProtocol | Type: SecurityProtocol Description: Protocol used for Kafka broker communication. Default: Plaintext |
authentication:username | Type: string Description: Username for authentication. |
authentication:password | Type: string Description: Password for authentication. |
Partitioning
Name | Details |
---|---|
partitionKeyExtraction:enabled | Type: boolean Description: Enables partition key extraction. Default: false |
partitionKeyExtraction:source | Type: Enum Description: Source for extracting the partition key. Accepted Values: stream, streamSuffix, headers Default: PartitionKey |
partitionKeyExtraction:expression | Type: string Description: Regular expression for extracting the partition key. |
See the Partitioning section for examples.
Resilience
Besides the common sink settings that can be found in the Resilience Configuration page, the Kafka sink connector supports additional settings related to resilience:
Name | Details |
---|---|
waitForBrokerAck | Type: boolean Description: Whether the producer waits for broker acknowledgment before considering the send operation complete. Default: true |
resilience:enabled | Type: boolean Description: Enables resilience features for message handling. Default: true |
resilience:maxRetries | Type: int Description: Maximum number of retry attempts. Default: -1 (unlimited) |
resilience:transientErrorDelay | Type: TimeSpan Description: Delay between retries for transient errors. Default: 00:00:00 |
resilience:reconnectBackoffMaxMs | Type: int Description: Maximum backoff time in milliseconds for reconnection attempts. Default: 20000 |
resilience:messageSendMaxRetries | Type: int Description: Number of times to retry sending a failing message. Note: Retrying may cause reordering unless enable.idempotence is set to true. Default: 2147483647 |
Miscellaneous
Name | Details |
---|---|
brokerAddressFamily | Type: BrokerAddressFamily Description: Allowed broker IP address families. Default: V4 |
compression:type | Type: CompressionType Description: Kafka compression type. Default: Zstd |
compression:level | Type: int Description: Kafka compression level. Default: 6 |
At least once delivery
The Kafka sink guarantees at least once delivery by retrying failed requests based on configurable resilience settings. It will continue to attempt delivery until the event is successfully sent or the maximum number of retries is reached, ensuring each event is delivered at least once.
The Kafka sink currently retries transient errors based on the following error codes:
- Local_AllBrokersDown: All broker connections are down
- OutOfOrderSequenceNumber: Broker received an out of order sequence number
- TransactionCoordinatorFenced: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
- UnknownProducerId: Unknown Producer Id.
For detailed information on the listed error codes, refer to the Kafka documentation.
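The retry behavior described above can be pictured with a small sketch. This is not the connector's implementation: `SendError`, `send_with_retries`, and the `sleep` parameter are hypothetical names used for illustration, and the only details taken from this page are the four transient error codes and the meaning of -1 for resilience:maxRetries.

```python
import time

# Error codes this page lists as transient (retried by the sink).
TRANSIENT_ERRORS = {
    "Local_AllBrokersDown",
    "OutOfOrderSequenceNumber",
    "TransactionCoordinatorFenced",
    "UnknownProducerId",
}


class SendError(Exception):
    """Hypothetical error type carrying a Kafka error code."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def send_with_retries(send, max_retries=-1, delay_seconds=0.0, sleep=time.sleep):
    """Call send() until it succeeds, retrying only transient errors.

    max_retries=-1 means unlimited retries, mirroring resilience:maxRetries.
    delay_seconds plays the role of resilience:transientErrorDelay.
    """
    attempt = 0
    while True:
        try:
            return send()
        except SendError as error:
            if error.code not in TRANSIENT_ERRORS:
                raise  # non-transient error: do not retry
            if max_retries != -1 and attempt >= max_retries:
                raise  # retry budget exhausted
            attempt += 1
            sleep(delay_seconds)


# Usage: a send that fails twice with transient errors, then succeeds.
outcomes = iter(
    [SendError("Local_AllBrokersDown"), SendError("UnknownProducerId"), "ok"]
)


def flaky_send():
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = send_with_retries(flaky_send, max_retries=5, sleep=lambda _: None)
```

With a finite retry budget, the last transient error is raised once the budget is used up, which is the point at which delivery is no longer attempted.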
Configuration example
{
"resilience:enabled": true,
"resilience:requestTimeoutMs": 3000,
"resilience:maxRetries": -1,
"resilience:transientErrorDelay": "00:00:05",
"resilience:reconnectBackoffMaxMs": 20000,
"resilience:messageSendMaxRetries": 2147483647
}
Broker Acknowledgment
Broker acknowledgment refers to the producer waiting for confirmation from the Kafka broker that a message has been successfully received. When waitForBrokerAck is enabled (the default), the producer waits for this acknowledgment, which makes delivery more reliable and matters for systems that require durability and fault tolerance.
While this setting improves reliability, it can slightly increase latency, as the producer must wait for confirmation from Kafka before continuing. If higher throughput is preferred over strict delivery guarantees, you can disable this option.
For more details about Kafka broker acknowledgment, refer to Kafka's official documentation.
To learn more about authentication in Kafka, see Authentication using SASL.
For Kafka client enum types, please refer to the official Kafka .NET client documentation.
Examples
Partitioning
The Kafka sink connector writes events to Kafka topics, and it allows the customization of partition keys. Kafka's partitioning strategy is essential for ensuring that related messages are sent to the same partition, which helps maintain message ordering and effective load distribution. Read more about Kafka Partitions.
Kafka partition keys can be generated from various sources, similar to how document IDs are generated in the MongoDB connector. These sources include the event stream, stream suffix, headers, or other record fields.
By default, it will use the PartitionKey and grab this value from the EventStoreDB record.
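To see why a stable partition key keeps related messages together, consider a simplified key-to-partition mapping. The sketch below uses CRC32 modulo the partition count as a stand-in; real Kafka clients choose their own hash function and partitioner, so the concrete partition numbers are not meaningful. The function name `pick_partition` is hypothetical.

```python
import zlib


def pick_partition(partition_key: str, partition_count: int) -> int:
    """Map a key to a partition with a deterministic hash (simplified)."""
    return zlib.crc32(partition_key.encode("utf-8")) % partition_count


# Records that share a key always map to the same partition,
# which is what preserves their relative order.
first = pick_partition("user-123", 6)
second = pick_partition("user-123", 6)
same_partition = first == second
```

Because the mapping depends only on the key and the partition count, changing the number of partitions changes where keys land.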
Partition using Stream ID
You can extract part of the stream name using a regular expression (regex) to define the partition key. The expression is optional and can be customized based on your naming convention. In this example, the expression captures the stream name up to _data.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "stream",
"partitionKeyExtraction:expression": "^(.*)_data$"
}
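The effect of that expression can be checked with a short sketch. It rests on assumptions that this page does not confirm: the first capture group is taken as the key, the whole stream name is used when no expression is given, and the whole stream name is the fallback when the expression does not match. The function name `key_from_stream` is hypothetical.

```python
import re


def key_from_stream(stream_id, expression=None):
    """Derive a partition key from a stream name (illustrative only)."""
    if expression is None:
        return stream_id  # assumption: no expression means the full name
    match = re.search(expression, stream_id)
    if match is None:
        return stream_id  # assumption: fall back to the full name
    # Prefer the first capture group when the pattern defines one.
    return match.group(1) if match.groups() else match.group(0)


key = key_from_stream("user-123_data", r"^(.*)_data$")
```

Here `key` holds the part of the stream name before `_data`.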
Alternatively, if you only need the last segment of the stream name (after a hyphen), you can use the streamSuffix source. This doesn't require an expression since it automatically extracts the suffix.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "streamSuffix"
}
The streamSuffix source is useful when stream names follow a structured format and you want to use only the trailing part as the partition key. For example, if the stream is named user-123, the partition key would be 123.
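A minimal sketch of the suffix extraction follows. It assumes the suffix is everything after the last hyphen, and that a name without a hyphen is used unchanged; this page only shows a single-hyphen example, so treat both as assumptions. The function name `key_from_stream_suffix` is hypothetical.

```python
def key_from_stream_suffix(stream_id):
    """Return the part of the stream name after the last hyphen."""
    # rpartition returns ('', '', stream_id) when there is no hyphen,
    # so the full name is returned in that case.
    return stream_id.rpartition("-")[2]


suffix = key_from_stream_suffix("user-123")
```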
Partition using header values
You can generate the partition key by concatenating values from specific event headers. In this case, two header values (key1 and key2) are combined to form the key.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "headers",
"partitionKeyExtraction:expression": "key1,key2"
}
The headers source allows you to pull values from the event's metadata. The partitionKeyExtraction:expression field lists the header keys (in this case, key1 and key2), and their values are concatenated to generate the partition key. This is useful when headers hold important metadata that should determine the partition, such as region, user ID, or other identifiers.
For example, given the following headers:
{
"key1": "value1",
"key2": "value2"
}
// outputs "value1-value2"
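The concatenation can be reproduced with a short sketch. The hyphen separator comes from the example output above; trimming whitespace around header names and skipping headers that are missing from the event are assumptions. The function name `key_from_headers` is hypothetical.

```python
def key_from_headers(headers, expression):
    """Join the values of the listed headers with a hyphen."""
    names = [name.strip() for name in expression.split(",")]
    # Assumption: headers that are absent from the event are skipped.
    return "-".join(str(headers[name]) for name in names if name in headers)


partition_key = key_from_headers(
    {"key1": "value1", "key2": "value2"}, "key1,key2"
)
```

The order of names in the expression determines the order of the values in the resulting key.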