Catch-up subscriptions

Subscriptions allow you to subscribe to a stream and receive notifications about new events added to the stream.

You provide an event handler and an optional starting point to the subscription. The handler is called for each event from the starting point onward.

If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.

TIP

Check the Getting Started guide to learn how to configure and use the client SDK.

Subscribing from the start

When you need to process all the events in the store, including historical events, you'd need to subscribe from the beginning. You can either subscribe to receive events from a single stream, or subscribe to $all if you need to process all events in the database.

Subscribing to a stream

The simplest stream subscription looks like the following :

await client.SubscribeToStreamAsync("some-stream",
    FromStream.Start,
    async (subscription, evnt, cancellationToken) => {
        Console.WriteLine(quot;Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
        await HandleEvent(evnt);
    });
1
2
3
4
5
6
stream, err := db.SubscribeToStream(context.Background(), "some-stream", esdb.SubscribeToStreamOptions{})

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event := stream.Recv()

    if event.EventAppeared != nil {
        // handles the event...
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SubscriptionListener listener = new SubscriptionListener() {
    @Override
    public void onEvent(Subscription subscription, ResolvedEvent event) {
        System.out.println("Received event"
                + event.getOriginalEvent().getRevision()
                + "@" + event.getOriginalEvent().getStreamId());
        HandleEvent(event);
    }
};
client.subscribeToStream("some-stream", listener);
1
2
3
4
5
6
7
8
9
10
const subscription = client.subscribeToStream("some-stream");

for await (const resolvedEvent of subscription) {
  console.log(
    `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
  );

  await handleEvent(resolvedEvent);
}
1
2
3
4
5
6
7
8
9
let mut stream = client
    .subscribe_to_stream("some-stream", &Default::default())
    .await;

loop {
    let event = stream.next().await?;
    // Handles the event...
}
1
2
3
4
5
6
7
8
const subscription =
  client.subscribeToStream<SomeStreamEvents>("some-stream");

for await (const resolvedEvent of subscription) {
  console.log(
    `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
  );
  await handleEvent(resolvedEvent);
}
1
2
3
4
5
6
7
8
9

The provided handler will be called for every event in the stream.

When you subscribe to a stream with link events, for example the $ce category stream, you need to set resolveLinkTos to true. Read more about it below.

Subscribing to $all

Subscribing to $all is much the same as subscribing to a single stream. The handler will be called for every event appended after the starting position.

await client.SubscribeToAllAsync(
    FromAll.Start, 
    async (subscription, evnt, cancellationToken) => {
        Console.WriteLine(quot;Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
        await HandleEvent(evnt);
    });
1
2
3
4
5
6
stream, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{})

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event := stream.Recv()

    if event.EventAppeared != nil {
        // handles the event...
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SubscriptionListener listener = new SubscriptionListener() {
    @Override
    public void onEvent(Subscription subscription, ResolvedEvent event) {
        System.out.println("Received event"
                + event.getOriginalEvent().getRevision()
                + "@" + event.getOriginalEvent().getStreamId());
        HandleEvent(event);
    }
};
client.subscribeToAll(listener);
1
2
3
4
5
6
7
8
9
10
const subscription = client.subscribeToAll();

for await (const resolvedEvent of subscription) {
  console.log(
    `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
  );

  await handleEvent(resolvedEvent);
}
1
2
3
4
5
6
7
8
9
let mut stream = client.subscribe_to_all(&Default::default()).await;

loop {
    let event = stream.next().await?;
    // Handles the event...
}
1
2
3
4
5
6
const subscription = client.subscribeToAll();

for await (const resolvedEvent of subscription) {
  console.log(
    `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
  );
  await handleEvent(resolvedEvent);
}
1
2
3
4
5
6
7
8

Subscribing from a specific position

The previous examples will subscribe to the stream from the beginning. This will end up calling the handler for every event in the stream and then wait for new events after that.

Both the stream and $all subscriptions accept a starting position if you want to read from a specific point onward. If events already exist at the position you subscribe to, they will be read on the server side and sent to the subscription.

Once caught up, the sever will push any new events received on the streams to the client. There is no difference between catching up and live on the client side.

WARNING

The positions provided to the subscriptions are exclusive. You will only receive the next event after the subscribed position.

Subscribing to a stream

To subscribe to a stream from a specific position, you need to provide a stream position. This can be Start, End or a big int (unsigned 64 bit integer) position.

The following subscribes to the stream some-stream at position 20, this means that events 21 and onward will be handled:

await client.SubscribeToStreamAsync(
    "some-stream",
    FromStream.After(StreamPosition.FromInt64(20)),
    EventAppeared);
1
2
3
4
db.SubscribeToStream(context.Background(), "some-stream", esdb.SubscribeToStreamOptions{
    From: esdb.Revision(20),
})
1
2
3
client.subscribeToStream(
        "some-stream",
        listener,
        SubscribeToStreamOptions.get()
                .fromRevision(20)
);
1
2
3
4
5
6
const subscription = client.subscribeToStream("some-stream", {
  fromRevision: BigInt(20),
});
1
2
3
let options = SubscribeToStreamOptions::default().start_from(StreamPosition::Position(20));

client.subscribe_to_stream("some-stream", &options).await;
1
2
3
const subscription = client.subscribeToStream<SomeStreamEvents>(
  "some-stream",
  {
    fromRevision: BigInt(20),
  }
);
1
2
3
4
5
6

Subscribing to $all

Subscribing to the $all stream is much like subscribing to a regular stream. The only difference is how you need to specify the stream position. For the $all stream, you have to provide a Position structure instead, which consists of two big integers - prepare and commit positions. The Position value can be Start, End or a Position created from a commit and prepare position.

The corresponding $all subscription will subscribe from the event after the one at commit position 1056 and prepare position 1056.

Please note that this position will need to be a legitimate position in $all.

var result = await client.AppendToStreamAsync("subscribe-to-all-from-position", StreamState.NoStream, new[] {
    new EventData(Uuid.NewUuid(), "-", ReadOnlyMemory<byte>.Empty)
});

await client.SubscribeToAllAsync(
    FromAll.After(result.LogPosition),
    EventAppeared);
1
2
3
4
5
6
7
db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    From: esdb.Position{
        Commit:  1_056,
        Prepare: 1_056,
    },
})
1
2
3
4
5
6
client.subscribeToAll(
        listener,
        SubscribeToAllOptions.get()
                .fromPosition(new Position(1056, 1056))
);
1
2
3
4
5
const subscription = client.subscribeToAll({
  fromPosition: {
    commit: BigInt(1056),
    prepare: BigInt(1056),
  },
});
1
2
3
4
5
6
let options = SubscribeToAllOptions::default().position(StreamPosition::Position(Position {
    commit: 1_056,
    prepare: 1_056,
}));

client.subscribe_to_all(&options).await;
1
2
3
4
5
6
const subscription = client.subscribeToAll({
  fromPosition: {
    commit: BigInt(1056),
    prepare: BigInt(1056),
  },
});
1
2
3
4
5
6

Subscribing to a stream for live updates

You can subscribe to a stream to get live updates by subscribing to the end of the stream:

await client.SubscribeToStreamAsync(
    "some-stream",
    FromStream.End,
    EventAppeared);
1
2
3
4
options = esdb.SubscribeToStreamOptions{
    From: esdb.End{},
}

db.SubscribeToStream(context.Background(), "some-stream", options)
1
2
3
4
5
client.subscribeToStream(
        "some-stream",
        listener,
        SubscribeToStreamOptions.get()
                .fromEnd()
);
1
2
3
4
5
6
const subscription = client.subscribeToStream("some-stream", {
  fromRevision: END,
});
1
2
3
let options = SubscribeToStreamOptions::default().start_from(StreamPosition::End);
client.subscribe_to_stream("some-stream", &options).await;
1
2
const subscription = client.subscribeToStream<SomeStreamEvents>(
  "some-stream",
  {
    fromRevision: END,
  }
);
1
2
3
4
5
6

And the same works with $all :

await client.SubscribeToAllAsync(
    FromAll.End, 
    EventAppeared);
1
2
3
db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    From: esdb.End{},
})
1
2
3
client.subscribeToAll(
        listener,
        SubscribeToAllOptions.get()
                .fromEnd()
);
1
2
3
4
5
const subscription = client.subscribeToAll({
  fromPosition: END,
});
1
2
3
let options = SubscribeToAllOptions::default().position(StreamPosition::End);
client.subscribe_to_all(&options).await;
1
2
const subscription = client.subscribeToAll({
  fromPosition: END,
});
1
2
3

