Appending events
When you start working with EventStoreDB, it is empty. The first meaningful operation is to add one or more events to the database using one of the available client SDKs.
Tips
Check the Getting Started guide to learn how to configure and use the client SDK.
Append your first event
The simplest way to append an event to EventStoreDB is to create an EventData object and call the AppendToStream method.
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
commit_position = client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=[event1],
)
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
type SomeEvent = JSONEventType<
"some-event",
{
id: string;
value: string;
}
>;
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
});
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("some-stream", options, eventData)
.get();
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\", \"value\": \"some value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"some-stream",
StreamState.NoStream,
new List<EventData> {
eventData
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
result, err := db.AppendToStream(context.Background(), "some-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;
AppendToStream takes a collection of EventData, which allows you to save more than one event in a single batch.
Beyond the example above, other options exist for handling different scenarios.
Tips
If you are new to Event Sourcing, please study the Handling concurrency section below.
Working with EventData
Events appended to EventStoreDB must be wrapped in an EventData object. This allows you to specify the event's content, the type of event, and whether it's in JSON format. In its simplest form, you need three arguments: eventId, type, and data.
eventId
This takes the format of a Uuid and is used to uniquely identify the event you are trying to append. If two events with the same Uuid are appended to the same stream in quick succession, EventStoreDB will only append one of the events to the stream.
For example, the following code will only append a single event:
event = NewEvent(
id=uuid4(),
type="some-event",
data=b'{"important_data": "some value"}',
metadata=b"{}",
content_type="application/json",
)
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
)
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
)
const event = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
const event = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("same-event-stream", event);
// attempt to append the same event again
await client.appendToStream("same-event-stream", event);
EventData eventData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.any());
client.appendToStream("same-event-stream", options, eventData)
.get();
// attempt to append the same event again
client.appendToStream("same-event-stream", options, eventData)
.get();
var eventData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\", \"value\": \"some value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
}
);
// attempt to append the same event again
await client.AppendToStreamAsync(
"same-event-stream",
StreamState.Any,
new List<EventData> {
eventData
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
id := uuid.New()
event := esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
EventID: id,
Data: bytes,
}
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
// attempt to append the same event again
_, err = db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{}, event)
if err != nil {
panic(err)
}
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default();
let _ = client
.append_to_stream("same-event-stream", &options, event.clone())
.await?;
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
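The deduplication behaviour described above can be pictured with a small in-memory model. This is only a sketch and not the server's implementation: Event, InMemoryStream and its append method are hypothetical names introduced for illustration, and the conditions under which the real server treats an append as a retry are not modelled here.

```python
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for an EventData object."""
    id: uuid.UUID
    type: str
    data: bytes


@dataclass
class InMemoryStream:
    """Toy stream that ignores an event whose id it has already stored."""
    events: list = field(default_factory=list)
    seen_ids: set = field(default_factory=set)

    def append(self, event: Event) -> bool:
        # Return True if the event was stored, False if it was a duplicate.
        if event.id in self.seen_ids:
            return False
        self.seen_ids.add(event.id)
        self.events.append(event)
        return True


stream = InMemoryStream()
event = Event(
    id=uuid.uuid4(),
    type="some-event",
    data=b'{"id": "1", "value": "some value"}',
)

first = stream.append(event)
second = stream.append(event)  # same id: treated as a retry and skipped
```

The practical consequence is that a client can safely resend an append after a timeout, provided it reuses the same event ID.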
type
Each event should be supplied with an event type. This unique string is used to identify the type of event you are saving.
It is common to use the name of the event's class in code as the type, because that makes serialising and deserialising the event easy. However, we recommend against this: it couples the stored data to the code type and makes it harder to version the event later.
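One way to avoid that coupling is to keep an explicit mapping between stable type strings and the classes that currently represent them. The sketch below is illustrative only: OrderPlaced, EVENT_TYPES and deserialize are hypothetical names and are not part of any client SDK.

```python
import json
from dataclasses import dataclass


@dataclass
class OrderPlaced:
    """Hypothetical domain event; the class can be renamed freely."""
    order_id: str
    total: float


# Stable, storage-facing type names mapped to the current classes.
EVENT_TYPES = {
    "order-placed": OrderPlaced,
}


def deserialize(event_type: str, data: bytes):
    """Look up the class by its stored type name and rebuild the event."""
    cls = EVENT_TYPES[event_type]
    return cls(**json.loads(data))


restored = deserialize("order-placed", b'{"order_id": "42", "total": 9.5}')
```

With this arrangement, renaming or moving the class only requires updating the mapping, while the type string stored with each event stays the same.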
data
Representation of your event data. It is recommended that you store your events as JSON objects. This allows you to take advantage of all of EventStoreDB's functionality, such as projections. That said, you can save events using whatever format suits your workflow. Eventually, the data will be stored as encoded bytes.
metadata
Storing additional information alongside your event that is part of the event itself is standard practice. This can be correlation IDs, timestamps, access information, etc. EventStoreDB allows you to store a separate byte array containing this information to keep it separate.
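As a rough illustration of the data and metadata fields, the sketch below encodes a JSON payload and a separate metadata object into the byte arrays that would be handed to a client. The metadata key names (correlation_id, user) are assumptions chosen for the example, not names required by EventStoreDB.

```python
import json
import uuid

# Event payload: the business fact being recorded.
data = {"id": "1", "value": "some value"}

# Metadata: context about the event, kept apart from the payload.
# These key names are illustrative only.
metadata = {"correlation_id": str(uuid.uuid4()), "user": "alice"}

# Both are serialised to JSON and then encoded as UTF-8 bytes.
data_bytes = json.dumps(data).encode("utf-8")
metadata_bytes = json.dumps(metadata).encode("utf-8")

# Decoding reverses the process when the event is read back.
decoded = json.loads(data_bytes.decode("utf-8"))
```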
isJson
A boolean field that tells EventStoreDB whether the event is stored as JSON. It is true by default.
Handling concurrency
When appending events to a stream, you can supply a stream state or stream revision. Your client uses this to inform EventStoreDB of the state or version you expect the stream to be in when appending an event. If the stream isn't in that state, an exception will be thrown.
For example, if you append two events to the same stream, expecting both times that the stream doesn't exist, the second append will throw an exception:
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
stream_name = str(uuid4())
client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=event1,
)
event2 = NewEvent(
type="some-event",
data=b'{"important_data": "some other value"}',
)
try:
    # attempt to append a second event, again expecting no stream
    client.append_to_stream(
        stream_name=stream_name,
        current_version=StreamState.NO_STREAM,
        events=event2,
    )
except exceptions.WrongCurrentVersion:
    print("Error appending second event")
const eventOne = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// attempt to append a second event, again expecting no stream
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
const eventOne = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
const eventTwo = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some other value",
},
});
await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
});
// attempt to append a second event, again expecting no stream
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
});
EventData eventDataOne = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"some value"
))
.build();
EventData eventDataTwo = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"some other value"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());
client.appendToStream("no-stream-stream", options, eventDataOne)
.get();
// attempt to append a second event, again expecting no stream
client.appendToStream("no-stream-stream", options, eventDataTwo)
.get();
var eventDataOne = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\", \"value\": \"some value\"}"u8.ToArray()
);
var eventDataTwo = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"2\", \"value\": \"some other value\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataOne
}
);
// attempt to append a second event, again expecting no stream
await client.AppendToStreamAsync(
"no-stream-stream",
StreamState.NoStream,
new List<EventData> {
eventDataTwo
}
);
data := TestEvent{
Id: "1",
ImportantData: "some value",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
}
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
if err != nil {
panic(err)
}
bytes, err = json.Marshal(TestEvent{
Id: "2",
ImportantData: "some other value",
})
if err != nil {
panic(err)
}
// attempt to append a second event, again expecting no stream
_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let data = TestEvent {
id: "1".to_string(),
important_data: "some value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::NoStream);
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "some other value".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("same-event-stream", &options, event)
.await?;
There are three available stream states:
Any
NoStream
StreamExists
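The meaning of each state, and of an exact revision number, can be sketched as a simple predicate. This is an illustrative model under stated assumptions rather than the server's logic: ExpectedState and check_expected are hypothetical names, and a missing stream is represented here by None.

```python
from enum import Enum
from typing import Optional, Union


class ExpectedState(Enum):
    """Local, hypothetical mirror of the three stream states."""
    ANY = "any"
    NO_STREAM = "no_stream"
    STREAM_EXISTS = "stream_exists"


def check_expected(
    expected: Union[ExpectedState, int],
    current_revision: Optional[int],
) -> bool:
    """Return True if an append would be accepted.

    current_revision is None when the stream does not exist, otherwise
    the revision of the last event in the stream (starting at 0).
    """
    if expected is ExpectedState.ANY:
        return True
    if expected is ExpectedState.NO_STREAM:
        return current_revision is None
    if expected is ExpectedState.STREAM_EXISTS:
        return current_revision is not None
    # An integer means the last event must be at exactly this revision.
    return current_revision == expected
```

In this model, check_expected(ExpectedState.NO_STREAM, 0) is False, which corresponds to the exception raised by the second append in the examples above.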
This check can be used to implement optimistic concurrency. When retrieving a stream from EventStoreDB, note the current version number. When you save it back, you can determine if somebody else has modified the record in the meantime.
original_version = StreamState.NO_STREAM
for event in client.read_stream(stream_name):
    original_version = event.stream_position
event1 = NewEvent(
type="some-event",
data=b'{"important_data": "some value"}',
)
client.append_to_stream(
stream_name=stream_name,
current_version=original_version,
events=event1,
)
event2 = NewEvent(
type="some-event",
data=b'{"important_data": "some other value"}',
)
try:
    client.append_to_stream(
        stream_name=stream_name,
        current_version=original_version,
        events=event2,
    )
except exceptions.WrongCurrentVersion:
    print("Error appending event2")
const events = client.readStream("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
const events = client.readStream<SomeEvent>("concurrency-stream", {
fromRevision: START,
direction: FORWARDS,
});
let revision: AppendExpectedRevision = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
const clientOneEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "1",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
});
const clientTwoEvent = jsonEvent<SomeEvent>({
id: uuid(),
type: "some-event",
data: {
id: "2",
value: "some value",
},
});
await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
});
ReadStreamOptions readStreamOptions = ReadStreamOptions.get()
.forwards()
.fromStart();
ReadResult result = client.readStream("concurrency-stream", readStreamOptions)
.get();
EventData clientOneData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"1",
"clientOne"
))
.build();
EventData clientTwoData = EventData
.builderAsJson(
UUID.randomUUID(),
"some-event",
new TestEvent(
"2",
"clientTwo"
))
.build();
AppendToStreamOptions options = AppendToStreamOptions.get()
.expectedRevision(result.getLastStreamPosition());
client.appendToStream("concurrency-stream", options, clientOneData)
.get();
client.appendToStream("concurrency-stream", options, clientTwoData)
.get();
var clientOneRead = client.ReadStreamAsync(
Direction.Forwards,
"concurrency-stream",
StreamPosition.Start
);
var clientOneRevision = (await clientOneRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientTwoRead = client.ReadStreamAsync(Direction.Forwards, "concurrency-stream", StreamPosition.Start);
var clientTwoRevision = (await clientTwoRead.LastAsync()).Event.EventNumber.ToUInt64();
var clientOneData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"1\", \"value\": \"clientOne\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"concurrency-stream",
clientOneRevision,
new List<EventData> {
clientOneData
}
);
var clientTwoData = new EventData(
Uuid.NewUuid(),
"some-event",
"{\"id\": \"2\", \"value\": \"clientTwo\"}"u8.ToArray()
);
await client.AppendToStreamAsync(
"concurrency-stream",
clientTwoRevision,
new List<EventData> {
clientTwoData
}
);
ropts := esdb.ReadStreamOptions{
Direction: esdb.Backwards,
From: esdb.End{},
}
stream, err := db.ReadStream(context.Background(), "concurrency-stream", ropts, 1)
if err != nil {
panic(err)
}
defer stream.Close()
lastEvent, err := stream.Recv()
if err != nil {
panic(err)
}
data := TestEvent{
Id: "1",
ImportantData: "clientOne",
}
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
aopts := esdb.AppendToStreamOptions{
ExpectedRevision: lastEvent.OriginalStreamRevision(),
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
data = TestEvent{
Id: "1",
ImportantData: "clientTwo",
}
bytes, err = json.Marshal(data)
if err != nil {
panic(err)
}
_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
ContentType: esdb.ContentTypeJson,
EventType: "some-event",
Data: bytes,
})
let options = ReadStreamOptions::default().position(StreamPosition::End);
let last_event = client
.read_stream("concurrency-stream", &options)
.await?
.next()
.await?
.expect("the stream to at least exist.");
let data = TestEvent {
id: "1".to_string(),
important_data: "clientOne".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let options = AppendToStreamOptions::default().expected_revision(ExpectedRevision::Exact(
last_event.get_original_event().revision,
));
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
let data = TestEvent {
id: "2".to_string(),
important_data: "clientTwo".to_string(),
};
let event = EventData::json("some-event", &data)?.id(Uuid::new_v4());
let _ = client
.append_to_stream("concurrency-stream", &options, event)
.await?;
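In each example above, the second append is rejected because the stream has moved on since it was read. What to do next is an application decision. One common pattern, sketched below against a toy in-memory stream, is to re-read the current revision and try again. All names here (InMemoryStream, WrongExpectedVersion, append_with_retry) are hypothetical, and using -1 to mean an empty stream is an assumption of this model only.

```python
class WrongExpectedVersion(Exception):
    """Raised when the stream is not at the revision the writer expected."""


class InMemoryStream:
    """Toy stream; revisions start at 0, and -1 stands for no events yet."""

    def __init__(self):
        self.events = []

    @property
    def revision(self) -> int:
        return len(self.events) - 1

    def append(self, event: dict, expected_revision: int) -> int:
        if expected_revision != self.revision:
            raise WrongExpectedVersion(
                f"expected {expected_revision}, stream is at {self.revision}"
            )
        self.events.append(event)
        return self.revision


def append_with_retry(stream, event, expected_revision, attempts=3) -> int:
    """Retry after a conflict by re-reading the current revision."""
    for _ in range(attempts):
        try:
            return stream.append(event, expected_revision)
        except WrongExpectedVersion:
            # Another writer got in first: refresh our view and try again.
            # A real application would usually re-validate its decision here.
            expected_revision = stream.revision
    raise WrongExpectedVersion("gave up after repeated conflicts")


stream = InMemoryStream()
stream.append({"id": "0"}, expected_revision=-1)

seen = stream.revision  # both clients read the same revision
stream.append({"id": "1"}, expected_revision=seen)  # client one succeeds

conflict = False
try:
    stream.append({"id": "2"}, expected_revision=seen)  # client two is stale
except WrongExpectedVersion:
    conflict = True

final_revision = append_with_retry(stream, {"id": "2"}, expected_revision=seen)
```

Blindly retrying is only appropriate when the new event is still valid given whatever the other writer appended. Otherwise the application should rebuild its state from the stream before deciding what to append.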
User credentials
You can provide user credentials to append the data as follows. This will override the default credentials set on the connection.
credentials = client.construct_call_credentials(
username="admin",
password="changeit",
)
commit_position = client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.ANY,
events=event,
credentials=credentials,
)
const credentials = {
username: "admin",
password: "changeit",
};
await client.appendToStream("some-stream", event, {
credentials,
});
UserCredentials credentials = new UserCredentials("admin", "changeit");
AppendToStreamOptions options = AppendToStreamOptions.get()
.authenticated(credentials);
client.appendToStream("some-stream", options, eventData)
.get();
await client.AppendToStreamAsync(
"some-stream",
StreamState.Any,
new[] { eventData },
userCredentials: new UserCredentials("admin", "changeit"),
cancellationToken: cancellationToken
);
credentials := &esdb.Credentials{Login: "admin", Password: "changeit"}
result, err := db.AppendToStream(context.Background(), "some-stream", esdb.AppendToStreamOptions{Authenticated: credentials}, event)
let options =
AppendToStreamOptions::default().authenticated(Credentials::new("admin", "changeit"));
let _ = client
.append_to_stream("some-stream", &options, event)
.await?;