Projections

This page provides an example of using user-defined projections in your application.

Adding sample data

Download the following files, which contain the sample data used throughout this step of the getting started guide.

Add the sample data to four different streams. First, we need a function that reads the JSON files and constructs a list of EventData instances:

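using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using EventStore.ClientAPI;
using Newtonsoft.Json;

// Reads a JSON array of events from a file and converts each entry to EventData.
public static List<EventData> ReadEvents(string filePath)
{
    var events    = JsonConvert.DeserializeObject<dynamic>(File.ReadAllText(filePath));
    var eventData = new List<EventData>();

    foreach (var @event in events)
    {
        var id        = @event.eventId.ToString();
        var eventType = @event.eventType.ToString();

        // isJson is true because the payload is JSON; no metadata is attached.
        eventData.Add(new EventData(Guid.Parse(id), eventType, true, Encoding.UTF8.GetBytes(@event.data.ToString()), null));
    }

    return eventData;
}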

Then, we can use this function and push events to EventStoreDB:

foreach (var f in Directory.GetFiles("../", "shoppingCart-*"))
{
    var streamName     = Path.GetFileNameWithoutExtension(f);
    var step3EventData = ReadEvents(f);
    var eventData      = step3EventData.ToArray();
    await conn.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData);
}

Creating your first projection

Next steps

See the user-defined projections guide to find out more about the projections API.

The projection counts how many times customers added an 'Xbox One S' to their shopping carts.

A projection starts with a selector, in this case fromAll(). Another possibility is fromCategory({category}) which this step discusses later, but for now, fromAll should do.

The second part of a projection is a set of filters. There is a special filter called $init that sets up an initial state. You want to start a counter at 0 and increment it each time EventStoreDB observes an ItemAdded event for an 'Xbox One S'.
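To make the selector-and-filters model concrete, here is a minimal sketch in plain JavaScript that folds a list of events through the same handlers outside EventStoreDB. The runProjection helper and the sample events are hypothetical and exist only for illustration; the real projection engine supplies the events from the selected streams and manages the state itself.

```javascript
// Handlers in the same shape as the argument passed to when({...}).
const handlers = {
  $init: function () {
    return { count: 0 };
  },
  ItemAdded: function (s, e) {
    if (e.body.Description.indexOf('Xbox One S') >= 0) {
      s.count += 1;
    }
  }
};

// Hypothetical stand-in for the projection engine: start from $init,
// then pass each event to the handler that matches its event type.
function runProjection(handlers, events) {
  let state = handlers.$init();
  for (const event of events) {
    const handler = handlers[event.eventType];
    if (typeof handler === 'function') {
      // A handler may mutate the state in place or return a new state.
      const next = handler(state, event);
      if (next !== undefined) state = next;
    }
  }
  return state;
}

// Made-up events, not taken from the sample data files.
const sampleEvents = [
  { eventType: 'ItemAdded', body: { Description: 'Xbox One S 1TB (Console)' } },
  { eventType: 'ItemAdded', body: { Description: 'Wireless controller' } },
  { eventType: 'ItemRemoved', body: { Description: 'Xbox One S 1TB (Console)' } },
  { eventType: 'ItemAdded', body: { Description: 'Xbox One S 500GB (Console)' } }
];

const finalState = runProjection(handlers, sampleEvents);
console.log(JSON.stringify(finalState)); // prints {"count":2}
```

Events without a matching handler, such as the ItemRemoved event here, leave the state untouched, which is why only the two matching ItemAdded events are counted.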

The projection code is the JavaScript passed as a string in the C# example below.

You create a projection by calling the projection API and providing it with the definition of the projection. Here you decide how to run the projection, declaring that you want the projection to start from the beginning and keep running.

You can send the projection code as text along with the other parameters, using the ProjectionsManager instance:

var projectionsManager = new ProjectionsManager(
    log: new ConsoleLogger(),
    httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
    operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string countItemsProjection = @"
    fromAll().when({
        $init: function(){
            return {
                count: 0
            }
        },
        ItemAdded: function(s,e){
            if(e.body.Description.indexOf('Xbox One S') >= 0){
                s.count += 1;
            }
        }
    })";
await projectionsManager.CreateContinuousAsync("xbox-one-s-counter", countItemsProjection, adminCredentials);

Next steps

See the .NET API documentation for more information on creating projections and the available parameters, or the projections section for details on projection syntax.

Querying projection state

Now that the projection is running, you can query its state. As this projection has a single state, query it with the following request:

var projectionState = await projectionsManager.GetStateAsync("xbox-one-s-counter", adminCredentials);
Console.WriteLine(projectionState);

Querying projection state by partition

You can partition the projection state so that each partition aggregates only some of the events rather than all of them. Querying a partitioned projection differs because you have to specify both the name of the projection and the partition, which in this example is the name of the stream.

var projectionState = await projectionsManager.GetPartitionStateAsync(
    name: "shopping-cart-item-counter",
    partitionId: "shoppingCart-b989fe21-9469-4017-8d71-9820b8dd1164",
    userCredentials: adminCredentials
);
Console.WriteLine(projectionState);

The server then returns the state for that partition.

Emitting new events

The above gives you the correct result but requires you to poll for the state of a projection. What if you wanted EventStoreDB to notify you about state updates via subscriptions?

Output state

Update the projection to output its state to a stream by calling the outputState() method on the projection. By default, this produces a $projections-{projection-name}-result stream.
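As a rough picture of what outputting state means, the sketch below simulates it in plain JavaScript with an in-memory array standing in for the result stream. The runWithOutputState helper and the sample events are hypothetical, and the rule of writing one snapshot per state change is a simplifying assumption; the server's actual emission timing and event format may differ.

```javascript
// In-memory stand-in for the result stream; the name follows the default
// pattern for a projection called xbox-one-s-counter.
const resultStreamName = '$projections-xbox-one-s-counter-result';
const streams = { [resultStreamName]: [] };

const counterHandlers = {
  $init: function () { return { count: 0 }; },
  ItemAdded: function (s, e) {
    if (e.body.Description.indexOf('Xbox One S') >= 0) {
      s.count += 1;
    }
  }
};

// Hypothetical simulation of outputState(): whenever handling an event
// changes the state, append a snapshot of the new state to the output.
function runWithOutputState(handlers, events, output) {
  const state = handlers.$init();
  let lastSnapshot = JSON.stringify(state);
  for (const event of events) {
    const handler = handlers[event.eventType];
    if (typeof handler !== 'function') continue;
    handler(state, event); // these handlers mutate the state in place
    const snapshot = JSON.stringify(state);
    if (snapshot !== lastSnapshot) {
      output.push(snapshot);
      lastSnapshot = snapshot;
    }
  }
  return state;
}

// Made-up events for illustration only.
const outputEvents = [
  { eventType: 'ItemAdded', body: { Description: 'Xbox One S 1TB (Console)' } },
  { eventType: 'ItemAdded', body: { Description: 'Wireless controller' } },
  { eventType: 'ItemAdded', body: { Description: 'Xbox One S 500GB (Console)' } }
];

runWithOutputState(counterHandlers, outputEvents, streams[resultStreamName]);

// A reader of the result stream sees each state update without polling.
for (const snapshot of streams[resultStreamName]) {
  console.log(snapshot);
}
```

The point of the design is that state changes become ordinary events in a stream, so any client that can read or subscribe to a stream can follow the projection.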

The updated projection is the JavaScript passed as a string in the C# example below.

To update the projection, edit the projection definition with the following request:

var projectionsManager = new ProjectionsManager(
    log: new ConsoleLogger(),
    httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
    operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string countItemsProjectionUpdate = @"
    fromAll()
        .when({
            $init: function(){
                return {
                    count: 0
                }
            },
            ItemAdded: function(s,e){
                if(e.body.Description.indexOf('Xbox One S') >= 0){
                    s.count += 1;
                }
            }
        }).outputState()";

await projectionsManager.UpdateQueryAsync("xbox-one-s-counter", countItemsProjectionUpdate, adminCredentials);

Then reset the projection you created above:

await projectionsManager.ResetAsync("xbox-one-s-counter", adminCredentials);

You can now read the events in the result stream by issuing a read request:

var readEvents = await conn.ReadStreamEventsForwardAsync("$projections-xbox-one-s-counter-result", 0, 10, true);

foreach (var evt in readEvents.Events)
{
    Console.WriteLine(Encoding.UTF8.GetString(evt.Event.Data));
}

Configuring projections

You can configure properties of the projection by updating values of the options object. For example, the projection in the C# example below changes the name of the results stream.

Then send the update to the projection:

var projectionsManager = new ProjectionsManager(
    log: new ConsoleLogger(),
    httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
    operationTimeout: TimeSpan.FromMilliseconds(5000)
);
const string optionsProjectionOptionsUpdate = @"
    options({ resultStreamName: 'xboxes' });
    fromAll()
        .when({
            $init: function(){
                return {
                    count: 0
                }
            },
            ItemAdded: function(s,e){
                if(e.body.Description.indexOf('Xbox One S') >= 0){
                    s.count += 1;
                }
            }
        }).outputState()";

await projectionsManager.UpdateQueryAsync("xbox-one-s-counter", optionsProjectionOptionsUpdate, adminCredentials);

TIP

You can find all the options available in the user defined projections guide.
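The naming rule for the result stream can be summarised in a few lines of JavaScript. The resolveResultStream function below is a hypothetical helper written for this page, not part of any EventStoreDB API; it only restates the default name and the resultStreamName override described above.

```javascript
// Hypothetical helper: the stream that outputState() writes to, given the
// projection name and the object passed to options().
function resolveResultStream(projectionName, options = {}) {
  return options.resultStreamName || '$projections-' + projectionName + '-result';
}

console.log(resolveResultStream('xbox-one-s-counter'));
// prints $projections-xbox-one-s-counter-result

console.log(resolveResultStream('xbox-one-s-counter', { resultStreamName: 'xboxes' }));
// prints xboxes
```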

Now you can read the result as above, but use the new stream name:

readEvents = await conn.ReadStreamEventsForwardAsync("xboxes", 0, 10, true);

foreach (var evt in readEvents.Events)
{
    Console.WriteLine(Encoding.UTF8.GetString(evt.Event.Data));
}

Example: number of items per shopping cart

The example in this step has so far relied on a global state for the projection, but what if you wanted a separate count of the items in each shopping cart?

EventStoreDB has a built-in $by_category projection that lets you select events from a particular list of streams. Enable this projection with the following command.

var projectionsManager = new ProjectionsManager(
    log: new ConsoleLogger(),
    httpEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
    operationTimeout: TimeSpan.FromMilliseconds(5000)
);
await projectionsManager.EnableAsync("$by_category", adminCredentials);

The projection links events from existing streams to new streams by splitting the stream name by a separator. You can configure the projection to specify the position of where to split the stream id and provide a separator.

By default, the category splits the stream id by a dash. The category is the first string.

Stream Name          Category
shoppingCart-54      shoppingCart
shoppingCart-v1-54   shoppingCart
shoppingCart         No category as there is no separator
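The default rule in the table can be expressed as a short JavaScript function. This is only a sketch of the behaviour described above (split at the first dash and keep the left part); categoryOf is a hypothetical name, and the real projection can be configured with a different separator or split position.

```javascript
// Sketch of the default rule: split at the first separator and keep the
// part before it. Returns null when the stream name has no separator.
function categoryOf(streamName, separator = '-') {
  const index = streamName.indexOf(separator);
  return index === -1 ? null : streamName.slice(0, index);
}

for (const name of ['shoppingCart-54', 'shoppingCart-v1-54', 'shoppingCart']) {
  console.log(name, '->', categoryOf(name));
}
// prints:
// shoppingCart-54 -> shoppingCart
// shoppingCart-v1-54 -> shoppingCart
// shoppingCart -> null
```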

You want to define a projection that produces a count per stream for a category, but the state needs to be per stream. To do so, use $by_category and its fromCategory API method.
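Per-stream state can be pictured as a map from stream name to state, where each stream gets its own copy of the initial state. The plain JavaScript sketch below simulates that idea; runPerStream, the streamId field, and the sample events are hypothetical and are not how the server represents partitions internally.

```javascript
const perStreamHandlers = {
  $init: function () { return { count: 0 }; },
  ItemAdded: function (s, e) { s.count += 1; }
};

// Hypothetical stand-in for per-stream partitioning: keep one state per
// stream, creating it from $init the first time a stream is seen.
function runPerStream(handlers, events) {
  const states = new Map();
  for (const event of events) {
    if (!states.has(event.streamId)) {
      states.set(event.streamId, handlers.$init());
    }
    const handler = handlers[event.eventType];
    if (typeof handler === 'function') {
      handler(states.get(event.streamId), event);
    }
  }
  return states;
}

// Made-up events spread over two shopping cart streams.
const cartEvents = [
  { streamId: 'shoppingCart-1', eventType: 'ItemAdded' },
  { streamId: 'shoppingCart-2', eventType: 'ItemAdded' },
  { streamId: 'shoppingCart-1', eventType: 'ItemAdded' },
  { streamId: 'shoppingCart-1', eventType: 'ItemRemoved' }
];

const cartStates = runPerStream(perStreamHandlers, cartEvents);

// Looking up one partition is a lookup by stream name.
console.log(JSON.stringify(cartStates.get('shoppingCart-1'))); // prints {"count":2}
console.log(JSON.stringify(cartStates.get('shoppingCart-2'))); // prints {"count":1}
```

This mirrors the partition query shown earlier on this page, where the partition identifier is the name of a shopping cart stream.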

The projection is the JavaScript passed as a string in the C# example below.

Create the projection with the following request:

const string itemCounterProjection = @"
    fromCategory('shoppingCart')
        .foreachStream()
        .when({
            $init: function(){
                return {
                    count: 0
                }
            },
            ItemAdded: function(s,e){
                s.count += 1;
            }
        })";

await projectionsManager.CreateContinuousAsync(
    name: "shopping-cart-item-counter",
    query: itemCounterProjection,
    trackEmittedStreams: true,
    userCredentials: adminCredentials
);

Managing projections

The EventStoreDB Client API includes helper methods that use the HTTP API to allow you to manage projections. This document describes the methods found in the ProjectionsManager class. All methods in this class are asynchronous.

Enable a projection

Enables an existing projection by name. You must have access to a projection to enable it.

Task EnableAsync(string name, UserCredentials userCredentials = null);

Disable a projection

Disables an existing projection by name. You must have access to a projection to disable it.

Task DisableAsync(string name, UserCredentials userCredentials = null);

Abort a projection

Aborts an existing projection by name. You must have access to a projection to abort it.

Task AbortAsync(string name, UserCredentials userCredentials = null);

Create a one-time projection

Creates a projection that runs until the end of the log and then stops. The query parameter contains the JavaScript you want created as a one-time projection.

Task CreateOneTimeAsync(string query, UserCredentials userCredentials = null);

Create a continuous projection

Creates a projection that runs until the end of the log and then continues running. The query parameter contains the JavaScript you want created as a continuous projection. Continuous projections have explicit names, and you can enable or disable them via this name.

Task CreateContinuousAsync(
    string name, string query, UserCredentials userCredentials = null
);

List all projections

Returns a list of all projections.

Task<List<ProjectionDetails>> ListAllAsync(UserCredentials userCredentials = null);

List one-time projections

Returns a list of all one-time projections.

Task<List<ProjectionDetails>> ListOneTimeAsync(UserCredentials userCredentials = null);

Get statistics on a projection

Returns the statistics associated with a named projection.

Task<string> GetStatisticsAsync(string name, UserCredentials userCredentials = null);

Delete projection

Deletes a named projection. You must have access to a projection to delete it.

Task DeleteAsync(string name, UserCredentials userCredentials = null);

Get state

Retrieves the state of a projection.

Task<string> GetStateAsync(string name, UserCredentials userCredentials = null);

Get partition state

Retrieves the state of the projection via the given partition.

Task<string> GetPartitionStateAsync(
    string name, string partitionId, UserCredentials userCredentials = null
);

Get result

Retrieves the result of the projection.

Task<string> GetResultAsync(string name, UserCredentials userCredentials = null);

Get partition result

Retrieves the result of the projection via the given partition.

Task<string> GetPartitionResultAsync(
    string name, string partition, UserCredentials userCredentials = null
);
Contributors: Claude Devarenne, Renae Sowald