This won't read through the history of the stream, but will rather notify the handler when a new event appears in the respective stream.

Keep in mind that when you subscribe to a stream from a certain position, as described above, you will also get live updates after your subscription catches up (processes all the historical events).

Link-to events point to events in other streams in EventStoreDB. These are generally created by projections such as the $by_event_type projection which links events of the same event type into the same stream. This makes it easier to look up all events of a certain type.

TIP

Filtered subscriptions make it easier and faster to subscribe to all events of a certain type or matching a prefix.

When reading a stream you can specify whether to resolve link-to's or not. By default, link-to events are not resolved. You can change this behaviour by setting the resolveLinkTos parameter to true:

await client.SubscribeToStreamAsync(
    "$et-myEventType",
    FromStream.Start,
    EventAppeared,
    resolveLinkTos: true);
1
2
3
4
5
options = esdb.SubscribeToStreamOptions{
    From:           esdb.Start{},
    ResolveLinkTos: true,
}

db.SubscribeToStream(context.Background(), "$et-myEventType", options)
1
2
3
4
5
6
client.subscribeToStream(
        "$et-myEventType",
        listener,
        SubscribeToStreamOptions.get()
                .fromStart()
                .resolveLinkTos()
);
1
2
3
4
5
6
7
const subscription = client.subscribeToStream("$et-myEventType", {
  fromRevision: START,
  resolveLinkTos: true,
});
1
2
3
4
let options = SubscribeToStreamOptions::default()
    .start_from(StreamPosition::Start)
    .resolve_link_tos();

