Appending events
When you start working with EventStoreDB, the database is empty, so the first meaningful operation is to append one or more events using one of the available client SDKs.
TIP
Check the Getting Started guide to learn how to configure and use the client SDK.
Append your first event
The simplest way to append an event to EventStoreDB is to create an EventData object and call the AppendToStream method.
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"1\", \"value\": \"some value\"}")
);
await client.AppendToStreamAsync(
"some-stream",
StreamState.NoStream,
new List<EventData> {
eventData
});
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
result, err := db.AppendToStream(context.Background(), "some-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("some-stream", options, eventData)
.get();
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;
type SomeEvent = JSONEventType<
"some-event",
{
id: string;
value: string;
}
>;
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
As you can see, AppendToStream takes a collection of EventData, which makes it possible to save more than one event in a single batch.
In addition to the example above, there are a number of other options for dealing with different scenarios.
TIP
If you are new to Event Sourcing, please study the Handling concurrency section below.
Working with EventData
When appending events to EventStoreDB, each event must first be wrapped in an EventData object. This allows you to specify the content of the event, the type of event, and whether it's in JSON format. In its simplest form, you need the following three arguments:
eventId
This takes the format of a Uuid and is used to uniquely identify the event you are trying to append. If two events with the same Uuid are appended to the same stream in quick succession, EventStoreDB will only append one copy of the event to the stream.
For example, the following code will only append a single event:
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"1\", \"value\": \"some value\"}")
);
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
});
// attempt to append the same event again
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
});
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
id := uuid.Must(uuid.NewV4())
event := esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
EventID: id,
Data: bytes,
}
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
// attempt to append the same event again
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.any());
client.appendToStream("same-event-stream", options, eventData)
.get();
// attempt to append the same event again
client.appendToStream("same-event-stream", options, eventData)
.get();
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default();
let _ = client
.append_to_stream("same-event-stream", &options, event.clone())
.await?;
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
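The deduplication described for eventId can be pictured with a small in-memory model. This is only a sketch and not the EventStoreDB client API: `ModelEvent`, `DedupStream`, and `appendSameEventTwice` are hypothetical names, and the model remembers every ID forever, whereas the text above only describes deduplication for appends made in quick succession.

```typescript
// In-memory model of event-ID deduplication. This is NOT the EventStoreDB
// client API; every name below is hypothetical.
interface ModelEvent {
  id: string; // a UUID in real usage
  type: string;
  data: string; // JSON text
}

class DedupStream {
  private readonly events: ModelEvent[] = [];
  private readonly seenIds = new Set<string>();

  // Stores the event unless one with the same ID was already stored.
  // Returns true when the event was stored, false when it was a duplicate.
  append(event: ModelEvent): boolean {
    if (this.seenIds.has(event.id)) {
      return false;
    }
    this.seenIds.add(event.id);
    this.events.push(event);
    return true;
  }

  // Number of events actually stored in the stream.
  get count(): number {
    return this.events.length;
  }
}

// Appends the same event twice and reports how many copies were stored.
function appendSameEventTwice(): number {
  const stream = new DedupStream();
  const event: ModelEvent = {
    id: "0c5f0d3e-6f0b-4d0a-9b1e-2f6a1c3d4e5f",
    type: "some-event",
    data: JSON.stringify({ id: "1", value: "some value" }),
  };
  stream.append(event);
  stream.append(event); // ignored: same event ID
  return stream.count;
}
```

Because the second append carries the same ID, the model keeps a single copy, which mirrors the outcome of the examples above.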
type
An event type should be supplied for each event. This is a unique string used to identify the type of event you are saving.
It is common to see the name of the event's type in code (for example, the class name) used as the type, as it makes serialising and deserialising the event easy. However, we recommend against this, as it couples the storage to the code type and makes it more difficult to version the event at a later date.
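One way to keep the stored type independent of the code type is to map a stable string to a parser. The following is a minimal sketch under that assumption; `OrderPlaced`, `StoredEvent`, `ORDER_PLACED`, `toStored`, and `fromStored` are hypothetical names that do not come from any client SDK.

```typescript
// Hypothetical application event; the code type could be renamed freely.
interface OrderPlaced {
  orderId: string;
  total: number;
}

// Hypothetical shape of what gets written to the store.
interface StoredEvent {
  type: string; // stable name written to the store
  data: string; // JSON text
}

// The stored name is chosen once and does not depend on the code type name.
const ORDER_PLACED = "order-placed";

// Parsers keyed by the stored name.
const parsers: Record<string, (json: string) => OrderPlaced> = {
  [ORDER_PLACED]: (json) => {
    const raw = JSON.parse(json) as OrderPlaced;
    return { orderId: raw.orderId, total: raw.total };
  },
};

// Serialises the event under its stable stored name.
function toStored(event: OrderPlaced): StoredEvent {
  return { type: ORDER_PLACED, data: JSON.stringify(event) };
}

// Looks up the parser by stored name and rebuilds the event.
function fromStored(stored: StoredEvent): OrderPlaced {
  const parse = parsers[stored.type];
  if (parse === undefined) {
    throw new Error(`Unknown event type: ${stored.type}`);
  }
  return parse(stored.data);
}
```

With this arrangement, renaming `OrderPlaced` in code leaves previously stored events readable, and a later version of the event can be registered under a new key.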
data
Representation of your event data. It is recommended that you store your events as JSON objects, as this allows you to make use of all of EventStoreDB's functionality, such as projections. Ultimately, though, you can use whatever format you like, because the data is stored as encoded bytes.
metadata
It is common to need to store additional information alongside your event that is not part of the event data itself, such as correlation IDs, timestamps, or access information. EventStoreDB allows you to store a separate byte array containing this information to keep it separate from the event data.
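As an illustration of that separation, the sketch below serialises the event data and the metadata independently. The `Envelope` and `Metadata` shapes and the `buildEnvelope` helper are assumptions made for this example and are not part of any client SDK.

```typescript
// Hypothetical metadata fields; use whatever your application needs.
interface Metadata {
  correlationId: string;
  recordedAt: string; // ISO 8601 timestamp
  userId: string;
}

// Hypothetical envelope keeping data and metadata as separate payloads.
interface Envelope {
  type: string;
  data: string; // JSON text of the event itself
  metadata: string; // JSON text of the accompanying information
}

// Serialises data and metadata independently so neither leaks into the other.
function buildEnvelope(type: string, data: object, metadata: Metadata): Envelope {
  return {
    type,
    data: JSON.stringify(data),
    metadata: JSON.stringify(metadata),
  };
}
```

A call such as `buildEnvelope("some-event", { id: "1", value: "some value" }, { correlationId: "c-1", recordedAt: "2024-01-01T00:00:00Z", userId: "u-1" })` yields a data payload that contains only the event fields.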
isJson
A simple boolean field that tells EventStoreDB whether the event is stored as JSON; true by default.
Handling concurrency
When appending events to a stream, you can supply a stream state or stream revision. Your client can use this to tell EventStoreDB what state or version you expect the stream to be in when you append. If the stream isn't in that state, an exception will be thrown.
For example, if we try to append two records, both times expecting that the stream doesn't exist, we will get an exception on the second append:
var eventDataOne = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"1\", \"value\": \"some value\"}")
);
var eventDataTwo = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"2\", \"value\": \"some other value\"}")
);
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataOne
});
// the second append also expects no stream, so it fails
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataTwo
});
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
if err != nil {
panic(err)
}
bytes, err = json.Marshal(TestEvent{
Id: "2",
ImportantData: "some other value",
})
if err != nil {
panic(err)
}
// the second append also expects no stream, so it fails
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
EventData eventDataOne = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
EventData eventDataTwo = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"some other value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("no-stream-stream", options, eventDataOne)
.get();
// the second append also expects no stream, so it fails
client.appendToStream("no-stream-stream", options, eventDataTwo)
.get();
const eventOne = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// the second append also expects no stream, so it fails
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "some other value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
const eventOne = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// the second append also expects no stream, so it fails
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
There are three available stream states:
Any
NoStream
StreamExists
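To make the check concrete, here is a minimal in-memory model of how an expected state or an exact revision could be compared with the actual stream before an append. It is a sketch only, not the server's implementation or the client API: `ExpectedState`, `checkExpected`, and `ModelStore` are hypothetical names, and revisions are assumed to start at 0 for the first event.

```typescript
// Hypothetical representation of the three stream states plus an exact revision.
type ExpectedState =
  | { kind: "any" }
  | { kind: "noStream" }
  | { kind: "streamExists" }
  | { kind: "revision"; value: number };

// currentRevision is null when the stream does not exist yet.
function checkExpected(expected: ExpectedState, currentRevision: number | null): boolean {
  switch (expected.kind) {
    case "any":
      return true;
    case "noStream":
      return currentRevision === null;
    case "streamExists":
      return currentRevision !== null;
    case "revision":
      return currentRevision === expected.value;
  }
}

class ModelStore {
  private readonly streams = new Map<string, string[]>();

  // Revision of the last event in the stream, or null if there is no stream.
  currentRevision(stream: string): number | null {
    const events = this.streams.get(stream);
    return events === undefined ? null : events.length - 1;
  }

  // Appends the batch if the expectation holds; otherwise throws.
  // Returns the revision of the last appended event.
  append(stream: string, expected: ExpectedState, newEvents: string[]): number {
    const current = this.currentRevision(stream);
    if (!checkExpected(expected, current)) {
      throw new Error(`Wrong expected state for stream "${stream}"`);
    }
    const events = this.streams.get(stream) ?? [];
    events.push(...newEvents);
    this.streams.set(stream, events);
    return events.length - 1;
  }
}
```

In this model, a first append with `{ kind: "noStream" }` succeeds and a second one with the same expectation throws, matching the behaviour shown in the examples above.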
This check can be used to implement optimistic concurrency. When you retrieve a stream from EventStoreDB, you take note of the current version number; then, when you save it back, you can determine if somebody else has modified the record in the meantime. In the following example, both clients read the same revision, so the second append fails:
var clientOneRead = client.ReadStreamAsync(
Direction.Forwards,
"concurrency-stream",
StreamPosition.Start);
var clientOneRevision = (await clientOneRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientTwoRead = client.ReadStreamAsync(Direction.Forwards, "concurrency-stream", StreamPosition.Start);
var clientTwoRevision = (await clientTwoRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientOneData = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"1\", \"value\": \"clientOne\"}")
);
await client.AppendToStreamAsync(
"concurrency-stream",
clientOneRevision,
new List<EventData> {
clientOneData
});
var clientTwoData = new EventData(
Uuid.NewUuid(),
"some-event",
Encoding.UTF8.GetBytes("{\"id\": \"2\", \"value\": \"clientTwo\"}")
);
// fails: clientTwoRevision is stale after client one's append
await client.AppendToStreamAsync(
"concurrency-stream",
clientTwoRevision,
new List<EventData> {
clientTwoData
});
ropts := esdb.ReadStreamOptions{
Direction: esdb.Backwards,
From: esdb.End{},
}
stream, err := db.ReadStream(context.Background(), "concurrency-stream", ropts, 1)
if err != nil {
panic(err)
}
defer stream.Close()
lastEvent, err := stream.Recv()
if err != nil {
panic(err)
}
data := TestEvent{
Id: "1",
ImportantData: "clientOne",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
aopts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Revision(lastEvent.OriginalEvent().EventNumber),
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
data = TestEvent{
Id: "2",
ImportantData: "clientTwo",
}
bytes, err = json.Marshal(data)
if err != nil {
panic(err)
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
ReadStreamOptions readStreamOptions = ReadStreamOptions.get()
.forwards()
.fromStart();
ReadResult result = client.readStream("concurrency-stream", readStreamOptions)
.get();
EventData clientOneData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"clientOne"
))
.build();
EventData clientTwoData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"clientTwo"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(result.getLastStreamPosition());
client.appendToStream("concurrency-stream", options, clientOneData)
.get();
client.appendToStream("concurrency-stream", options, clientTwoData)
.get();
const events = client.readStream("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
let options = ReadStreamOptions::default().position(StreamPosition::End);
let last_event = client
.read_stream("concurrency-stream", &options)
.await?
.next()
.await?
.expect("the stream to at least exist.");
let data = TestEvent {
id: "1".to_string(),
important_data: "clientOne".to_string(),
};
let event = EventData::json("some-event", data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::Exact(
last_event.get_original_event().revision,
));
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "clientTwo".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
const events = client.readStream<SomeEvent>("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision: AppendExpectedRevision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
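When the second append is rejected, one possible reaction is to re-read the stream and try again with the fresh revision. The sketch below shows that loop against an in-memory stand-in; it is an assumption about how an application might react, not behaviour provided by the client SDKs, and `ModelStream`, `appendExpecting`, and `appendWithRetry` are hypothetical names.

```typescript
// Minimal in-memory stream; a stand-in for the database, not the client API.
class ModelStream {
  private readonly events: string[] = [];

  // Revision of the last event, or -1 when the stream is empty.
  get revision(): number {
    return this.events.length - 1;
  }

  // Appends only if the caller's expected revision matches the actual one.
  appendExpecting(expectedRevision: number, event: string): number {
    if (expectedRevision !== this.revision) {
      throw new Error(`expected revision ${expectedRevision}, actual ${this.revision}`);
    }
    this.events.push(event);
    return this.revision;
  }
}

// Tries to append with a possibly stale revision; on conflict it re-reads the
// revision and retries, up to maxAttempts times.
function appendWithRetry(
  stream: ModelStream,
  event: string,
  staleRevision: number,
  maxAttempts: number = 3
): number {
  let expected = staleRevision;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return stream.appendExpecting(expected, event);
    } catch (_err) {
      // Another writer got there first. A real application would reload its
      // state here and re-check that the event is still valid to append.
      expected = stream.revision;
    }
  }
  throw new Error("gave up after repeated conflicts");
}
```

Retrying blindly is only safe when the new event does not depend on what the other writer appended; otherwise the decision that produced the event should be re-evaluated first.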
User credentials
You can provide user credentials to be used to append the data as follows. This will override the default credentials set on the connection.
await client.AppendToStreamAsync(
"some-stream",
StreamState.Any,
new[] { eventData },
userCredentials: new UserCredentials("admin", "changeit"),
cancellationToken: cancellationToken
);
credentials := &esdb.Credentials{Login: "admin", Password: "changeit"}
result, err := db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{Authenticated: credentials}, event)
UserCredentials credentials = new UserCredentials("admin", "changeit");
AppendToStreamOptions options = AppendToStreamOptions.get()
.authenticated(credentials);
client.appendToStream("some-stream", options, eventData)
.get();
const credentials = {
username: "admin",
password: "changeit",
};
await client.appendToStream("some-stream", event, {
credentials,
});
let options =
AppendToStreamOptions::default().authenticated(Credentials::new("admin", "changeit"));
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;
const credentials = {
username: "admin",
password: "changeit",
};
await client.appendToStream("some-stream", event, {
credentials,
});
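The override rule can be summarised in a few lines. This is a sketch of the precedence only, assuming per-call credentials win whenever they are supplied; `Credentials` and `effectiveCredentials` are hypothetical names and not part of any client SDK.

```typescript
// Hypothetical credentials shape, mirroring the username/password pairs above.
interface Credentials {
  username: string;
  password: string;
}

// Per-call credentials, when present, take precedence over the connection default.
function effectiveCredentials(
  connectionDefault: Credentials | undefined,
  perCall?: Credentials
): Credentials | undefined {
  return perCall ?? connectionDefault;
}
```

If neither the connection nor the call supplies credentials, the function returns `undefined`, which in this model stands for an unauthenticated request.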