Persistent subscriptions

Persistent subscriptions are similar to catch-up subscriptions, but there are two key differences:

  • The subscription checkpoint is maintained by the server. It means that when your client reconnects to the persistent subscription, it will automatically resume from the last known position.
  • It's possible to connect more than one event consumer to the same persistent subscription. In that case, the server will load-balance the consumers, depending on the defined strategy, and distribute the events to them.

For these reasons, persistent subscriptions are organized as subscription groups that are defined and maintained by the server. Consumers then connect to a particular subscription group, and the server starts sending events to them.
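To make the first difference concrete, here is a minimal Python sketch of a checkpoint that lives with the group rather than with the consumer. `SubscriptionGroup` is a toy in-memory stand-in invented for this illustration, not part of any client library, and it advances the checkpoint immediately on acknowledgement, which a real server does not necessarily do.

```python
class SubscriptionGroup:
    """Toy stand-in for a server-side subscription group (not a real client API)."""

    def __init__(self, events):
        self._events = list(events)
        self._checkpoint = 0  # index of the next event to deliver, kept by the "server"

    def read(self, max_count):
        """Return up to max_count events starting at the stored checkpoint."""
        return self._events[self._checkpoint:self._checkpoint + max_count]

    def ack(self, count):
        """Advance the checkpoint once the consumer has handled `count` events."""
        self._checkpoint = min(self._checkpoint + count, len(self._events))


group = SubscriptionGroup(["e0", "e1", "e2", "e3", "e4"])

# First consumer session: handle two events, then disconnect.
first_batch = group.read(2)
group.ack(len(first_batch))

# A new session keeps no position of its own, yet it resumes after the acked events.
second_batch = group.read(3)
```

The consumer never stores a position; everything it needs in order to resume is held by the group.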

You can read more about persistent subscriptions in the server documentation.

Creating a subscription group

The first step in working with a persistent subscription is to create a subscription group. You will receive an error if you attempt to create a subscription group that already exists. You must have admin permissions to create a persistent subscription group.
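Because creating an existing group raises an error, setup scripts are often written so that they can be run repeatedly. The sketch below assumes a client that exposes `create_subscription_to_stream(group_name=..., stream_name=...)`; `AlreadyExists` and `FakeClient` are hypothetical stand-ins so that the example runs without a server. Check your client's documentation for the actual exception type.

```python
class AlreadyExists(Exception):
    """Hypothetical stand-in for the client's 'group already exists' error."""


class FakeClient:
    """In-memory stand-in for a client, used only to make this sketch runnable."""

    def __init__(self):
        self.groups = set()

    def create_subscription_to_stream(self, group_name, stream_name):
        key = (stream_name, group_name)
        if key in self.groups:
            raise AlreadyExists(f"{group_name} on {stream_name}")
        self.groups.add(key)


def ensure_subscription_group(client, group_name, stream_name):
    """Create the group; return True if created, False if it already existed."""
    try:
        client.create_subscription_to_stream(
            group_name=group_name,
            stream_name=stream_name,
        )
    except AlreadyExists:
        return False
    return True
```

Catching only the "already exists" error keeps other failures, such as missing permissions, visible to the caller.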

Subscribing to one stream

The following sample shows how to create a subscription group for a persistent subscription where you want to receive events from a specific stream. It could be a normal stream or a stream of links (such as a $ce category stream).

client.create_subscription_to_stream(
    group_name="subscription-group",
    stream_name=stream_name,
)
await client.createPersistentSubscriptionToStream(
  STREAM_NAME,
  GROUP_NAME,
  persistentSubscriptionToStreamSettingsFromDefaults(),
  { credentials: { username: "admin", password: "changeit" } }
);
client.createToStream(
    "test-stream",
    "subscription-group",
    CreatePersistentSubscriptionToStreamOptions.get()
        .fromStart());
var userCredentials = new UserCredentials("admin", "changeit");

var settings = new PersistentSubscriptionSettings();
await client.CreateToStreamAsync(
    "test-stream",
    "subscription-group",
    settings,
    userCredentials: userCredentials
);

Console.WriteLine("Subscription to stream created");
err := client.CreatePersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.PersistentStreamSubscriptionOptions{})

if err != nil {
    panic(err)
}
client
    .create_persistent_subscription("test-stream", "subscription-group", &Default::default())
    .await?;
Parameter     Description
stream        The stream the persistent subscription is on.
groupName     The name of the subscription group to create.
settings      The settings to use when creating the subscription.
credentials   The user credentials to use for this operation.

Subscribing to $all

The ability to subscribe to $all was introduced in EventStoreDB 21.10. Persistent subscriptions to $all also support filtering.

You can create a subscription group on $all much the same way you would create a subscription group on a stream:

client.create_subscription_to_all(
    group_name=group_name,
    filter_by_stream_name=True,
    filter_include=[r"user-.*"],
)
await client.createPersistentSubscriptionToAll(
  GROUP_NAME,
  persistentSubscriptionToAllSettingsFromDefaults(),
  {
    filter: streamNameFilter({ prefixes: ["test"] }),
    credentials: { username: "admin", password: "changeit" },
  }
);
client.createToAll(
    "subscription-group",
    CreatePersistentSubscriptionToAllOptions.get()
        .fromStart());
var userCredentials = new UserCredentials("admin", "changeit");
var filter = StreamFilter.Prefix("test");

var settings = new PersistentSubscriptionSettings();
await client.CreateToAllAsync(
    "subscription-group",
    filter,
    settings,
    userCredentials: userCredentials
);

Console.WriteLine("Subscription to all created");
options := esdb.PersistentAllSubscriptionOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:     esdb.StreamFilterType,
        Prefixes: []string{"test"},
    },
}

err := client.CreatePersistentSubscriptionToAll(context.Background(), "subscription-group", options)

if err != nil {
    panic(err)
}
let options = PersistentSubscriptionToAllOptions::default()
    .filter(SubscriptionFilter::on_stream_name().add_prefix("test"));

client
    .create_persistent_subscription_to_all("subscription-group", &options)
    .await?;

Connecting a consumer

Once you have created a subscription group, clients can connect to it. Your application code should only connect to the subscription group; it should assume that the group already exists.

The most important parameter to pass when connecting is the buffer size. This represents how many outstanding (unacknowledged) messages the server allows this client to have. If this number is too small, your subscription will spend much of its time idle, waiting for acknowledgements to come back from the client. If it's too big, you waste resources and, depending on the speed of your processing, can cause messages to time out.
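The effect of the buffer size can be sketched with a small in-memory model. `InFlightWindow` is an illustrative class written for this example, not a client API; it only shows that delivery stalls once the number of unacknowledged events reaches the buffer size and resumes when an acknowledgement arrives.

```python
from collections import deque


class InFlightWindow:
    """Toy model of a consumer buffer: at most `buffer_size` unacknowledged events."""

    def __init__(self, events, buffer_size):
        self._pending = deque(events)
        self._in_flight = set()
        self._buffer_size = buffer_size

    def deliver(self):
        """Send events until the consumer's buffer is full; return what was sent."""
        sent = []
        while self._pending and len(self._in_flight) < self._buffer_size:
            event = self._pending.popleft()
            self._in_flight.add(event)
            sent.append(event)
        return sent

    def ack(self, event):
        """An acknowledgement frees one slot in the buffer."""
        self._in_flight.discard(event)


window = InFlightWindow(events=range(5), buffer_size=2)
first = window.deliver()      # the buffer fills with two events
stalled = window.deliver()    # nothing more is sent until an ack arrives
window.ack(0)
after_ack = window.deliver()  # one freed slot lets one more event through
```

With a buffer of two, the third event is sent only after the first acknowledgement, which is why a buffer that is too small leaves the consumer idle between round trips.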

Connecting to one stream

The code below shows how to connect to an existing subscription group for a specific stream:

while True:
    subscription = client.read_subscription_to_stream(
        group_name="subscription-group",
        stream_name=stream_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                subscription.nack(event, action="park")
            else:
                subscription.ack(event)

    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription = client.subscribeToPersistentSubscriptionToStream(
  STREAM_NAME,
  GROUP_NAME
);

try {
  for await (const event of subscription) {
    try {
      console.log(
        `handling event ${event.event?.type} with retryCount ${event.retryCount}`
      );

      await handleEvent(event);
      await subscription.ack(event);
    } catch (error) {
      await subscription.nack(PARK, error.toString(), event);
    }
  }
} catch (error) {
  console.log(`Subscription was dropped. ${error}`);
}
const subscription =
  client.subscribeToPersistentSubscriptionToStream<SomeEvent>(
    STREAM_NAME,
    GROUP_NAME
  );

try {
  for await (const event of subscription) {
    try {
      console.log(
        `handling event ${event.event?.type} with retryCount ${event.retryCount}`
      );
      await handleEvent(event);
      await subscription.ack(event);
    } catch (error) {
      await subscription.nack(PARK, error.toString(), event);
    }
  }
} catch (error) {
  console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToStream(
    "test-stream",
    "subscription-group",
    new PersistentSubscriptionListener() {
        @Override
        public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
            System.out.println("Received event"
                + event.getOriginalEvent().getRevision()
                + "@" + event.getOriginalEvent().getStreamId());
            subscription.ack(event);
        }

        @Override
        public void onCancelled(PersistentSubscription subscription, Throwable exception) {
            if (exception == null) {
                System.out.println("Subscription is cancelled");
                return;
            }

            System.out.println("Subscription was dropped due to " + exception.getMessage());
        }
    });
await using var subscription = client.SubscribeToStream(
    "test-stream",
    "subscription-group", 
    cancellationToken: ct);
await foreach (var message in subscription.Messages) {
    switch (message) {
        case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
            Console.WriteLine($"Subscription {subscriptionId} to stream started");
            break;
        case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
            await HandleEvent(resolvedEvent);
            await subscription.Ack(resolvedEvent);
            break;
    }
}
sub, err := client.SubscribeToPersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})

if err != nil {
    panic(err)
}

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        sub.Ack(event.EventAppeared.Event)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
let mut sub = client
    .subscribe_to_persistent_subscription(
        "test-stream",
        "subscription-group",
        &Default::default(),
    )
    .await?;

loop {
    let event = sub.next().await?;
    // Doing some productive work with the event...
    sub.ack(event).await?;
}
Parameter             Description
stream                The stream the persistent subscription is on.
groupName             The name of the subscription group to subscribe to.
eventAppeared         The action to call when an event arrives over the subscription.
subscriptionDropped   The action to call if the subscription is dropped.
credentials           The user credentials to use for this operation.
bufferSize            The number of in-flight messages this client is allowed. Default: 10
autoAck               Whether to automatically acknowledge messages after eventAppeared returns. Default: true

WARNING

The autoAck parameter will be deprecated in the next client release. You'll need to explicitly manage acknowledgements.

Connecting to $all

The code below shows how to connect to an existing subscription group for $all:

while True:
    subscription = client.read_subscription_to_all(
        group_name=group_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                subscription.nack(event, action="park")
            else:
                subscription.ack(event)

    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription =
  client.subscribeToPersistentSubscriptionToAll(GROUP_NAME);

try {
  for await (const event of subscription) {
    console.log(
      `handling event ${event.event?.type} with retryCount ${event.retryCount}`
    );

    await handleEvent(event);
    await subscription.ack(event);
  }
} catch (error) {
  console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToAll(
    "subscription-group",
    new PersistentSubscriptionListener() {
        @Override
        public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
            try {
                System.out.println("Received event"
                    + event.getOriginalEvent().getRevision()
                    + "@" + event.getOriginalEvent().getStreamId());
                subscription.ack(event);
            }
            catch (Exception ex) {
                subscription.nack(NackAction.Park, ex.getMessage(), event);
            }
        }

        public void onCancelled(PersistentSubscription subscription, Throwable exception) {
            if (exception == null) {
                System.out.println("Subscription is cancelled");
                return;
            }

            System.out.println("Subscription was dropped due to " + exception.getMessage());
        }
    });
await using var subscription = client.SubscribeToAll(
    "subscription-group",
    cancellationToken: ct);

await foreach (var message in subscription.Messages) {
    switch (message) {
        case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
            Console.WriteLine($"Subscription {subscriptionId} to stream started");
            break;
        case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
            await HandleEvent(resolvedEvent);
            await subscription.Ack(resolvedEvent);
            break;
    }
}
sub, err := client.SubscribeToPersistentSubscriptionToAll(context.Background(), "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})

if err != nil {
    panic(err)
}

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        sub.Ack(event.EventAppeared.Event)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
let mut sub = client
    .subscribe_to_persistent_subscription_to_all("subscription-group", &Default::default())
    .await?;

loop {
    let event = sub.next().await?;
    // Doing some productive work with the event...
    sub.ack(event).await?;
}

The SubscribeToAllAsync method is identical to the SubscribeToStreamAsync method, except that you don't need to specify a stream name.

Acknowledgements

Clients must acknowledge (or not acknowledge) messages in the competing consumer model.

If processing is successful, you must send an Ack (acknowledge) to the server to let it know that the message has been handled. If processing fails for some reason, then you can Nack (not acknowledge) the message and tell the server how to handle the failure.

while True:
    subscription = client.read_subscription_to_stream(
        group_name=group_name,
        stream_name=stream_name,
    )
    try:
        for event in subscription:
            try:
                handle_event(event)
            except Exception:
                if event.retry_count < 5:
                    subscription.nack(event, action="retry")
                else:
                    subscription.nack(event, action="park")
            else:
                subscription.ack(event)

    except ConsumerTooSlow:
        # subscription was dropped
        continue
const subscription = client.subscribeToPersistentSubscriptionToStream(
  STREAM_NAME,
  GROUP_NAME
);

try {
  for await (const event of subscription) {
    try {
      console.log(
        `handling event ${event.event?.type} with retryCount ${event.retryCount}`
      );

      await handleEvent(event);
      await subscription.ack(event);
    } catch (error) {
      await subscription.nack(PARK, error.toString(), event);
    }
  }
} catch (error) {
  console.log(`Subscription was dropped. ${error}`);
}
const subscription =
  client.subscribeToPersistentSubscriptionToStream<SomeEvent>(
    STREAM_NAME,
    GROUP_NAME
  );

try {
  for await (const event of subscription) {
    try {
      console.log(
        `handling event ${event.event?.type} with retryCount ${event.retryCount}`
      );
      await handleEvent(event);
      await subscription.ack(event);
    } catch (error) {
      await subscription.nack(PARK, error.toString(), event);
    }
  }
} catch (error) {
  console.log(`Subscription was dropped. ${error}`);
}
client.subscribeToStream(
        "test-stream",
        "subscription-group",
        new PersistentSubscriptionListener() {
            @Override
            public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
                try {
                    System.out.println("Received event"
                            + event.getOriginalEvent().getRevision()
                            + "@" + event.getOriginalEvent().getStreamId());
                    subscription.ack(event);
                }
                catch (Exception ex) {
                    subscription.nack(NackAction.Park, ex.getMessage(), event);
                }
            }
        });
await using var subscription = client.SubscribeToStream(
    "test-stream",
    "subscription-group",
    cancellationToken: ct);
await foreach (var message in subscription.Messages) {
    switch (message) {
        case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
            Console.WriteLine($"Subscription {subscriptionId} to stream with manual acks started");
            break;
        case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
            try {
                await HandleEvent(resolvedEvent);
                await subscription.Ack(resolvedEvent);
            } catch (UnrecoverableException ex) {
                await subscription.Nack(PersistentSubscriptionNakEventAction.Park, ex.Message, resolvedEvent);
            }
            break;
    }
}
sub, err := client.SubscribeToPersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.SubscribeToPersistentSubscriptionOptions{})

if err != nil {
    panic(err)
}

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        sub.Ack(event.EventAppeared.Event)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
let mut sub = client
    .subscribe_to_persistent_subscription(
        "test-stream",
        "subscription-group",
        &Default::default(),
    )
    .await?;

loop {
    let event = sub.next().await?;
    // Doing some productive work with the event...
    sub.ack(event).await?;
}

The Nack event action describes what the server should do with the message:

Action    Description
Unknown   The client does not know what action to take. Let the server decide.
Park      Park the message and do not resend it. Put it on the poison queue.
Retry     Explicitly retry the message.
Skip      Skip this message: do not resend it and do not put it on the poison queue.
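One way to choose between these actions is a small policy function that the event handler calls when processing fails. The following is only a sketch: `PermanentError` and the `max_retries` threshold are assumptions made for illustration, and the lowercase action strings follow the Python samples in this section.

```python
class PermanentError(Exception):
    """Hypothetical error type: the event can never be processed (e.g. malformed data)."""


def choose_nack_action(error, retry_count, max_retries=5):
    """Map a processing failure to one of the Nack actions described above."""
    if isinstance(error, PermanentError):
        return "park"   # retrying cannot help; keep it for later inspection
    if retry_count < max_retries:
        return "retry"  # possibly transient; ask the server to redeliver
    return "park"       # retried too often; stop redelivering
```

A handler would pass the result to the client's nack call, for example `subscription.nack(event, action=choose_nack_action(exc, event.retry_count))`.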

Consumer strategies

When creating a persistent subscription, you can choose between a number of consumer strategies.

RoundRobin (default)

Distributes events to all clients evenly. If the client bufferSize is reached, the client won't receive more events until it acknowledges or negatively acknowledges events in its buffer.

This strategy provides equal load balancing between all consumers in the group.
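The behaviour can be approximated with a short simulation. This is a simplified model rather than the server's implementation: `buffer_sizes` maps hypothetical consumer names to their free buffer slots, and no acknowledgements arrive during the run.

```python
from itertools import cycle


def distribute_round_robin(events, buffer_sizes):
    """Hand out events in turn, skipping consumers whose buffer is full."""
    assigned = {name: [] for name in buffer_sizes}
    pending = list(events)
    if not buffer_sizes:
        return assigned, pending
    turn = cycle(buffer_sizes)  # iterates consumer names in insertion order
    skipped = 0                 # consecutive consumers found full
    while pending and skipped < len(buffer_sizes):
        name = next(turn)
        if len(assigned[name]) < buffer_sizes[name]:
            assigned[name].append(pending.pop(0))
            skipped = 0
        else:
            skipped += 1
    return assigned, pending


assigned, pending = distribute_round_robin(range(7), {"a": 2, "b": 3})
```

Events alternate between the two consumers until a buffer fills; whatever is left in `pending` waits for an acknowledgement.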

DispatchToSingle

Distributes events to a single client until the bufferSize is reached. After that, the next client is selected in a round-robin style, and the process repeats.

This option can be seen as a fall-back scenario for high availability, when a single consumer processes all the events until it reaches its maximum capacity. When that happens, another consumer takes the load to free up the main consumer resources.
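For comparison, the same kind of simplified simulation for this strategy fills one consumer's buffer before moving on to the next. The consumer names are hypothetical and, as before, no acknowledgements arrive during the run.

```python
def dispatch_to_single(events, buffer_sizes):
    """Fill the first consumer's buffer before spilling over to the next one."""
    assigned = {name: [] for name in buffer_sizes}
    pending = list(events)
    for name, size in buffer_sizes.items():
        # Take as many events as this consumer's buffer allows, then move on.
        take, pending = pending[:size], pending[size:]
        assigned[name].extend(take)
    return assigned, pending


assigned, pending = dispatch_to_single(range(5), {"main": 3, "backup": 3})
```

Here `main` receives the first three events and `backup` only the overflow, which matches the fall-back role described above.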

Pinned

For use with an indexing projection such as the system $by_category projection.

EventStoreDB inspects the event for its source stream id, hashing the id to one of 1024 buckets assigned to individual clients. When a client disconnects, its buckets are assigned to other clients. When a client connects, it is assigned some existing buckets. This naively attempts to maintain a balanced workload.

The main aim of this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. This is not a guarantee, and you should handle the usual ordering and concurrency issues.
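The bucket idea can be illustrated as follows. The hash function is an assumption (CRC32 is used here only because it is stable across runs; the server's actual hash is not specified in this document), and the even bucket assignment is a simplification of the rebalancing the server performs when clients connect and disconnect.

```python
import zlib

BUCKET_COUNT = 1024  # number of buckets named in the text above


def bucket_for(stream_id):
    """Hash a stream id to a bucket. CRC32 is an assumption, not the server's hash."""
    return zlib.crc32(stream_id.encode("utf-8")) % BUCKET_COUNT


def assign_buckets(consumers):
    """Spread the buckets across the given consumers as evenly as possible."""
    return {bucket: consumers[bucket % len(consumers)] for bucket in range(BUCKET_COUNT)}


def consumer_for(stream_id, bucket_owner):
    """Every event from the same source stream goes to the same consumer."""
    return bucket_owner[bucket_for(stream_id)]


owners = assign_buckets(["consumer-a", "consumer-b"])
target = consumer_for("order-1", owners)
```

Because the bucket depends only on the stream id, all events from `order-1` reach the same consumer for as long as the bucket assignment does not change.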

Updating a subscription group

You can edit the settings of an existing subscription group while it is running; you don't need to delete and recreate it. When you update the subscription group, it resets itself internally, dropping the connections and having them reconnect. You must have admin permissions to update a persistent subscription group.

client.update_subscription_to_stream(
    group_name=group_name,
    stream_name=stream_name,
    resolve_links=True,
)
await client.updatePersistentSubscriptionToStream(
  STREAM_NAME,
  GROUP_NAME,
  persistentSubscriptionToStreamSettingsFromDefaults({
    resolveLinkTos: true,
    checkPointLowerBound: 20,
  })
);
client.updateToStream(
    "test-stream",
    "subscription-group",
    UpdatePersistentSubscriptionToStreamOptions.get()
        .resolveLinkTos()
        .checkpointLowerBound(20));
var userCredentials = new UserCredentials("admin", "changeit");
var settings = new PersistentSubscriptionSettings(true, checkPointLowerBound: 20);

await client.UpdateToStreamAsync(
    "test-stream",
    "subscription-group",
    settings,
    userCredentials: userCredentials
);

Console.WriteLine("Subscription updated");
options := esdb.PersistentStreamSubscriptionOptions{
    Settings: &esdb.PersistentSubscriptionSettings{
        ResolveLinkTos:       true,
        CheckpointLowerBound: 20,
    },
}

err := client.UpdatePersistentSubscription(context.Background(), "test-stream", "subscription-group", options)

if err != nil {
    panic(err)
}
let options = PersistentSubscriptionOptions::default()
    .resolve_link_tos(true)
    .checkpoint_lower_bound(20);

client
    .update_persistent_subscription("test-stream", "subscription-group", &options)
    .await?;
Parameter     Description
stream        The stream the persistent subscription is on.
groupName     The name of the subscription group to update.
settings      The settings to use when updating the subscription.
credentials   The user credentials to use for this operation.

Persistent subscription settings

Both the Create and Update methods take some settings for configuring the persistent subscription.

The following table shows the configuration options you can set on a persistent subscription.

Option                  Description                                                                                         Default
ResolveLinkTos          Whether the subscription should resolve link events to their linked events.                         false
StartFrom               The exclusive position in the stream or transaction file the subscription should start from.        null (start from the end of the stream)
ExtraStatistics         Whether to track latency statistics on this subscription.                                           false
MessageTimeout          The amount of time after which a message is considered timed out and retried.                       30 (seconds)
MaxRetryCount           The maximum number of retries (due to timeout) before a message is parked.                          10
LiveBufferSize          The size of the buffer (in-memory) listening to live messages as they happen before paging occurs.  500
ReadBatchSize           The number of events read at a time when paging through history.                                    20
HistoryBufferSize       The number of events to cache when paging through history.                                          500
CheckPointAfter         The amount of time after which to attempt a checkpoint.                                             2 seconds
MinCheckPointCount      The minimum number of messages to process before a checkpoint may be written.                       10
MaxCheckPointCount      The maximum number of messages not checkpointed before forcing a checkpoint.                        1000
MaxSubscriberCount      The maximum number of subscribers allowed.                                                          0 (unbounded)
NamedConsumerStrategy   The strategy to use for distributing events to client consumers. See Consumer strategies above.     RoundRobin
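When settings are prepared in a script, it can help to keep the defaults in one place and override only what differs. The dataclass below is a sketch that mirrors the table; the snake_case field names and the `problems` check are this example's own and do not correspond to any particular client's settings type.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SubscriptionSettings:
    """Defaults copied from the table above; field names are this sketch's own."""

    resolve_link_tos: bool = False
    start_from: Optional[int] = None   # None: start from the end of the stream
    extra_statistics: bool = False
    message_timeout_seconds: float = 30
    max_retry_count: int = 10
    live_buffer_size: int = 500
    read_batch_size: int = 20
    history_buffer_size: int = 500
    checkpoint_after_seconds: float = 2
    min_checkpoint_count: int = 10
    max_checkpoint_count: int = 1000
    max_subscriber_count: int = 0      # 0: unbounded
    named_consumer_strategy: str = "RoundRobin"

    def problems(self):
        """Return obviously inconsistent values (a sanity check, not a server rule)."""
        found = []
        if self.min_checkpoint_count > self.max_checkpoint_count:
            found.append("min_checkpoint_count exceeds max_checkpoint_count")
        return found


defaults = SubscriptionSettings()
# Derive a variant without mutating the defaults.
with_links = replace(defaults, resolve_link_tos=True)
```

Keeping the object immutable and deriving variants with `replace` makes it easy to see exactly which options a given subscription group overrides.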

Deleting a subscription group

Remove a subscription group with the delete operation. As with creating groups, you rarely do this in your runtime code; it is usually done by an administrator running a script.

client.delete_subscription(
    group_name=group_name,
    stream_name=stream_name,
)
await client.deletePersistentSubscriptionToStream(STREAM_NAME, GROUP_NAME);
client.deleteToStream(
    "test-stream",
    "subscription-group");
try {
    var userCredentials = new UserCredentials("admin", "changeit");
    await client.DeleteToStreamAsync(
        "test-stream",
        "subscription-group",
        userCredentials: userCredentials
    );

    Console.WriteLine("Subscription to stream deleted");
} catch (PersistentSubscriptionNotFoundException) {
    // ignore
} catch (Exception ex) {
    Console.WriteLine($"Subscription to stream delete error: {ex.GetType()} {ex.Message}");
}
err := client.DeletePersistentSubscription(context.Background(), "test-stream", "subscription-group", esdb.DeletePersistentSubscriptionOptions{})

if err != nil {
    panic(err)
}
client
    .delete_persistent_subscription("test-stream", "subscription-group", &Default::default())
    .await?;
Parameter     Description
stream        The stream the persistent subscription is on.
groupName     The name of the subscription group to delete.
credentials   The user credentials to use for this operation.
Contributors: Alexey Zimarev, Oskar Dudycz, Claude Devarenne, Hayley Campbell, Mathew McLoughlin