client
    .subscribe_to_stream("$et-myEventType", &options)
    .await;
1
2
3
4
5
6
7
const subscription = client.subscribeToStream<SomeStreamEvents>(
  "$et-myEventType",
  {
    fromRevision: START,
    resolveLinkTos: true,
  }
);
1
2
3
4
5
6
7

Dropped subscriptions

When a subscription stops or experiences an error, it will be dropped. The subscription provides a subscriptionDropped callback, which will get called when the subscription breaks.

The subscriptionDropped callback allows you to inspect the reason why the subscription dropped, as well as any exceptions that occurred.

The possible reasons for a subscription to drop are:

ReasonWhy it might happen
DisposedThe subscription got cancelled or disposed by the client.
SubscriberErrorAn error occurred while handling an event in the subscription handler.
ServerErrorAn error occurred on the server, and the server closed the subscription. Check the server logs for more information.

Bear in mind that a subscription can also drop because it is slow. The server tried to push all the live events to the subscription when it is in the live processing mode. If the subscription gets the reading buffer overflow and won't be able to acknowledge the buffer, it will break.

Handling subscription drops

An application, which hosts the subscription, can go offline for a period of time for different reasons. It could be a crash, infrastructure failure, or a new version deployment. As you rarely would want to reprocess all the events again, you'd need to store the current position of the subscription somewhere, and then use it to restore the subscription from the point where it dropped off:

