Persistent subscriptions
Persistent subscriptions are similar to catch-up subscriptions, but there are two key differences:
- The subscription checkpoint is maintained by the server. It means that when your client reconnects to the persistent subscription, it will automatically resume from the last known position.
- It's possible to connect more than one event consumer to the same persistent subscription. In that case, the server will load-balance the consumers, depending on the defined strategy, and distribute the events to them.
For these reasons, persistent subscriptions are organized as subscription groups, which the server defines and maintains. Consumers then connect to a particular subscription group, and the server starts sending events to them.
You can read more about persistent subscriptions in the server documentation.
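To make the server-side checkpoint concrete, here is a toy in-memory model. `ToySubscriptionGroup` is a hypothetical class written for this illustration; it is not part of any client library and leaves out load balancing, retries, and persistence.

```python
class ToySubscriptionGroup:
    """Toy stand-in for a server-side subscription group (not a real client API)."""

    def __init__(self, events):
        self._events = list(events)
        # Index of the next unacknowledged event, kept by the "server".
        self._checkpoint = 0

    def connect(self):
        """Yield (index, event) pairs starting from the group's checkpoint."""
        for index in range(self._checkpoint, len(self._events)):
            yield index, self._events[index]

    def ack(self, index):
        # Advance the checkpoint past the acknowledged event.
        self._checkpoint = max(self._checkpoint, index + 1)


group = ToySubscriptionGroup(["e0", "e1", "e2", "e3"])

# The first consumer handles two events, then disconnects.
first = group.connect()
for _ in range(2):
    index, event = next(first)
    group.ack(index)

# A new consumer stores no position of its own, yet resumes at "e2".
index, event = next(group.connect())
```

The consumer never records a position itself; reconnecting is enough to continue where the group left off.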
Creating a subscription group
The first step of dealing with a persistent subscription is to create a subscription group. You will receive an error if you attempt to create a subscription group multiple times. You must have admin permissions to create a persistent subscription group.
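Because creating the same group twice raises an error, setup scripts often wrap the call so that "already exists" counts as success. The sketch below assumes the client signals that case with a dedicated exception; `AlreadyExists` and `FakeClient` are hypothetical stand-ins that keep the example runnable, so substitute the real client and the error type your client library actually raises.

```python
class AlreadyExists(Exception):
    """Stand-in for the client's 'group already exists' error (hypothetical name)."""


class FakeClient:
    """In-memory stand-in for a real client, used only to make the sketch runnable."""

    def __init__(self):
        self.groups = set()

    def create_subscription_to_stream(self, group_name, stream_name):
        key = (stream_name, group_name)
        if key in self.groups:
            raise AlreadyExists(f"{group_name} already exists on {stream_name}")
        self.groups.add(key)


def ensure_subscription_group(client, group_name, stream_name):
    """Create the group, treating 'already exists' as success.

    Returns True if this call created the group, False if it was already there.
    """
    try:
        client.create_subscription_to_stream(
            group_name=group_name,
            stream_name=stream_name,
        )
    except AlreadyExists:
        return False
    return True
```

Calling `ensure_subscription_group` twice with the same arguments returns `True` the first time and `False` the second, without raising.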
Subscribing to one stream
The following sample shows how to create a subscription group for a persistent subscription where you want to receive events from a specific stream. It could be a normal stream, or a stream of links (like the $ce category stream).
client.create_subscription_to_stream(
group_name="subscription-group",
stream_name=stream_name,
)
await client.createPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME,
persistentSubscriptionToStreamSettingsFromDefaults(),
{ credentials: { username: "admin", password: "changeit" } }
);
client.createToStream(
"test-stream",
"subscription-group",
CreatePersistentSubscriptionToStreamOptions.get()
.fromStart());
var userCredentials = new UserCredentials("admin", "changeit");
var settings = new PersistentSubscriptionSettings();
await client.CreateToStreamAsync(
"test-stream",
"subscription-group",
settings,
userCredentials: userCredentials
);
Console.WriteLine("Subscription to stream created");
err := client.CreatePersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.PersistentStreamSubscriptionOptions{})
if err != nil {
panic(err)
}
client
.create_persistent_subscription("test-stream", "subscription-group", &Default::default())
.await?;
Parameter | Description |
---|---|
stream | The stream the persistent subscription is on. |
groupName | The name of the subscription group to create. |
settings | The settings to use when creating the subscription. |
credentials | The user credentials to use for this operation. |
Subscribing to $all
The ability to subscribe to $all was introduced in EventStoreDB 21.10. Persistent subscriptions to $all also support filtering.
You can create a subscription group on $all much the same way you would create a subscription group on a stream:
client.create_subscription_to_all(
group_name=group_name,
filter_by_stream_name=True,
filter_include=[r"user-.*"],
)
await client.createPersistentSubscriptionToAll(
GROUP_NAME,
persistentSubscriptionToAllSettingsFromDefaults(),
{
filter: streamNameFilter({ prefixes: ["test"] }),
credentials: { username: "admin", password: "changeit" },
}
);
client.createToAll(
"subscription-group",
CreatePersistentSubscriptionToAllOptions.get()
.fromStart());
var userCredentials = new UserCredentials("admin", "changeit");
var filter = StreamFilter.Prefix("test");
var settings = new PersistentSubscriptionSettings();
await client.CreateToAllAsync(
"subscription-group",
filter,
settings,
userCredentials: userCredentials
);
Console.WriteLine("Subscription to all created");
options := esdb.PersistentAllSubscriptionOptions{
Filter: &esdb.SubscriptionFilter{
Type: esdb.StreamFilterType,
Prefixes: []string{"test"},
},
}
err := client.CreatePersistentSubscriptionToAll(context.Background(), "subscription-group", options)
if err != nil {
panic(err)
}
let options = PersistentSubscriptionToAllOptions::default()
.filter(SubscriptionFilter::on_stream_name().add_prefix("test"));
client
.create_persistent_subscription_to_all("subscription-group", &options)
.await?;
Connecting a consumer
Once you have created a subscription group, clients can connect to it. Your application code should contain only the connection logic and should assume that the subscription group already exists.
The most important parameter to pass when connecting is the buffer size. This represents how many outstanding (unacknowledged) messages the server allows this client to hold. If this number is too small, your subscription will spend much of its time idle as it waits for an acknowledgment to come back from the client. If it's too big, you waste resources and, depending on the speed of your processing, messages can start timing out.
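The effect of the buffer size can be pictured as a window of in-flight messages. The class below is a toy model written for this explanation (`InFlightWindow` is a hypothetical name, not a client or server type): delivery stalls as soon as the number of unacknowledged events reaches the buffer size, and resumes when one of them is acknowledged or not acknowledged.

```python
class InFlightWindow:
    """Toy model of a consumer buffer: at most `buffer_size` unsettled events."""

    def __init__(self, buffer_size):
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")
        self.buffer_size = buffer_size
        self.in_flight = set()

    def can_send(self):
        # The "server" may only push another event while there is free space.
        return len(self.in_flight) < self.buffer_size

    def send(self, event_id):
        if not self.can_send():
            raise RuntimeError("buffer full: waiting for an ack or nack")
        self.in_flight.add(event_id)

    def settle(self, event_id):
        """Called on ack or nack; frees one slot in the window."""
        self.in_flight.discard(event_id)


window = InFlightWindow(buffer_size=2)
window.send("a")
window.send("b")
blocked = not window.can_send()  # True: both slots are taken
window.settle("a")
resumed = window.can_send()      # True: one slot was freed
```

With a window of 1, every event waits for the previous acknowledgment, which is the idle time described above.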
Connecting to one stream
The code below shows how to connect to an existing subscription group for a specific stream:
while True:
    subscription = client.read_subscription_to_stream(
        group_name="subscription-group",
        stream_name=stream_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                subscription.nack(event, action="park")
            else:
                subscription.ack(event)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription = client.subscribeToPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME
);
try {
for await (const event of subscription) {
try {
console.log(
`handling event ${event.event?.type} with retryCount ${event.retryCount}`
);
await handleEvent(event);
await subscription.ack(event);
} catch (error) {
await subscription.nack(PARK, error.toString(), event);
}
}
} catch (error) {
console.log(`Subscription was dropped. ${error}`);
}
const subscription =
client.subscribeToPersistentSubscriptionToStream<SomeEvent>(
STREAM_NAME,
GROUP_NAME
);
try {
for await (const event of subscription) {
try {
console.log(
`handling event ${event.event?.type} with retryCount ${event.retryCount}`
);
await handleEvent(event);
await subscription.ack(event);
} catch (error) {
await subscription.nack(PARK, error.toString(), event);
}
}
} catch (error) {
console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToStream(
"test-stream",
"subscription-group",
new PersistentSubscriptionListener() {
@Override
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
subscription.ack(event);
}
@Override
public void onCancelled(PersistentSubscription subscription, Throwable exception) {
if (exception == null) {
System.out.println("Subscription is cancelled");
return;
}
System.out.println("Subscription was dropped due to " + exception.getMessage());
}
});
await using var subscription = client.SubscribeToStream(
"test-stream",
"subscription-group",
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to stream started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
await HandleEvent(resolvedEvent);
await subscription.Ack(resolvedEvent);
break;
}
}
sub, err := client.SubscribeToPersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})
if err != nil {
panic(err)
}
for {
event := sub.Recv()
if event.EventAppeared != nil {
sub.Ack(event.EventAppeared.Event)
}
if event.SubscriptionDropped != nil {
break
}
}
let mut sub = client
.subscribe_to_persistent_subscription(
"test-stream",
"subscription-group",
&Default::default(),
)
.await?;
loop {
let event = sub.next().await?;
// Doing some productive work with the event...
sub.ack(event).await?;
}
Parameter | Description |
---|---|
stream | The stream the persistent subscription is on. |
groupName | The name of the subscription group to subscribe to. |
eventAppeared | The action to call when an event arrives over the subscription. |
subscriptionDropped | The action to call if the subscription is dropped. |
credentials | The user credentials to use for this operation. |
bufferSize | The number of in-flight messages this client is allowed. Default: 10 |
autoAck | Whether to automatically acknowledge messages after eventAppeared returns. Default: true |
Warning
The autoAck parameter will be deprecated in the next client release. You'll need to explicitly manage acknowledgements.
Connecting to $all
The code below shows how to connect to an existing subscription group for $all:
while True:
    subscription = client.read_subscription_to_all(
        group_name=group_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                subscription.nack(event, action="park")
            else:
                subscription.ack(event)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription =
client.subscribeToPersistentSubscriptionToAll(GROUP_NAME);
try {
for await (const event of subscription) {
console.log(
`handling event ${event.event?.type} with retryCount ${event.retryCount}`
);
await handleEvent(event);
await subscription.ack(event);
}
} catch (error) {
console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToAll(
"subscription-group",
new PersistentSubscriptionListener() {
@Override
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
try {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
subscription.ack(event);
}
catch (Exception ex) {
subscription.nack(NackAction.Park, ex.getMessage(), event);
}
}
@Override
public void onCancelled(PersistentSubscription subscription, Throwable exception) {
if (exception == null) {
System.out.println("Subscription is cancelled");
return;
}
System.out.println("Subscription was dropped due to " + exception.getMessage());
}
});
await using var subscription = client.SubscribeToAll(
"subscription-group",
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to $all started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
await HandleEvent(resolvedEvent);
await subscription.Ack(resolvedEvent);
break;
}
}
sub, err := client.SubscribeToPersistentSubscriptionToAll(context.Background(), "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})
if err != nil {
panic(err)
}
for {
event := sub.Recv()
if event.EventAppeared != nil {
sub.Ack(event.EventAppeared.Event)
}
if event.SubscriptionDropped != nil {
break
}
}
let mut sub = client
.subscribe_to_persistent_subscription_to_all("subscription-group", &Default::default())
.await?;
loop {
let event = sub.next().await?;
// Doing some productive work with the event...
sub.ack(event).await?;
}
The SubscribeToAllAsync method is identical to the SubscribeToStreamAsync method, except that you don't need to specify a stream name.
Acknowledgements
Clients must acknowledge (or not acknowledge) messages in the competing consumer model.
If processing is successful, you must send an Ack (acknowledge) to the server to let it know that the message has been handled. If processing fails for some reason, then you can Nack (not acknowledge) the message and tell the server how to handle the failure.
while True:
    subscription = client.read_subscription_to_stream(
        group_name=group_name,
        stream_name=stream_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                if event.retry_count < 5:
                    subscription.nack(event, action="retry")
                else:
                    subscription.nack(event, action="park")
            else:
                subscription.ack(event)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription = client.subscribeToPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME
);
try {
for await (const event of subscription) {
try {
console.log(
`handling event ${event.event?.type} with retryCount ${event.retryCount}`
);
await handleEvent(event);
await subscription.ack(event);
} catch (error) {
await subscription.nack(PARK, error.toString(), event);
}
}
} catch (error) {
console.log(`Subscription was dropped. ${error}`);
}
const subscription =
client.subscribeToPersistentSubscriptionToStream<SomeEvent>(
STREAM_NAME,
GROUP_NAME
);
try {
for await (const event of subscription) {
try {
console.log(
`handling event ${event.event?.type} with retryCount ${event.retryCount}`
);
await handleEvent(event);
await subscription.ack(event);
} catch (error) {
await subscription.nack(PARK, error.toString(), event);
}
}
} catch (error) {
console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToStream(
"test-stream",
"subscription-group",
new PersistentSubscriptionListener() {
@Override
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
try {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
subscription.ack(event);
}
catch (Exception ex) {
subscription.nack(NackAction.Park, ex.getMessage(), event);
}
}
});
await using var subscription = client.SubscribeToStream(
"test-stream",
"subscription-group",
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to stream with manual acks started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
try {
await HandleEvent(resolvedEvent);
await subscription.Ack(resolvedEvent);
} catch (UnrecoverableException ex) {
await subscription.Nack(PersistentSubscriptionNakEventAction.Park, ex.Message, resolvedEvent);
}
break;
}
}
sub, err := client.SubscribeToPersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})
if err != nil {
panic(err)
}
for {
event := sub.Recv()
if event.EventAppeared != nil {
sub.Ack(event.EventAppeared.Event)
}
if event.SubscriptionDropped != nil {
break
}
}
let mut sub = client
.subscribe_to_persistent_subscription(
"test-stream",
"subscription-group",
&Default::default(),
)
.await?;
loop {
let event = sub.next().await?;
// Doing some productive work with the event...
sub.ack(event).await?;
}
The Nack event action describes what the server should do with the message:
Action | Description |
---|---|
Unknown | The client does not know what action to take. Let the server decide. |
Park | Park the message and do not resend it. The message is put on the poison queue. |
Retry | Explicitly retry the message. |
Skip | Skip this message: do not resend it and do not put it on the poison queue. |
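One way to use these actions is to derive the action from the kind of failure and the number of retries so far. The policy below is only an illustration: `TransientError`, `MalformedEvent`, the `NackAction` enum, and the limit of 5 retries are assumptions made for this sketch, not types or defaults taken from any client library.

```python
from enum import Enum


class NackAction(Enum):
    """Local mirror of the actions in the table above."""

    UNKNOWN = "unknown"
    PARK = "park"
    RETRY = "retry"
    SKIP = "skip"


class TransientError(Exception):
    """Hypothetical: a failure that may succeed on retry (for example a timeout)."""


class MalformedEvent(Exception):
    """Hypothetical: the event can never be processed, so retrying is pointless."""


def choose_nack_action(error, retry_count, max_retries=5):
    """Pick a Nack action from the failure type and the retry count."""
    if isinstance(error, MalformedEvent):
        # Policy choice for this sketch: drop events that can never be handled.
        return NackAction.SKIP
    if isinstance(error, TransientError):
        # Retry a few times, then park so an operator can inspect the event.
        if retry_count < max_retries:
            return NackAction.RETRY
        return NackAction.PARK
    # Anything unexpected is parked rather than silently lost.
    return NackAction.PARK
```

Parking by default is the conservative choice, since parked messages remain available for inspection while skipped ones are not resent.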
Consumer strategies
When creating a persistent subscription, you can choose between a number of consumer strategies.
RoundRobin (default)
Distributes events to all clients evenly. If the client bufferSize is reached, the client won't receive more events until it acknowledges (or negatively acknowledges) events in its buffer.
This strategy provides equal load balancing between all consumers in the group.
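The round-robin behavior can be approximated with a small simulation. This is a toy model rather than the server's scheduler: the `round_robin` function is hypothetical, every consumer has the same buffer size, and no acknowledgments arrive, so delivery simply stops once every buffer is full.

```python
def round_robin(events, consumers, buffer_size):
    """Hand out events in turn, skipping consumers whose buffer is full.

    Returns (assignments, undelivered).
    """
    assignments = {name: [] for name in consumers}
    pending = list(events)
    if not consumers:
        return assignments, pending
    turn = 0
    while pending:
        # Starting at `turn`, find the next consumer with free buffer space.
        for offset in range(len(consumers)):
            name = consumers[(turn + offset) % len(consumers)]
            if len(assignments[name]) < buffer_size:
                assignments[name].append(pending.pop(0))
                turn = (turn + offset + 1) % len(consumers)
                break
        else:
            # Every consumer is full; nothing more can be delivered.
            break
    return assignments, pending


even, _ = round_robin(range(1, 7), ["a", "b", "c"], buffer_size=10)
full, waiting = round_robin(range(5), ["a", "b", "c"], buffer_size=1)
```

In the second call each consumer holds one event and the remaining two wait, which mirrors the note about a full bufferSize.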
DispatchToSingle
Distributes events to a single client until the bufferSize is reached. After that, the next client is selected in a round-robin style, and the process repeats.
This option can be seen as a fall-back scenario for high availability, when a single consumer processes all the events until it reaches its maximum capacity. When that happens, another consumer takes the load to free up the main consumer resources.
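The same kind of toy simulation shows how DispatchToSingle differs. The `dispatch_to_single` function is a hypothetical illustration, not server code, and again assumes that no acknowledgments arrive during the run: the first consumer takes everything until its buffer is full, and only then does the next one receive events.

```python
def dispatch_to_single(events, consumers, buffer_size):
    """Fill one consumer's buffer completely before moving on to the next.

    Returns (assignments, undelivered).
    """
    assignments = {name: [] for name in consumers}
    pending = list(events)
    for name in consumers:
        # Keep sending to this consumer until its buffer is full.
        while pending and len(assignments[name]) < buffer_size:
            assignments[name].append(pending.pop(0))
    return assignments, pending


light, _ = dispatch_to_single(range(2), ["primary", "standby"], buffer_size=3)
heavy, _ = dispatch_to_single(range(5), ["primary", "standby"], buffer_size=3)
```

Under light load the standby consumer stays idle; it only receives events once the primary is saturated.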
Pinned
For use with an indexing projection such as the system $by_category projection.
EventStoreDB inspects the event for its source stream id, hashing the id to one of 1024 buckets assigned to individual clients. When a client disconnects, its buckets are assigned to other clients. When a client connects, it is assigned some existing buckets. This naively attempts to maintain a balanced workload.
The main aim of this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. This is not a guarantee, and you should handle the usual ordering and concurrency issues.
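The bucket idea can be sketched as follows. Only the figure of 1024 buckets comes from the description above; the CRC32 hash and the modulo-based assignment of buckets to consumers are assumptions chosen to keep the example deterministic, and all function names are hypothetical. The server's actual hash and rebalancing logic are not shown here.

```python
import zlib

BUCKET_COUNT = 1024  # number of buckets, as described above


def bucket_for(stream_id):
    # Assumption: CRC32 stands in for whatever hash the server really uses.
    return zlib.crc32(stream_id.encode("utf-8")) % BUCKET_COUNT


def assign_buckets(consumers):
    """Spread the buckets over the connected consumers as evenly as possible."""
    if not consumers:
        return {}
    return {
        bucket: consumers[bucket % len(consumers)]
        for bucket in range(BUCKET_COUNT)
    }


def consumer_for(stream_id, bucket_owner):
    """Events from the same stream always map to the same bucket owner."""
    return bucket_owner[bucket_for(stream_id)]


owners = assign_buckets(["a", "b", "c"])
first = consumer_for("order-42", owners)
second = consumer_for("order-42", owners)  # same stream, same consumer
```

As long as the set of consumers is stable, every event of a given stream goes to one consumer, which is what reduces ordering issues. Note that this naive modulo assignment reshuffles most buckets whenever a consumer joins or leaves, whereas the description above only moves the affected buckets.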
Updating a subscription group
You can edit the settings of an existing subscription group while it is running; you don't need to delete and recreate it. When you update the subscription group, it resets itself internally, dropping the connections and having them reconnect. You must have admin permissions to update a persistent subscription group.
client.update_subscription_to_stream(
group_name=group_name,
stream_name=stream_name,
resolve_links=True,
)
await client.updatePersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME,
persistentSubscriptionToStreamSettingsFromDefaults({
resolveLinkTos: true,
checkPointLowerBound: 20,
})
);
client.updateToStream(
"test-stream",
"subscription-group",
UpdatePersistentSubscriptionToStreamOptions.get()
.resolveLinkTos()
.checkpointLowerBound(20));
var userCredentials = new UserCredentials("admin", "changeit");
var settings = new PersistentSubscriptionSettings(true, checkPointLowerBound: 20);
await client.UpdateToStreamAsync(
"test-stream",
"subscription-group",
settings,
userCredentials: userCredentials
);
Console.WriteLine("Subscription updated");
options := esdb.PersistentStreamSubscriptionOptions{
Settings: &esdb.PersistentSubscriptionSettings{
ResolveLinkTos: true,
CheckpointLowerBound: 20,
},
}
err := client.UpdatePersistentSubscription(context.Background(), "test-stream", "subscription-group", options)
if err != nil {
panic(err)
}
let options = PersistentSubscriptionOptions::default()
.resolve_link_tos(true)
.checkpoint_lower_bound(20);
client
.update_persistent_subscription("test-stream", "subscription-group", &options)
.await?;
Parameter | Description |
---|---|
stream | The stream the persistent subscription is on. |
groupName | The name of the subscription group to update. |
settings | The settings to use when updating the subscription. |
credentials | The user credentials to use for this operation. |
Persistent subscription settings
Both the Create and Update methods take some settings for configuring the persistent subscription.
The following table shows the configuration options you can set on a persistent subscription.
Option | Description | Default |
---|---|---|
ResolveLinkTos | Whether the subscription should resolve link events to their linked events. | false |
StartFrom | The exclusive position in the stream or transaction file the subscription should start from. | null (start from the end of the stream) |
ExtraStatistics | Whether to track latency statistics on this subscription. | false |
MessageTimeout | The amount of time after which a message is considered timed out and is retried. | 30 (seconds) |
MaxRetryCount | The maximum number of retries (due to timeout) before a message is parked. | 10 |
LiveBufferSize | The size of the buffer (in-memory) listening to live messages as they happen before paging occurs. | 500 |
ReadBatchSize | The number of events read at a time when paging through history. | 20 |
HistoryBufferSize | The number of events to cache when paging through history. | 500 |
CheckPointAfter | The amount of time after which to try to write a checkpoint. | 2 seconds |
MinCheckPointCount | The minimum number of messages to process before a checkpoint may be written. | 10 |
MaxCheckPointCount | The maximum number of messages not checkpointed before forcing a checkpoint. | 1000 |
MaxSubscriberCount | The maximum number of subscribers allowed. | 0 (unbounded) |
NamedConsumerStrategy | The strategy to use for distributing events to client consumers. See the consumer strategies in this doc. | RoundRobin |
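For scripts that manage several groups, it can help to keep these options in one typed value and override only what differs from the defaults. The dataclass below is a hypothetical container that mirrors the defaults in the table; its field names and validation rules are assumptions made for this sketch and do not correspond to a specific client's settings type.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class GroupSettings:
    """Hypothetical container mirroring the defaults in the table above."""

    resolve_link_tos: bool = False
    start_from: Optional[int] = None  # None: start from the end of the stream
    extra_statistics: bool = False
    message_timeout_seconds: float = 30.0
    max_retry_count: int = 10
    live_buffer_size: int = 500
    read_batch_size: int = 20
    history_buffer_size: int = 500
    checkpoint_after_seconds: float = 2.0
    min_checkpoint_count: int = 10
    max_checkpoint_count: int = 1000
    max_subscriber_count: int = 0  # 0: unbounded
    named_consumer_strategy: str = "RoundRobin"

    def __post_init__(self):
        # Sanity checks chosen for this sketch; the server may enforce other rules.
        if self.min_checkpoint_count > self.max_checkpoint_count:
            raise ValueError("min_checkpoint_count exceeds max_checkpoint_count")
        if self.message_timeout_seconds <= 0:
            raise ValueError("message_timeout_seconds must be positive")


defaults = GroupSettings()
# Override two options; everything else keeps its default value.
updated = replace(defaults, resolve_link_tos=True, min_checkpoint_count=20)
```

Keeping the value immutable and deriving variants with `replace` makes it easy to see exactly which options a given group changes.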
Deleting a subscription group
Remove a subscription group with the delete operation. As with creating groups, you rarely do this in your runtime code; it is usually done by an administrator running a script.
client.delete_subscription(
group_name=group_name,
stream_name=stream_name,
)
await client.deletePersistentSubscriptionToStream(STREAM_NAME, GROUP_NAME);
client.deleteToStream(
"test-stream",
"subscription-group");
try {
var userCredentials = new UserCredentials("admin", "changeit");
await client.DeleteToStreamAsync(
"test-stream",
"subscription-group",
userCredentials: userCredentials
);
Console.WriteLine("Subscription to stream deleted");
} catch (PersistentSubscriptionNotFoundException) {
// ignore
} catch (Exception ex) {
Console.WriteLine($"Subscription to stream delete error: {ex.GetType()} {ex.Message}");
}
err := client.DeletePersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.DeletePersistentSubscriptionOptions{})
if err != nil {
panic(err)
}
client
.delete_persistent_subscription("test-stream", "subscription-group", &Default::default())
.await?;
Parameter | Description |
---|---|
stream | The stream the persistent subscription is on. |
groupName | The name of the subscription group to delete. |
credentials | The user credentials to use for this operation. |