Reading events

There are two options for reading events from EventStoreDB. You can either read from an individual stream or you can read from the $all stream. The latter will return all events in the store.

Each event in EventStoreDB belongs to an individual stream. When reading events, you pick the name of the stream you want to read from, then choose whether to read that stream forwards or backwards.

All events have a StreamPosition, which is the place of the event in its stream, represented by a big int (unsigned 64-bit integer), and a Position, which is the event's logical position in the global event log, represented by a CommitPosition and a PreparePosition. This means that when reading events you have to supply a different kind of position depending on whether you are reading from an individual stream or from the $all stream.
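
To make the distinction concrete, the two kinds of position can be sketched as types. This is only an illustration, not the client SDK's definitions: the names `StreamRevision`, `LogPosition`, and `compareLogPositions` are hypothetical, values are modelled as `number` for brevity where the real clients use unsigned 64-bit integers, and ordering by commit first and prepare second is an assumption of the sketch.

```typescript
// Hypothetical model types; not the definitions used by any client SDK.

// A stream revision: the event's index within its own stream (0, 1, 2, ...).
type StreamRevision = number;

// A position in the global log ($all), made up of two values.
interface LogPosition {
  commit: number;
  prepare: number;
}

// Orders two log positions by commit first, then by prepare (an assumption of this sketch).
// Returns -1, 0, or 1, like a sort comparator.
function compareLogPositions(a: LogPosition, b: LogPosition): number {
  if (a.commit !== b.commit) {
    return a.commit < b.commit ? -1 : 1;
  }
  if (a.prepare !== b.prepare) {
    return a.prepare < b.prepare ? -1 : 1;
  }
  return 0;
}

// The kind of value you supply depends on what is being read.
const revisionInStream: StreamRevision = 10; // used when reading "some-stream"
const positionInLog: LogPosition = { commit: 1110, prepare: 1110 }; // used when reading $all
```

A single number is enough to address an event inside one stream, while the global log needs the commit and prepare pair.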

TIP

Check the Connecting to EventStoreDB instructions to learn how to configure and use the client SDK.

Reading from a stream

You can read all the events or just a few events from an individual stream. You can start reading from any position in the stream, and read either forwards or backwards. It is only possible to read events from one stream at a time. You can also read events from the global event log, which spans across streams. Read more about it in the Reading from the $all stream section below.

Reading forwards

The simplest way to read a stream forwards is to supply a stream name, a direction, and a revision to start from. The revision can either be the stream position Start or a big int (unsigned 64-bit integer):

var events = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start);

options := esdb.ReadStreamOptions{
    From:      esdb.Start{},
    Direction: esdb.Forwards,
}
stream, err := db.ReadStream(context.Background(), "some-stream", options, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readStream("some-stream", options)
        .get();

List<ResolvedEvent> events = result.getEvents();

const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});

let options = ReadStreamOptions::default()
    .position(StreamPosition::Start)
    .forwards();
let result = client.read_stream("some-stream", &options, All).await?;

const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});

This returns an enumerable that can be iterated over:

await foreach (var @event in events) {
    Console.WriteLine(Encoding.UTF8.GetString(@event.Event.Data.ToArray()));
}

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}

for (ResolvedEvent resolvedEvent : events) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

if let Some(mut events) = result.ok() {
    while let Some(event) = events.try_next().await? {
        let test_event = event.get_original_event().as_json::<TestEvent>()?;

        println!("Event> {:?}", test_event);
    }
}

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

There are a number of additional arguments you can provide when reading a stream, listed below.

maxCount

Passing in the max count allows you to limit the number of events returned.

resolveLinkTos

When using projections to create new events, you can set whether the generated events are pointers (links) to existing events. Setting this value to true tells EventStoreDB to return the event being pointed to as well as the link event pointing to it.

configureOperationOptions

You can use the configureOperationOptions argument to provide a function that will customise settings for each operation.

userCredentials

The userCredentials argument is optional. You can use it to override the default credentials specified when creating the client instance.

var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start,
    userCredentials: new UserCredentials("admin", "changeit"),
    cancellationToken: cancellationToken);

options := esdb.ReadStreamOptions{
    From: esdb.Start{},
    Authenticated: &esdb.Credentials{
        Login:    "admin",
        Password: "changeit",
    },
}
stream, err := db.ReadStream(context.Background(), "some-stream", options, 100)

UserCredentials credentials = new UserCredentials("admin", "changeit");

ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart()
        .authenticated(credentials);

ReadResult result = client.readStream("some-stream", options)
        .get();

const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  credentials,
  maxCount: 10,
});

let options = ReadStreamOptions::default()
    .position(StreamPosition::Start)
    .authenticated(Credentials::new("admin", "changeit"));

let result = client.read_stream("some-stream", &options, All).await?;

const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  credentials,
  maxCount: 10,
});
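
The effect of resolveLinkTos, described in the list of arguments above, can be illustrated with a small in-memory model. This is a sketch rather than the client SDK: the `RecordedEvent` and `ResolvedEvent` shapes and the `resolve` function are hypothetical, and it assumes that a link event has the type `$>` and carries data of the form `<revision>@<stream>`.

```typescript
// Hypothetical in-memory model; not the client SDK.
interface RecordedEvent {
  streamId: string;
  revision: number;
  type: string;
  data: string;
}

interface ResolvedEvent {
  event?: RecordedEvent; // the event the link points to, or the event itself
  link?: RecordedEvent;  // the link event, when one was followed
}

// Assumption: a link event has type "$>" and data of the form "<revision>@<stream>".
function resolve(recorded: RecordedEvent, store: RecordedEvent[], resolveLinkTos: boolean): ResolvedEvent {
  if (!resolveLinkTos || recorded.type !== "$>") {
    return { event: recorded };
  }
  const separator = recorded.data.indexOf("@");
  const targetRevision = parseInt(recorded.data.substring(0, separator), 10);
  const targetStream = recorded.data.substring(separator + 1);
  for (let i = 0; i < store.length; i++) {
    if (store[i].streamId === targetStream && store[i].revision === targetRevision) {
      return { event: store[i], link: recorded };
    }
  }
  // The target could not be found: only the link is returned.
  return { link: recorded };
}

// Made-up sample data: one original event and one link pointing at it.
const original: RecordedEvent = { streamId: "order-1", revision: 0, type: "order-placed", data: "{}" };
const linkEvent: RecordedEvent = { streamId: "$ce-order", revision: 0, type: "$>", data: "0@order-1" };

const resolved = resolve(linkEvent, [original], true);
console.log(resolved.event?.type); // order-placed
```

With resolveLinkTos set to false, the same call returns the link event itself as `event` and no `link`.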

Reading from a revision

Instead of providing the StreamPosition you can also provide a specific stream revision as a big int (unsigned 64-bit integer).

var events = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    revision: 10,
    maxCount: 20);

ropts := esdb.ReadStreamOptions{
    From: esdb.Revision(10),
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 20)

if err != nil {
    panic(err)
}

defer stream.Close()

ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromRevision(10);

ReadResult result = client.readStream("some-stream", 20, options)
        .get();

List<ResolvedEvent> events = result.getEvents();

const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});

let options = ReadStreamOptions::default().position(StreamPosition::Position(10));
let result = client.read_stream("some-stream", &options, 20).await?;

const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});
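
The combination of a starting revision and a max count can be pictured with an in-memory stand-in for a stream. This is a hedged sketch, not the client SDK: `StoredEvent` and `readForwards` are hypothetical names, revisions are plain `number`s rather than 64-bit values, and treating the starting revision as inclusive is an assumption of the sketch.

```typescript
// Hypothetical in-memory stand-in for a stream; not the client SDK.
interface StoredEvent {
  revision: number; // position of the event within its stream
  type: string;
}

// Returns up to maxCount events, starting at fromRevision (inclusive) and moving forwards.
function readForwards(stream: StoredEvent[], fromRevision: number, maxCount: number): StoredEvent[] {
  const result: StoredEvent[] = [];
  for (let i = 0; i < stream.length && result.length < maxCount; i++) {
    if (stream[i].revision >= fromRevision) {
      result.push(stream[i]);
    }
  }
  return result;
}

// Build a stream of 50 events with revisions 0..49.
const someStream: StoredEvent[] = [];
for (let revision = 0; revision < 50; revision++) {
  someStream.push({ revision: revision, type: "some-event" });
}

// Mirrors the examples above: start at revision 10 and read at most 20 events.
const page = readForwards(someStream, 10, 20);
console.log(page[0].revision, page[page.length - 1].revision, page.length); // 10 29 20
```

If fewer than maxCount events remain after the starting revision, the read simply returns what is there.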

Reading backwards

As well as being able to read a stream forwards, you can also read it backwards. When reading backwards, you need to set the stream position to the end if you want to read all the events:

var events = client.ReadStreamAsync(
    Direction.Backwards,
    "some-stream",
    StreamPosition.End);

await foreach (var e in events) {
    Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
}

ropts := esdb.ReadStreamOptions{
    Direction: esdb.Backwards,
    From:      esdb.End{},
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 10)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}

ReadStreamOptions options = ReadStreamOptions.get()
        .backwards()
        .fromEnd();

ReadResult result = client.readStream("some-stream", options)
        .get();

List<ResolvedEvent> events = result.getEvents();

for (ResolvedEvent resolvedEvent : events) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}

const events = client.readStream("some-stream", {
  direction: BACKWARDS,
  fromRevision: END,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

let options = ReadStreamOptions::default()
    .position(StreamPosition::End)
    .backwards();
let result = client.read_stream("some-stream", &options, All).await?;

if let Some(mut events) = result.ok() {
    while let Some(event) = events.try_next().await? {
        let test_event = event.get_original_event().as_json::<TestEvent>()?;

        println!("Event> {:?}", test_event);
    }
}

const events = client.readStream<SomeEvent>("some-stream", {
  direction: BACKWARDS,
  fromRevision: END,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

TIP

You can read backwards to find the last position in the stream. Just read one event backwards from the end and get its position.
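
The tip can be sketched against an in-memory stand-in for a stream. The names `StoredEvent`, `readBackwardsFromEnd`, and `lastRevision` are hypothetical and the code does not call the client SDK; it only shows the idea of reading a single event backwards and taking its revision.

```typescript
// Hypothetical in-memory stand-in for a stream; not the client SDK.
interface StoredEvent {
  revision: number;
  type: string;
}

// Returns up to maxCount events, starting from the end of the stream and moving backwards.
function readBackwardsFromEnd(stream: StoredEvent[], maxCount: number): StoredEvent[] {
  const result: StoredEvent[] = [];
  for (let i = stream.length - 1; i >= 0 && result.length < maxCount; i--) {
    result.push(stream[i]);
  }
  return result;
}

// Reads a single event backwards; its revision is the last revision in the stream.
// Returns undefined when the stream holds no events.
function lastRevision(stream: StoredEvent[]): number | undefined {
  const lastEvents = readBackwardsFromEnd(stream, 1);
  return lastEvents.length > 0 ? lastEvents[0].revision : undefined;
}

// Made-up sample data with revisions 0..2.
const orders: StoredEvent[] = [
  { revision: 0, type: "order-placed" },
  { revision: 1, type: "order-paid" },
  { revision: 2, type: "order-shipped" },
];

console.log(lastRevision(orders)); // 2
```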

Checking if the stream exists

Reading a stream returns a ReadStreamResult, which contains a property ReadState. This property can have the value StreamNotFound or Ok.

It is important to check the value of this field before attempting to iterate the result, because iterating a stream that does not exist throws an exception.

For example:

var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    revision: 10,
    maxCount: 20);

if (await result.ReadState == ReadState.StreamNotFound) {
    return;
}

await foreach (var e in result) {
    Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
}

ropts := esdb.ReadStreamOptions{
    From: esdb.Revision(10),
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, esdb.ErrStreamNotFound) {
        fmt.Print("Stream not found")
        return
    }

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}

ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromRevision(10);

List<ResolvedEvent> events = null;
try {
    ReadResult result = client.readStream("some-stream", 20, options)
            .get();
    events = result.getEvents();
} catch (ExecutionException e) {
    Throwable innerException = e.getCause();

    if (innerException instanceof StreamNotFoundException) {
        return;
    }
}

for (ResolvedEvent resolvedEvent : events) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}

const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});

try {
  for await (const resolvedEvent of events) {
    console.log(resolvedEvent.event?.data);
  }
} catch (error) {
  if (error instanceof StreamNotFoundError) {
    return;
  }

  throw error;
}

let options = ReadStreamOptions::default().position(StreamPosition::Position(10));

let result = client.read_stream("some-stream", &options, All).await?;

match result {
    ReadResult::Ok(mut events) => {
        while let Some(event) = events.try_next().await? {
            let test_event = event.get_original_event().as_json::<TestEvent>()?;

            println!("Event> {:?}", test_event);
        }
    }

    ReadResult::StreamNotFound(stream_name) => {
        println!("Stream not found: {}", stream_name);
    }

    ReadResult::StreamDeleted(stream_name) => {
        println!("Stream '{}' is deleted", stream_name);
    }
}

const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});

try {
  for await (const resolvedEvent of events) {
    console.log(resolvedEvent.event?.data);
  }
} catch (error) {
  if (error instanceof StreamNotFoundError) {
    return;
  }

  throw error;
}

Reading from the $all stream

Reading from the $all stream is similar to reading from an individual stream, with two small differences: you need to provide the credentials of an admin user account, and you need to provide a transaction log position instead of a stream revision.
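
The difference between a stream revision and a transaction log position can be sketched with an in-memory model of the global log. Everything here is hypothetical (`LogPosition`, `LoggedEvent`, `readAllForwards`, and the sample data), values are plain `number`s rather than 64-bit integers, credentials are left out, and treating the start position as inclusive is an assumption of the sketch.

```typescript
// Hypothetical in-memory model of the global log; not the client SDK.
interface LogPosition {
  commit: number;
  prepare: number;
}

interface LoggedEvent {
  streamId: string;      // the stream the event belongs to
  revision: number;      // its position within that stream
  position: LogPosition; // its position in the global log
}

// True when position p is at or after the position `from` (commit first, then prepare).
function isAtOrAfter(p: LogPosition, from: LogPosition): boolean {
  return p.commit > from.commit || (p.commit === from.commit && p.prepare >= from.prepare);
}

// Returns up to maxCount events from the log, in log order, starting at `from` (inclusive).
function readAllForwards(log: LoggedEvent[], from: LogPosition, maxCount: number): LoggedEvent[] {
  const result: LoggedEvent[] = [];
  for (let i = 0; i < log.length && result.length < maxCount; i++) {
    if (isAtOrAfter(log[i].position, from)) {
      result.push(log[i]);
    }
  }
  return result;
}

// Events from two streams, interleaved in the order they were written.
const globalLog: LoggedEvent[] = [
  { streamId: "order-1", revision: 0, position: { commit: 100, prepare: 100 } },
  { streamId: "invoice-7", revision: 0, position: { commit: 250, prepare: 250 } },
  { streamId: "order-1", revision: 1, position: { commit: 410, prepare: 410 } },
];

const fromSecond = readAllForwards(globalLog, { commit: 250, prepare: 250 }, 10);
console.log(fromSecond.map(function (e) { return e.streamId + "@" + e.revision; }).join(", "));
// invoice-7@0, order-1@1
```

Each event keeps its own per-stream revision, but the read is driven entirely by the log position, which is why events from several streams come back interleaved.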

Reading forwards

The simplest way to read the $all stream forwards is to supply a direction and a transaction log position to start from. This can either be the position Start or a specific position made up of a commit and a prepare value (both unsigned 64-bit integers):

var events = client.ReadAllAsync(
    Direction.Forwards, Position.Start);

options := esdb.ReadAllOptions{
    From:      esdb.Start{},
    Direction: esdb.Forwards,
}
stream, err := db.ReadAll(context.Background(), options, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readAll(options)
        .get();

List<ResolvedEvent> events = result.getEvents();

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

let options = ReadAllOptions::default()
    .position(StreamPosition::Start)
    .forwards();
let mut events = client.read_all(&options, All).await?;

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

You can iterate asynchronously through the result:

await foreach (var e in events) {
    Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
}

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}

for (ResolvedEvent resolvedEvent : events) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

while let Some(event) = events.try_next().await? {
    println!("Event> {:?}", event.get_original_event());
}

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}

There are a number of additional arguments you can provide when reading the $all stream.

maxCount

Passing in the max count allows you to limit the number of events returned.

resolveLinkTos

When using projections to create new events you can set whether the generated events are pointers to existing events. Setting this value to true will tell EventStoreDB to return the event as well as the event linking to it.

var result = client.ReadAllAsync(
    Direction.Forwards,
    Position.Start,
    resolveLinkTos: true,
    cancellationToken: cancellationToken);

ropts := esdb.ReadAllOptions{
    ResolveLinkTos: true,
}

stream, err := db.ReadAll(context.Background(), ropts, 100)

ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart()
        .resolveLinkTos();

ReadResult result = client.readAll(options)
        .get();

List<ResolvedEvent> events = result.getEvents();

const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  resolveLinkTos: true,
  maxCount: 10,
});

let options = ReadAllOptions::default().resolve_link_tos();
client.read_all(&options, All).await?;

const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  resolveLinkTos: true,
  maxCount: 10,
});

configureOperationOptions

This argument is a generic settings class that can be set on any operation executed against EventStoreDB.

userCredentials

You can supply the credentials used to read the data as follows. This will override the default credentials set on the connection.

var result = client.ReadAllAsync(
    Direction.Forwards,
    Position.Start,
    userCredentials: new UserCredentials("admin", "changeit"),
    cancellationToken: cancellationToken);

ropts := esdb.ReadAllOptions{
    From: esdb.Start{},
    Authenticated: &esdb.Credentials{
        Login:    "admin",
        Password: "changeit",
    },
}
stream, err := db.ReadAll(context.Background(), ropts, 100)

UserCredentials credentials = new UserCredentials("admin", "changeit");

ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart()
        .authenticated(credentials);

ReadResult result = client.readAll(options)
        .get();

const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  credentials,
  maxCount: 10,
});

let options = ReadAllOptions::default()
    .authenticated(Credentials::new("admin", "changeit"))
    .position(StreamPosition::Position(Position {
        commit: 1_110,
        prepare: 1_110,
    }));
let events = client.read_all(&options, All).await?;

const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  credentials,
  maxCount: 10,
});

Reading backwards

As well as being able to read the $all stream forwards, you can also read it backwards. When reading backwards, the position has to be set to the end if you want to read all the events:

var events = client.ReadAllAsync(
    Direction.Backwards, Position.End);

ropts := esdb.ReadAllOptions{
    Direction: esdb.Backwards,
    From:      esdb.End{},
}

stream, err := db.ReadAll(context.Background(), ropts, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

ReadAllOptions options = ReadAllOptions.get()
        .backwards()
        .fromEnd();

ReadResult result = client.readAll(options)
        .get();

List<ResolvedEvent> events = result.getEvents();

const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  maxCount: 10,
});

let options = ReadAllOptions::default()
    .position(StreamPosition::End)
    .backwards();

let mut events = client.read_all(&options, All).await?;

const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  maxCount: 10,
});

TIP

You can read backwards to find the last position in the $all stream. Just read one event backwards from the end and get its position.

Handling system events

When reading from the $all stream, EventStoreDB will also return system events. In most cases you won't need to care about these events and they should be ignored.

All system events begin with $ or $$ and can be easily ignored by checking the EventType property.

var events = client.ReadAllAsync(
    Direction.Forwards, Position.Start);

await foreach (var e in events) {
    if (e.Event.EventType.StartsWith("$")) {
        continue;
    }

    Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
}

stream, err := db.ReadAll(context.Background(), esdb.ReadAllOptions{}, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    if strings.HasPrefix(event.OriginalEvent().EventType, "$") {
        continue
    }

    fmt.Printf("Event> %v", event)
}

ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readAll(options)
        .get();

List<ResolvedEvent> events = result.getEvents();

for (ResolvedEvent resolvedEvent : events) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    if (recordedEvent.getEventType().startsWith("$")) {
        continue;
    }
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  if (resolvedEvent.event?.type.startsWith("$")) {
    continue;
  }

  console.log(resolvedEvent.event?.type);
}

let mut events = client.read_all(&Default::default(), All).await?;

while let Some(event) = events.try_next().await? {
    if event.get_original_event().event_type.starts_with("$") {
        continue;
    }

    println!("Event> {:?}", event.get_original_event());
}

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  if (resolvedEvent.event?.type.startsWith("$")) {
    continue;
  }

  console.log(resolvedEvent.event?.type);
}
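
The prefix check used in the examples above can also be pulled out into a small helper that works on plain strings. This is a dependency-free sketch: `isSystemEventType` and `withoutSystemEvents` are hypothetical names, and the sample event types are made up for illustration.

```typescript
// True when the event type marks a system event.
// Checking for a leading "$" also covers types that start with "$$".
function isSystemEventType(eventType: string): boolean {
  return eventType.charAt(0) === "$";
}

// Keeps only the event types that do not belong to system events.
function withoutSystemEvents(eventTypes: string[]): string[] {
  return eventTypes.filter(function (eventType) {
    return !isSystemEventType(eventType);
  });
}

// Made-up sample of event types, as they might appear when reading $all.
const sampleTypes = ["$metadata", "order-placed", "$>", "$$internal", "order-shipped"];

// Only "order-placed" and "order-shipped" remain.
console.log(withoutSystemEvents(sampleTypes));
```

Keeping the check in one place makes it easier to apply the same rule wherever $all is read.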

Last Updated: 10/14/2021, 6:49:49 AM
Contributors: Mathew McLoughlin, Oskar Dudycz