Persistent Subscriptions

Creating a subscription group

The first step of dealing with a subscription group is to create one. You will receive an error if you attempt to create a subscription group multiple times. You must have admin permissions to create a persistent subscription group.


            var userCredentials = new UserCredentials("admin", "changeit");

            var settings = new PersistentSubscriptionSettings();
            await client.CreateAsync(
                "test-stream",
                "subscription-group",
                settings,
                userCredentials);
1
2
3
4
5
6
7
8
9
ParameterDescription
streamThe stream the persistent subscription is on.
groupNameThe name of the subscription group to create.
settingsThe settings to use when creating the subscription.
credentialsThe user credentials to use for this operation.

Connecting to a subscription group

Once you have created a subscription group, clients can connect to that subscription group. A subscription in your application should only have the connection in your code, you should assume that the subscription already exists.

The most important parameter to pass when connecting is the buffer size. This represents how many outstanding messages the server should allow this client. If this number is too small, your subscription will spend much of its time idle as it waits for an acknowledgment to come back from the client. If it's too big, you waste resources and can start causing time out messages depending on the speed of your processing.


            var subscription = await client.SubscribeAsync(
                "test-stream",
                "subscription-group",
                async (subscription, evnt, retryCount, cancellationToken) => {
                    await HandleEvent(evnt);
                }, (subscription, dropReason, exception) => {
                    Console.WriteLine(quot;Subscription was dropped due to {dropReason}. {exception}");
                });
1
2
3
4
5
6
7
8
9
ParameterDescription
streamThe stream the persistent subscription is on.
groupNameThe name of the subscription group to subscribe to.
eventAppearedThe action to call when an event arrives over the subscription.
subscriptionDroppedThe action to call if the subscription is dropped.
credentialsThe user credentials to use for this operation.
bufferSizeThe number of in-flight messages this client is allowed. Default: 10
autoAckWhether to automatically acknowledge messages after eventAppeared returns. Default: true

Acknowledgements

Clients must acknowledge (or not acknowledge) messages in the competing consumer model. If you enable auto-ack the subscription will automatically acknowledge messages once your handler completes them. If an error occurs, it will shut down your subscription with a message and the error.

You can choose to not auto-ack messages. This can be useful when you have multi-threaded processing of messages in your subscriber and need to pass control to something else. If you want to manually acknowlegde events, you need to set this option when subscribing and then acknowledge or not acknowledge messages as you handle them.


            var subscription = await client.SubscribeAsync(
                "test-stream",
                "subscription-group",
                async (subscription, evnt, retryCount, cancellationToken) => {
                    try {
                        await HandleEvent(evnt);
                        await subscription.Ack(evnt);
                    } catch (UnrecoverableException ex) {
                        await subscription.Nack(PersistentSubscriptionNakEventAction.Park, ex.Message, evnt);
                    }
                }, (subscription, dropReason, exception) => {
                    Console.WriteLine(quot;Subscription was dropped due to {dropReason}. {exception}");
                }, autoAck: false);
1
2
3
4
5
6
7
8
9
10
11
12
13
14

The Nak Actions describe what the server should do with the message:

ActionDescription
UnknownThe client does not know what action to take. Let the server decide.
ParkPark the message and do not resend. Put it on poison queue.
RetryExplicitly retry the message.
SkipSkip this message do not resend and do not put in poison queue.

Consumer strategies

When creating a persistent subscription, you can choose between a number of consumer strategies.

RoundRobin (default)

Distributes events to all clients evenly. If the client bufferSize is reached, the client won't receive more events until it acknowledges or not acknowledges events in its buffer.

This strategy provides equal load balancing between all consumers in the group.

DispatchToSingle

Distributes events to a single client until the bufferSize is reached. After that, the next client is selected in a round-robin style, and the process repeats.

This option can be seen as a fall-back scenario for high availability, when a single consumer processes all the events until it reaches its maximum capacity. When that happens, another consumer takes the load to free up the main consumer resources.

Pinned

For use with an indexing projection such as the system $by_category projection.

EventStoreDB inspects the event for its source stream id, hashing the id to one of 1024 buckets assigned to individual clients. When a client disconnects, its buckets are assigned to other clients. When a client connects, it is assigned some existing buckets. This naively attempts to maintain a balanced workload.

The main aim of this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. This is not a guarantee, and you should handle the usual ordering and concurrency issues.

Updating a subscription group

You can edit the settings of an existing subscription group while it is running, you don't need to delete and recreate it to change settings. When you update the subscription group, it resets itself internally, dropping the connections and having them reconnect. You must have admin permissions to update a persistent subscription group.


            var userCredentials = new UserCredentials("admin", "changeit");
            var settings = new PersistentSubscriptionSettings(
                resolveLinkTos: true,
                minCheckPointCount: 20);

            await client.UpdateAsync(
                "test-stream",
                "subscription-group",
                settings,
                userCredentials);
1
2
3
4
5
6
7
8
9
10
11
ParameterDescription
streamThe stream the persistent subscription is on.
groupNameThe name of the subscription group to update.
settingsThe settings to use when creating the subscription.
credentialsThe user credentials to use for this operation.

Persistent subscription settings

Both the Create and Update methods take some settings for configuring the persistent subscription.

The following table shows the configuration options you can set on a persistent subscription.

OptionDescriptionDefault
ResolveLinkTosWhether the subscription should resolve link events to their linked events.false
StartFromThe exclusive position in the stream or transaction file the subscription should start from.null (start from the end of the stream)
ExtraStatisticsWhether to track latency statistics on this subscription.false
MessageTimeoutThe amount of time after which to consider a message as timed out and retried.30 (seconds)
MaxRetryCountThe maximum number of retries (due to timeout) before a message is considered to be parked.10
LiveBufferSizeThe size of the buffer (in-memory) listening to live messages as they happen before paging occurs.500
ReadBatchSizeThe number of events read at a time when paging through history.20
HistoryBufferSizeThe number of events to cache when paging through history.500
CheckPointAfterThe amount of time to try to checkpoint after.2 seconds
MinCheckPointCountThe minimum number of messages to process before a checkpoint may be written.10
MaxCheckPointCountThe maximum number of messages not checkpointed before forcing a checkpoint.1000
MaxSubscriberCountThe maximum number of subscribers allowed.0 (unbounded)
NamedConsumerStrategyThe strategy to use for distributing events to client consumers. See the consumer strategies in this doc.RoundRobin

Deleting a subscription group

Remove a subscription group with the delete operation. Like the creation of groups, you rarely do this in your runtime code and is undertaken by an administrator running a script.


            var userCredentials = new UserCredentials("admin", "changeit");
            await client.DeleteAsync(
                "test-stream",
                "subscription-group",
                userCredentials);
1
2
3
4
5
6
ParameterDescription
streamThe stream the persistent subscription is on.
groupNameThe name of the subscription group to delete.
credentialsThe user credentials to use for this operation
Last Updated: 9/22/2021, 1:59:50 PM
Contributors: Mathew McLoughlin