var checkpoint = await ReadStreamCheckpointAsync();
await client.SubscribeToStreamAsync(
    "some-stream",
    checkpoint is null ? FromStream.Start : FromStream.After(checkpoint.Value),
    eventAppeared: async (subscription, evnt, cancellationToken) => {
        await HandleEvent(evnt);
        checkpoint = evnt.OriginalEventNumber;
    },
    subscriptionDropped: ((subscription, reason, exception) => {
        Console.WriteLine(quot;Subscription was dropped due to {reason}. {exception}");
        if (reason != SubscriptionDroppedReason.Disposed) {
            // Resubscribe if the client didn't stop the subscription
            Resubscribe(checkpoint);
        }
    }));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
options = esdb.SubscribeToStreamOptions{
    From: esdb.Start{},
}

for {

    stream, err := db.SubscribeToStream(context.Background(), "some-stream", options)

    if err != nil {
        time.Sleep(1 * time.Second)
        continue
    }

    for {
        event := stream.Recv()

        if event.SubscriptionDropped != nil {
            stream.Close()
            break
        }

        if event.EventAppeared != nil {
            // handles the event...
            options.From = esdb.Revision(event.EventAppeared.OriginalEvent().EventNumber)
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
client.subscribeToStream(
        "some-stream",
        new SubscriptionListener() {
            StreamPosition<Long> checkpoint = StreamPosition.start();
            @Override
            public void onEvent(Subscription subscription, ResolvedEvent event) {
                HandleEvent(event);
                checkpoint = StreamPosition.position(event.getOriginalEvent().getRevision());
            }

            @Override
            public void onError(Subscription subscription, Throwable throwable) {
                System.out.println("Subscription was dropped due to " + throwable.getMessage());
                Resubscribe(checkpoint);
            }
        },
        SubscribeToStreamOptions.get()
                .fromStart()
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
let checkpoint = START;

const subscription = client
  .subscribeToStream("some-stream", {
    fromRevision: checkpoint,
  })
  .on("data", (resolvedEvent) => {
    handleEvent(resolvedEvent);
    checkpoint = resolvedEvent.event?.revision ?? checkpoint;
  });
1
2
3
4
5
6
7
8
9
10
let retry = RetryOptions::default().retry_forever();
let options = SubscribeToStreamOptions::default().retry_options(retry);
let mut stream = client.subscribe_to_stream("some-stream", &options).await;

loop {
    let event = stream.next().await?;
    // Handles the event...
}
1
2
3
4
5
6
7
8
let checkpoint: ReadRevision = START;

const subscription = client
  .subscribeToStream<SomeStreamEvents>("some-stream", {
    fromRevision: checkpoint,
  })
  .on("data", (resolvedEvent) => {
    handleEvent(resolvedEvent);
    checkpoint = resolvedEvent.event?.revision ?? checkpoint;
  });
1
2
3
4
5
6
7
8
9
10

When subscribed to $all you want to keep the position of the event in the $all stream. As mentioned previously, the $all stream position consists of two big integers (prepare and commit positions), not one:

var checkpoint = await ReadCheckpointAsync();
await client.SubscribeToAllAsync(
    checkpoint is null ? FromAll.Start : FromAll.After(checkpoint.Value),
    eventAppeared: async (subscription, evnt, cancellationToken) => {
        await HandleEvent(evnt);
        checkpoint = evnt.OriginalPosition!.Value;
    },
    subscriptionDropped: ((subscription, reason, exception) => {
        Console.WriteLine(quot;Subscription was dropped due to {reason}. {exception}");
        if (reason != SubscriptionDroppedReason.Disposed) {
            // Resubscribe if the client didn't stop the subscription
            Resubscribe(checkpoint);
        }
    }));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
options = esdb.SubscribeToAllOptions{
    From: esdb.Start{},
}

for {
    stream, err := db.SubscribeToAll(context.Background(), options)

    if err != nil {
        time.Sleep(1 * time.Second)
        continue
    }

    for {
        event := stream.Recv()

        if event.SubscriptionDropped != nil {
            stream.Close()
            break
        }

        if event.EventAppeared != nil {
            // handles the event...
            options.From = event.EventAppeared.OriginalEvent().Position
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
client.subscribeToAll(
        new SubscriptionListener() {
            StreamPosition<Position> checkpoint = StreamPosition.start();
            @Override
            public void onEvent(Subscription subscription, ResolvedEvent event) {
                HandleEvent(event);
                checkpoint = StreamPosition.position(event.getOriginalEvent().getPosition());
            }

            @Override
            public void onError(Subscription subscription, Throwable throwable) {
                System.out.println("Subscription was dropped due to " + throwable.getMessage());
                Resubscribe(checkpoint);
            }
        },
        SubscribeToAllOptions.get()
                .fromStart()
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let checkpoint = START;

const subscription = client
  .subscribeToAll({
    fromPosition: checkpoint,
  })
  .on("data", (resolvedEvent) => {
    handleEvent(resolvedEvent);
    checkpoint = resolvedEvent.event?.position ?? checkpoint;
  });
1
2
3
4
5
6
7
8
9
10
let retry = RetryOptions::default().retry_forever();
let options = SubscribeToAllOptions::default().retry_options(retry);
let mut stream = client.subscribe_to_all(&options).await;

loop {
    let event = stream.next().await?;
    // Handles the event...
}
1
2
3
4
5
6
7
8
let checkpoint: ReadPosition = START;

const subscription = client
  .subscribeToAll({
    fromPosition: checkpoint,
  })
  .on("data", (resolvedEvent) => {
    handleEvent(resolvedEvent);
    checkpoint = resolvedEvent.event?.position ?? checkpoint;
  });
1
2
3
4
5
6
7
8
9
10

User credentials

The user creating a subscription must have read access to the stream it's subscribing to, and only admin users may subscribe to $all or create filtered subscriptions.

The code below shows how you can provide user credentials for a subscription. When you specify subscription credentials explicitly, it will override the default credentials set for the client. If you don't specify any credentials, the client will use the credentials specified for the client, if you specified those.

await client.SubscribeToAllAsync(
    FromAll.Start, 
    EventAppeared,
    userCredentials: new UserCredentials("admin", "changeit"));
1
2
3
4
db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Authenticated: &esdb.Credentials{
        Login:    "admin",
        Password: "changeit",
    },
})
1
2
3
4
5
6
UserCredentials credentials = new UserCredentials("admin", "changeit");

SubscribeToAllOptions options = SubscribeToAllOptions.get()
        .authenticated(credentials);

client.subscribeToAll(
        listener,
        options);
1
2
3
4
5
6
7
8
const subscription = client.subscribeToStream("some-stream", {
  credentials: {
    username: "admin",
    password: "changeit",
  },
});
1
2
3
4
5
6
let options =
    SubscribeToAllOptions::default().authenticated(Credentials::new("admin", "changeit"));
client.subscribe_to_all(&options).await;
1
2
3
const subscription = client.subscribeToStream<SomeStreamEvents>(
  "some-stream",
  {
    credentials: {
      username: "admin",
      password: "changeit",
    },
  }
);
1
2
3
4
5
6
7
8
9

Server-side filtering

EventStoreDB allows you to filter the events whilst you subscribe to the $all stream so that you only receive the events that you care about.

You can filter by event type or stream name using either a regular expression or a prefix. Server-side filtering is currently only available on the $all stream.

TIP

Server-side filtering introduced as a simpler alternative to projections. Before creating a projection to get the events you care about you should first consider filtering.

A simple stream prefix filter looks like this:

var prefixStreamFilter = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));
await client.SubscribeToAllAsync(
    FromAll.Start, 
    EventAppeared,
    filterOptions: prefixStreamFilter);
1
2
3
4
5
db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:     esdb.StreamFilterType,
        Prefixes: []string{"test-"},
    },
})
1
2
3
4
5
6
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .addStreamNamePrefix("test-")
        .build();

SubscribeToAllOptions options = SubscribeToAllOptions.get()
        .filter(filter);

client.subscribeToAll(
        listener,
        options);
1
2
3
4
5
6
7
8
9
10
const subscription = client.subscribeToAll({
  filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
});
1
2
3
let filter = SubscriptionFilter::on_stream_name().add_prefix("test-");
let options = SubscribeToAllOptions::default().filter(filter);

client.subscribe_to_all(&options).await;
1
2
3
4
const subscription = client.subscribeToAll({
  filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
});
1
2
3

The filtering API is described more in-depth in the filtering section.

Filtering out system events

There are a number of events in EventStoreDB called system events. These are prefixed with a $ and under most circumstances you won't care about these. They can be filtered out by passing in a SubscriptionFilterOptions when subscribing to the $all stream.

await client.SubscribeToAllAsync(FromAll.Start,
    (s, e, c) => {
        Console.WriteLine(
            quot;{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
        return Task.CompletedTask;
    },
    filterOptions: new SubscriptionFilterOptions(
        EventTypeFilter.ExcludeSystemEvents())
);
1
2
3
4
5
6
7
8
9
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: esdb.ExcludeSystemEventsFilter(),
})

if err != nil {
    panic(err)
}

defer sub.Close()

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SubscriptionListener listener = new SubscriptionListener() {
    @Override
    public void onEvent(Subscription subscription, ResolvedEvent event) {
        System.out.println("Received event"
                + event.getOriginalEvent().getRevision()
                + "@" + event.getOriginalEvent().getStreamId());
    }
};
String excludeSystemEventsRegex = "^[^\\$].*";

SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .withEventTypeRegularExpression(excludeSystemEventsRegex)
        .build();

SubscribeToAllOptions options = SubscribeToAllOptions.get()
        .filter(filter);

client.subscribeToAll(
        listener,
        options
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const subscription = client
  .subscribeToAll({
    fromPosition: START,
    filter: excludeSystemEvents(),
  })
  .on("data", (resolvedEvent) => {
    console.log(
      `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
    );
  });
1
2
3
4
5
6
7
8
9
10
let filter = SubscriptionFilter::on_event_type().exclude_system_events();
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;

loop {
    let event = sub.next().await?;
    let stream_id = event.get_original_stream_id();
    let revision = event.get_original_event().revision;

    println!("Received event {}@{}", revision, stream_id);
}
1
2
3
4
5
6
7
8
9
10
11
12
const subscription = client
  .subscribeToAll({
    fromPosition: START,
    filter: excludeSystemEvents(),
  })
  .on("data", (resolvedEvent) => {
    console.log(
      `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
    );
  });
1
2
3
4
5
6
7
8
9
10

TIP

$stats events are no longer stored in EventStoreDB by default so there won't be as many $ events as before.

Filtering by event type

If you only want to subscribe to events of a given type there are two options. You can either use a regular expression or a prefix.

Filtering by prefix

If you want to filter by prefix pass in a SubscriptionFilterOptions to the subscription with an EventTypeFilter.Prefix.

var filter = new SubscriptionFilterOptions(
    EventTypeFilter.Prefix("customer-"));
1
2
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:     esdb.EventFilterType,
        Prefixes: []string{"customer-"},
    },
})

if err != nil {
    panic(err)
}

defer sub.Close()

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .addEventTypePrefix("customer-")
        .build();
1
2
3
const filter = eventTypeFilter({
  prefixes: ["customer-"],
});
1
2
3
let filter = SubscriptionFilter::on_event_type().add_prefix("customer-");
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;
1
2
3
4
const filter = eventTypeFilter({
  prefixes: ["customer-"],
});
1
2
3

This will only subscribe to events with a type that begin with customer-.

Filtering by regular expression

If you want to subscribe to multiple event types then it might be better to provide a regular expression.

var filter = new SubscriptionFilterOptions(
    EventTypeFilter.RegularExpression("^user|^company"));
1
2
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:  esdb.EventFilterType,
        Regex: "^user|^company",
    },
})

