Projections
Projections
This page provides an example of using user-defined projections in your application.
Adding sample data
Download the following files that contain sample data used throughout this step of the getting started guide.
- shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1164.json
- shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1165.json
- shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1166.json
- shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1167.json
Add the sample data to four different streams: First, we need a function to read JSON files and construct the list of EventData
instances:
public static List<EventData> ReadEvents(string filePath)
{
var events = JsonConvert.DeserializeObject<dynamic>(File.ReadAllText(filePath));
var eventData = new List<EventData>();
foreach (var @event in events)
{
var id = @event.eventId.ToString();
var eventType = @event.eventType.ToString();
eventData.Add(new EventData(Guid.Parse(id), eventType, true, Encoding.UTF8.GetBytes(@event.data.ToString()), null));
}
return eventData;
}
Then, we can use this function and push events to EventStoreDB:
foreach (var f in Directory.GetFiles("../", "shoppingCart-*"))
{
var streamName = Path.GetFileNameWithoutExtension(f);
var step3EventData = ReadEvents(f);
var eventData = step3EventData.ToArray();
await conn.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData);
}
Creating your first projection
Next steps
Read this guide to find out more about the user defined projection's API.
The projection counts the number of 'XBox One S's that customers added to their shopping carts.
A projection starts with a selector, in this case fromAll()
. Another possibility is fromCategory({category})
which this step discusses later, but for now, fromAll
should do.
The second part of a projection is a set of filters. There is a special filter called $init
that sets up an initial state. You want to start a counter from 0 and each time EventStoreDB observes an ItemAdded
event for an 'Xbox One S,' increment the counter.
Here is the projection code:
fromAll()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf("Xbox One S") >= 0){
s.count += 1;
}
}
})
You create a projection by calling the projection API and providing it with the definition of the projection. Here you decide how to run the projection, declaring that you want the projection to start from the beginning and keep running.
You can send the projection code as text along the other parameters, using the ProjectionsManager
instance:
var projectionsManager = new ProjectionsManager(
log: new ConsoleLogger(),
httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string countItemsProjection = @"
fromAll().when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf('Xbox One S') >= 0){
s.count += 1;
}
}
})";
await projectionsManager.CreateContinuousAsync("xbox-one-s-counter", countItemsProjection, adminCredentials);
Next steps
Read here for more information on creating projections with the .NET API and the parameters available, or our projections section for details on projection syntax.
Querying projection state
Now the projection is running, you can query the state of the projection. As this projection has a single state, query it with the following request:
var projectionState = await projectionsManager.GetStateAsync("xbox-one-s-counter", adminCredentials);
Console.WriteLine(projectionState);
Querying projection state by partition
You can partition the projection state to only include some events for aggregating the state rather than processing all the events. Querying with partitions because you have to specify the partition and the name of the stream.
var projectionState = await projectionsManager.GetPartitionStateAsync(
name: "shopping-cart-item-counter",
partitionId: "shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1164",
userCredentials: adminCredentials
);
Console.WriteLine(projectionState);
The server then returns the state for the partition:
{
"count": 2
}
Emitting new events
The above gives you the correct result but requires you to poll for the state of a projection. What if you wanted EventStoreDB to notify you about state updates via subscriptions?
Output state
Update the projection to output the state to a stream by calling the outputState()
method on the projection which by default produces a $projections-{projection-name}-result
stream.
Below is the updated projection:
fromAll()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf("Xbox One S") >= 0){
s.count += 1;
}
}
}).outputState()
To update the projection, edit the projection definition with the following request:
var projectionsManager = new ProjectionsManager(
log: new ConsoleLogger(),
httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string countItemsProjectionUpdate = @"
fromAll()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf('Xbox One S') >= 0){
s.count += 1;
}
}
}).outputState()";
await projection.UpdateQueryAsync("xbox-one-s-counter", countItemsProjectionUpdate, adminCredentials);
Then reset the projection you created above:
await projection.ResetAsync("xbox-one-s-counter", adminCredentials);
You should get a response similar to the one below:
You can now read the events in the result stream by issuing a read request.
var readEvents = await conn.ReadStreamEventsForwardAsync("$projections-xbox-one-s-counter-result", 0, 10, true);
foreach (var evt in readEvents.Events)
{
Console.WriteLine(Encoding.UTF8.GetString(evt.Event.Data));
}
Configuring projections
You can configure properties of the projection by updating values of the options
object. For example, the following projection changes the name of the results stream:
options({
resultStreamName: "xboxes"
})
fromAll()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf("Xbox One S") >= 0){
s.count += 1;
}
}
}).outputState()
Then send the update to the projection:
var projectionsManager = new ProjectionsManager(
log: new ConsoleLogger(),
httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string optionsProjectionOptionsUpdate = @"
options({ resultStreamName: 'xboxes' })
fromAll()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
if(e.body.Description.indexOf('Xbox One S') >= 0){
s.count += 1;
}
}
}).outputState()";
await projection.UpdateQueryAsync("xbox-one-s-counter", optionsProjectionOptionsUpdate, adminCredentials);
Tips
You can find all the options available in the user defined projections guide.
Now you can read the result as above, but use the new stream name:
readEvents = await conn.ReadStreamEventsForwardAsync("xboxes", 0, 10, true);
foreach (var evt in readEvents.Events)
{
Console.WriteLine(Encoding.UTF8.GetString(evt.Event.Data));
}
Example: number of items per shopping cart
The example in this step so far relied on a global state for the projection, but what if you wanted a count of the number of items in the shopping cart per shopping cart.
EventStoreDB has a built-in $by_category
projection that lets you select events from a particular list of streams. Enable this projection with the following command.
var projectionsManager = new ProjectionsManager(
log: new ConsoleLogger(),
httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
operationTimeout: TimeSpan.FromMilliseconds(5000)
);
await projectionsManager.EnableAsync("$by_category", adminCredentials);
The projection links events from existing streams to new streams by splitting the stream name by a separator. You can configure the projection to specify the position of where to split the stream id
and provide a separator.
By default, the category splits the stream id
by a dash. The category is the first string.
Stream Name | Category |
---|---|
shoppingCart-54 | shoppingCart |
shoppingCart-v1-54 | shoppingCart |
shoppingCart | No category as there is no separator |
You want to define a projection that produces a count per stream for a category, but the state needs to be per stream. To do so, use $by_category
and its fromCategory
API method.
Below is the projection:
fromCategory('shoppingCart')
.foreachStream()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
s.count += 1;
}
})
Create the projection with the following request:
const string itemCounterProjection = @"
fromCategory('shoppingCart')
.foreachStream()
.when({
$init: function(){
return {
count: 0
}
},
ItemAdded: function(s,e){
s.count += 1;
}
})";
await projectionsManager.CreateContinuousAsync(
name: "shopping-cart-item-counter",
query: itemCounterProjection,
trackEmittedStreams: true,
userCredentials: adminCredentials
);
Managing projections
The EventStoreDB Client API includes helper methods that use the HTTP API to allow you to manage projections. This document describes the methods found in the ProjectionsManager
class. All methods in this class are asynchronous.
Enable a projection
Enables an existing projection by name. You must have access to a projection to enable it.
Task EnableAsync(string name, UserCredentials userCredentials = null);
Disable a projection
Disables an existing projection by name. You must have access to a projection to disable it.
Task DisableAsync(string name, UserCredentials userCredentials = null);
Abort a projection
Aborts an existing projection by name. You must have access to a projection to abort it.
Task AbortAsync(string name, UserCredentials userCredentials = null);
Create a one-time projection
Creates a projection that runs until the end of the log and then stops. The query parameter contains the JavaScript you want created as a one time projection.
Task CreateOneTimeAsync(string query, UserCredentials userCredentials = null);
Create a continuous projection
Creates a projection that runs until the end of the log and then continues running. The query parameter contains the JavaScript you want created as a one time projection. Continuous projections have explicit names and you can enable or disable them via this name.
Task CreateContinuousAsync(
string name, string query, UserCredentials userCredentials = null
);
List all projections
Returns a list of all projections.
Task<List<ProjectionDetails>> ListAllAsync(UserCredentials userCredentials = null);
List one-time projections
Returns a list of all One-Time Projections.
Task<List<ProjectionDetails>> ListOneTimeAsync(UserCredentials userCredentials = null);
Get statistics on a projection
Returns the statistics associated with a named projection.
Task<string> GetStatisticsAsync(string name, UserCredentials userCredentials = null);
Delete projection
Deletes a named projection. You must have access to a projection to delete it.
Task DeleteAsync(string name, UserCredentials userCredentials = null);
Get state
Retrieves the state of a projection.
Task<string> GetState(string name, UserCredentials userCredentials = null);
Get partition state
Retrieves the state of the projection via the given partition.
Task<string> GetPartitionStateAsync(
string name, string partition, UserCredentials userCredentials = null
);
Get result
Retrieves the result of the projection.
Task<string> GetResult(string name, UserCredentials userCredentials = null);
Get partition result
Retrieves the result of the projection via the given partition.
Task<string> GetPartitionResultAsync(
string name, string partition, UserCredentials userCredentials = null
);