if err != nil {
    panic(err)
}

defer sub.Close()

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .withEventTypeRegularExpression("^user|^company")
        .build();
1
2
3
const filter = eventTypeFilter({
  regex: "^user|^company",
});
1
2
3
let filter = SubscriptionFilter::on_event_type().regex("^user|^company");
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;
1
2
3
4
const filter = eventTypeFilter({
  regex: "^user|^company",
});
1
2
3

This will subscribe to any event that begins with user or company.

Filtering by stream name

If you only want to subscribe to a stream with a given name there are two options. You can either use a regular expression or a prefix.

Filtering by prefix

If you want to filter by prefix pass in a SubscriptionFilterOptions to the subscription with an StreamFilter.Prefix.

var filter = new SubscriptionFilterOptions(
    StreamFilter.Prefix("user-"));
1
2
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:     esdb.StreamFilterType,
        Prefixes: []string{"user-"},
    },
})

if err != nil {
    panic(err)
}

defer sub.Close()

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .addStreamNamePrefix("user-")
        .build();
1
2
3
const filter = streamNameFilter({
  prefixes: ["user-"],
});
1
2
3
let filter = SubscriptionFilter::on_stream_name().add_prefix("user-");
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;
1
2
3
4
const filter = streamNameFilter({
  prefixes: ["user-"],
});
1
2
3

This will only subscribe to all streams with a name that begin with user-.

Filtering by regular expression

If you want to subscribe to multiple streams then it might be better to provide a regular expression.

var filter = new SubscriptionFilterOptions(
    StreamFilter.RegularExpression("^account|^savings"));
1
2
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:  esdb.StreamFilterType,
        Regex: "^user|^company",
    },
})

if err != nil {
    panic(err)
}

defer sub.Close()

for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .withStreamNameRegularExpression("^account|^savings")
        .build();
1
2
3
const filter = streamNameFilter({
  regex: "^account|^savings",
});
1
2
3
let filter = SubscriptionFilter::on_event_type().regex("/^[^\\$].*/");
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;
1
2
3
4
const filter = streamNameFilter({
  regex: "^account|^savings",
});
1
2
3

This will subscribe to any stream with a name that begins with account or savings.

Checkpointing

There is one thing to consider with server-side filtering, and that is when events that match your filter are few and far between. In this scenario, you might find yourself in the situation where EventStoreDB has searched through 1 million events, and the last thing you want to happen is for the server to get to event 900k and then have your client crash. It won't have been able to take a checkpoint and upon a restart, you'd have to go back to the beginning and start again.

In this case you can make use of an additional delegate that will be triggered every n number of events (32 by default).

To make use of it set up checkpointReached on the SubscriptionFilterOptions class.

var filter = new SubscriptionFilterOptions(
    EventTypeFilter.ExcludeSystemEvents(),
    checkpointReached: (s, p, c) =>
    {
        Console.WriteLine(quot;checkpoint taken at {p.PreparePosition}");
        return Task.CompletedTask;
    });
1
2
3
4
5
6
7
for {
    event := sub.Recv()

    if event.EventAppeared != nil {
        streamId := event.EventAppeared.OriginalEvent().StreamID
        revision := event.EventAppeared.OriginalEvent().EventNumber

        fmt.Printf("received event %v@%v", revision, streamId)
    }

    if event.CheckPointReached != nil {
        fmt.Printf("checkpoint taken at %v", event.CheckPointReached.Prepare)
    }

    if event.SubscriptionDropped != nil {
        break
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
String excludeSystemEventsRegex = "/^[^\\$].*/";

SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .withEventTypeRegularExpression(excludeSystemEventsRegex)
        .withCheckpointer(
                new Checkpointer() {
                    @Override
                    public CompletableFuture<Void> onCheckpoint(Subscription subscription, Position position) {
                        System.out.println("checkpoint taken at {p.PreparePosition}");
                        return CompletableFuture.completedFuture(null);
                    }
                })
        .build();
1
2
3
4
5
6
7
8
9
10
11
12
13
excludeSystemEvents({
  async checkpointReached(_subscription, position) {
    // The subscription will wait until the promise is resolved
    await doSomethingAsync();
    console.log(`checkpoint taken at ${position.prepare}`);
  },
});
1
2
3
4
5
6
7
loop {
    let event = sub.next_subscription_event().await?;
    match event {
        SubscriptionEvent::EventAppeared(event) => {
            let stream_id = event.get_original_stream_id();
            let revision = event.get_original_event().revision;

            println!("Received event {}@{}", revision, stream_id);
        }

        SubscriptionEvent::Checkpoint(position) => {
            println!("checkpoint taken at {}", position.prepare);
        }

        _ => {}
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
excludeSystemEvents({
  async checkpointReached(_subscription, position) {
    // The subscription will wait until the promise is resolved
    await doSomethingAsync();
    console.log(`checkpoint taken at ${position.prepare}`);
  },
});
1
2
3
4
5
6
7

This will be called every n number of events. If you want to be specific about the number of events threshold you can also pass that as a parameter.

var filter = new SubscriptionFilterOptions(
    EventTypeFilter.ExcludeSystemEvents(),
    checkpointInterval: 1000,
    checkpointReached: (s, p, c) =>
    {
        Console.WriteLine(quot;checkpoint taken at {p.PreparePosition}");
        return Task.CompletedTask;
    });
1
2
3
4
5
6
7
8
sub, err := db.SubscribeToAll(context.Background(), esdb.SubscribeToAllOptions{
    Filter: &esdb.SubscriptionFilter{
        Type:  esdb.EventFilterType,
        Regex: "/^[^\\$].*/",
    },
})
1
2
3
4
5
6
String excludeSystemEventsRegex = "/^[^\\$].*/";

SubscriptionFilter filter = SubscriptionFilter.newBuilder()
        .withEventTypeRegularExpression(excludeSystemEventsRegex)
        .withCheckpointer(
                new Checkpointer() {
                    @Override
                    public CompletableFuture<Void> onCheckpoint(Subscription subscription, Position position) {
                        System.out.println("checkpoint taken at {p.PreparePosition}");
                        return CompletableFuture.completedFuture(null);
                    }
                },
                1000)
        .build();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
const filter = eventTypeFilter({
  regex: "^[^$].*",
  checkpointInterval: 1000,
  checkpointReached(_subscription, position) {
    console.log(`checkpoint taken at ${position.prepare}`);
  },
});
1
2
3
4
5
6
7
let filter = SubscriptionFilter::on_event_type().regex("/^[^\\$].*/");
let options = SubscribeToAllOptions::default().filter(filter);

let mut sub = client.subscribe_to_all(&options).await;
1
2
3
4
const filter = eventTypeFilter({
  regex: "^[^$].*",
  checkpointInterval: 1000,
  checkpointReached(_subscription, position) {
    console.log(`checkpoint taken at ${position.prepare}`);
  },
});
1
2
3
4
5
6
7

WARNING

This number will be called every n * 32 events.

Last Updated:
Contributors: Alexey Zimarev, Oskar Dudycz, Mathew McLoughlin