Projection management

The various gRPC client APIs include dedicated clients that allow you to manage projections.

WARNING

Currently not all clients fully expose all operations.

For a detailed explanation of projections, see the server documentation.

You can find the full sample code from these documentation on the respective clients repositoriesopen in new window

Required packages

Install the client SDK package to your project.

dotnet add package EventStore.Client.Grpc.ProjectionManagement --version 21.*
1
# Maven
<dependency>
  <groupId>com.eventstore</groupId>
  <artifactId>db-client-java</artifactId>
  <version>1.0.0</version>
</dependency>

# Gradle
implementation 'com.eventstore:db-client-java:1.0.0'

# SBT
libraryDependencies += "com.eventstore" % "db-client-java" % "1.0.0"
1
2
3
4
5
6
7
8
9
10
11
12
# Yarn
$ yarn add @eventstore/db-client

# NPM
$ npm install --save @eventstore/db-client
1
2
3
4
5
No additional configuration is needed having Rust installed. Go check https://rustup.rs.
1
# Yarn
$ yarn add @eventstore/db-client

# NPM
$ npm install --save @eventstore/db-client
1
2
3
4
5

Creating a client

Projection management operations are exposed through a dedicated client.

var settings = EventStoreClientSettings.Create(connection);
settings.ConnectionName = "Projection management client";
settings.DefaultCredentials = new UserCredentials("admin", "changeit");
var managementClient = new EventStoreProjectionManagementClient(settings);
1
2
3
4
Sample available soon
1
const client = EventStoreDBClient.connectionString`
    esdb+discover://${ADMIN}:${PASSWORD}@${ENDPOINT}?nodePreference=leader
`;
1
2
3
Sample available soon
1
const client = EventStoreDBClient.connectionString`
    esdb+discover://${ADMIN}:${PASSWORD}@${ENDPOINT}?nodePreference=leader
`;
1
2
3

Create a projection

Creates a projection that runs until the last event in the store, and then continues processing new events as they are appended to the store. The query parameter contains the JavaScript you want created as a projection. Projections have explicit names, and you can enable or disable them via this name.

const string js = @"fromAll()
                    .when({
                        $init: function() {
                            return {
                                count: 0
                            };
                        },
                        $any: function(s, e) {
                            s.count += 1;
                        }
                    })
                    .outputState();";
var name = quot;countEvents_Create_{Guid.NewGuid()}";
await managementClient.CreateContinuousAsync(name, js);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Sample available soon
1
const name = `countEvents_Create_${uuid()}`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .outputState();
`;
await client.createContinuousProjection(name, projection);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This is currently not available in the Rust client
1
const name = `countEvents_Create_${uuid()}`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .outputState();
`;
await client.createContinuousProjection(name, projection);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Trying to create projections with the same name will result in an error:

await managementClient.CreateContinuousAsync(name, js);
try {

    await managementClient.CreateContinuousAsync(name, js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
    var format = quot;{name} already exists";
    Console.WriteLine(format);
}
1
2
3
4
5
6
7
8
Sample available soon
1
try {
  await client.createContinuousProjection(name, projection);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("Conflict")) throw err;
  console.log(`${name} already exists`);
}
1
2
3
4
5
6
// This is currently not available in the Rust client
1
try {
  await client.createContinuousProjection(name, projection);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("Conflict")) throw err;
  console.log(`${name} already exists`);
}
1
2
3
4
5
6

Restart the subsystem

Restarts the entire projection subsystem. The user must be in the $ops or $admin group to perform this operation

await managementClient.RestartSubsystemAsync();
1
// This is currently not available in the java client 
1
await client.restartSubsystem();
1
Sample available soon
1
await client.restartSubsystem();
1

Enable a projection

Enables an existing projection by name. Once enabled, the projection will start to process events even after restarting the server or the projection subsystem. You must have access to a projection to enable it, see the ACL documentation

await managementClient.EnableAsync("$by_category");
1
// This is currently not available in the java client
1
await client.enableProjection("$by_category");
1
// This is currently not available in the Rust client
1
await client.enableProjection("$by_category");
1

You can only enable an existing projection. When you try to enable a non-existing projection, you'll get an error:

try {
    await managementClient.EnableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
    Console.WriteLine(e.Message);
}
1
2
3
4
5
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.enableProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.enableProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8

Disable a projection

Disables a projection, this will save the projection checkpoint. Once disabled, the projection will not process events even after restarting the server or the projection subsystem. You must have access to a projection to disable it, see the ACL documentation

WARNING

The .net clients up to version 21.2 had an incorrect behavior: they will not save the checkpoint

await managementClient.DisableAsync("$by_category");
1
// This is currently not available in the java client
1
await client.disableProjection("$by_category");
1
// This is currently not available in the Rust client
1
await client.disableProjection("$by_category");
1

You can only disable an existing projection. When you try to disable a non-existing projection, you'll get an error:

try {
    await managementClient.DisableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
    Console.WriteLine(e.Message);
}
1
2
3
4
5
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.disableProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.disableProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8

Delete a projection

Deletes a projection

// This is currently not available in the .net client
1
// This is currently not available in the java client
1
// A projection must be disabled to allow it to be deleted.
await client.disableProjection(name, { writeCheckpoint: true });

// The projection can now be deleted
await client.deleteProjection(name);
1
2
3
4
5
// This is currently not available in the Rust client
1
// A projection must be disabled to allow it to be deleted.
await client.disableProjection(name, { writeCheckpoint: true });

// The projection can now be deleted
await client.deleteProjection(name);
1
2
3
4
5

You can only delete an existing projection. When you try to delete a non-existing projection, you'll get an error:

// This is currently not available in the .net client
1
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.deleteProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.deleteProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8

Abort a projection

Aborts a projection, this will not save the projection's checkpoint.

WARNING

The .net clients up to version 21.2 had an incorrect behavior: they will save the checkpoint.

// The .net clients prior to version 21.6 had an incorrect behavior: they will save the checkpoint.
await managementClient.AbortAsync("countEvents_Abort");
1
2
// This is currently not available in the java client
1
await client.disableProjection(name, {
  // not writing the checkpoint will abort the projection
  writeCheckpoint: false,
});
1
2
3
4
// This is currently not available in the Rust client
1
await client.disableProjection(name, {
  // not writing the checkpoint will abort the projection
  writeCheckpoint: false,
});
1
2
3
4

You can only abort an existing projection. When you try to abort a non-existing projection, you'll get an error:

try {
    await managementClient.AbortAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
    Console.WriteLine(e.Message);
}
1
2
3
4
5
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.disableProjection(projectionName, {
    writeCheckpoint: false,
  });
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
9
10
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.disableProjection(projectionName, {
    writeCheckpoint: false,
  });
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
9
10

Reset a projection

Resets a projection. This will re-emit events. Streams that are written to from the projection will also be soft deleted.

// Checkpoint will be written prior to resetting the projection
await managementClient.ResetAsync("countEvents_Reset");
1
2
// This is currently not available in the java client
1
await client.resetProjection(name);
1
// This is currently not available in the Rust client
1
await client.resetProjection(name);
1

Resetting a projection that does not exists will result in an error.

try {
    await managementClient.ResetAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
    Console.WriteLine(e.Message);
}
1
2
3
4
5
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.resetProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.resetProjection(projectionName);
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8

Update a projection

Updates a projection. The name parameter is the name of the projection to be updated. The query parameter contains the new JavaScript.

const string js = @"fromAll()
                    .when({
                        $init: function() {
                            return {
                                count: 0
                            };
                        },
                        $any: function(s, e) {
                            s.count += 1;
                        }
                    })
                    .outputState();";
var name = quot;countEvents_Update_{Guid.NewGuid()}";

await managementClient.CreateContinuousAsync(name, "fromAll().when()");
await managementClient.UpdateAsync(name, js);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This is currently not available in the java client
1
const name = `countEvents_Update_${uuid()}`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .outputState();
`;
await client.createContinuousProjection(name, "fromAll().when()");
await client.updateProjection(name, projection);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// This is currently not available in the Rust client
1
const name = `countEvents_Update_${uuid()}`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .outputState();
`;
await client.createContinuousProjection(name, "fromAll().when()");
await client.updateProjection(name, projection);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

You can only update an existing projection. When you try to update a non-existing projection, you'll get an error:

try {
    await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
    Console.WriteLine("'Update Not existing projection' does not exists and can not be updated");
}
1
2
3
4
5
// This is currently not available in the java client
1
const projectionName = "projection that does not exist";

try {
  await client.updateProjection(projectionName, "fromAll().when()");
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8
// This is currently not available in the Rust client
1
const projectionName = "projection that does not exist";

try {
  await client.updateProjection(projectionName, "fromAll().when()");
} catch (err) {
  if (!isCommandError(err) || !err.message.includes("NotFound")) throw err;
  console.log(`${projectionName} does not exist`);
}
1
2
3
4
5
6
7
8

List all projections

Returns a list of all projections, user defined & system projections. See the projection details section for an explanation of the returned values

var details = managementClient.ListAllAsync();
await foreach (var detail in details) {
    Console.WriteLine(
        $@"{detail.Name}, {detail.Status}, {detail.CheckpointStatus}, {detail.Mode}, {detail.Progress}");
}
1
2
3
4
5
// This is currently not available in the java client
1
// This is currently not available in the nodejs client
1
// This is currently not available in the Rust client
1
// This is currently not available in the nodejs client
1

List continuous projections

Returns a list of all continuous projections. See the projection details section for an explanation of the returned values

var details = managementClient.ListContinuousAsync();
await foreach (var detail in details) {
    Console.WriteLine(
        $@"{detail.Name}, {detail.Status}, {detail.CheckpointStatus}, {detail.Mode}, {detail.Progress}");
}
1
2
3
4
5
// This is currently not available in the java client
1
const projections = await client.listContinuousProjections();

for (const {
  name,
  status,
  checkpointStatus,
  mode,
  progress,
} of projections) {
  console.log(name, status, checkpointStatus, mode, progress);
}
1
2
3
4
5
6
7
8
9
10
11
// This is currently not available in the Rust client
1
const projections = await client.listContinuousProjections();

for (const {
  name,
  status,
  checkpointStatus,
  mode,
  progress,
} of projections) {
  console.log(name, status, checkpointStatus, mode, progress);
}
1
2
3
4
5
6
7
8
9
10
11

Get Status

Gets the status of a named projection. See the projection details section for an explanation of the returned values

await managementClient.CreateContinuousAsync(name, js);
var status = await managementClient.GetStatusAsync(name);
Console.WriteLine(
    $@"{status.Name}, {status.Status}, {status.CheckpointStatus}, {status.Mode}, {status.Progress}");
1
2
3
4
// This is currently not available in the java client
1
const projection = await client.getProjectionStatistics(name);

console.log(
  projection.name,
  projection.status,
  projection.checkpointStatus,
  projection.mode,
  projection.progress
);
1
2
3
4
5
6
7
8
9
// This is currently not available in the Rust client
1
const projection = await client.getProjectionStatistics(name);

console.log(
  projection.name,
  projection.status,
  projection.checkpointStatus,
  projection.mode,
  projection.progress
);
1
2
3
4
5
6
7
8
9

Get state

Retrieves the state of a projection.

Sample available soon
1
// This is currently not available in the java client
1

    const name = `get_state_example`;
    const projection = `
        fromAll()
            .when({
                $init() {
                    return {
                        count: 0,
                    };
                },
                $any(s, e) {
                    s.count += 1;
                }
            })
            .transformBy((state) => state.count)
            .outputState();
    `;

    await client.createContinuousProjection(name, projection);

    // Give it some time to count event
    await delay(500);

    const state = await client.getProjectionState(name);

    console.log(`Counted ${state.count} events.`);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// This is currently not available in the Rust client
1
interface CountProjectionState {
  count: number;
}

const name = `get_state_example`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .transformBy((state) => state.count)
        .outputState();
`;

await client.createContinuousProjection(name, projection);

// Give it some time to count event
await delay(500);

const state = await client.getProjectionState<CountProjectionState>(name);

console.log(`Counted ${state.count} events.`);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

Get result

Retrieves the result of the named projection and partition.

const string js = @"fromAll()
                    .when({
                        $init: function() {
                            return {
                                count: 0
                            };
                        },
                        $any: function(s, e) {
                            s.count += 1;
                        }
                    })
                    .outputState();";
var name = quot;countEvents_Result_{Guid.NewGuid()}";

await managementClient.CreateContinuousAsync(name, js);
await Task.Delay(500);  //give it some time to have a result.

// Results are retrieved either as  JsonDocument or a typed result 
var document = await managementClient.GetResultAsync(name);
var result = await managementClient.GetResultAsync<Result>(name);

Console.WriteLine(DocToString(document));
Console.WriteLine(result);

static string DocToString(JsonDocument d) {
    using var stream = new MemoryStream();
    using Utf8JsonWriter writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = false });
    d.WriteTo(writer);
    writer.Flush();
    return Encoding.UTF8.GetString(stream.ToArray());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Sample available soon
1
const name = `get_result_example`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .transformBy((state) => state.count)
        .outputState();
`;

await client.createContinuousProjection(name, projection);

// Give it some time to have a result.
await delay(500);

const result = await client.getProjectionResult(name);

console.log(`Counted ${result} events.`);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// This is currently not available in the Rust client
1
const name = `get_result_example`;
const projection = `
    fromAll()
        .when({
            $init() {
                return {
                    count: 0,
                };
            },
            $any(s, e) {
                s.count += 1;
            }
        })
        .transformBy((state) => state.count)
        .outputState();
`;

await client.createContinuousProjection(name, projection);

// Give it some time to have a result.
await delay(500);

const result = await client.getProjectionResult<number>(name);

console.log(`Counted ${result} events.`);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

Projection Details

List all, list continuous and get status all return the details and statistics of projections

FieldDescription
Name, EffectiveNameThe name of the projection
StatusA human readable string of the current statuses of the projection (see below)
StateReasonA human readable string explaining the reason of the current projection state
CheckpointStatusA human readable string explaining the current operation performed on the checkpoint : requested, writing
ModeContinuous, OneTime , Transient
CoreProcessingTimeThe total time, in ms, the projection took to handle events since the last restart
ProgressThe progress, in %, indicates how far this projection has processed event, in case of a restart this could be -1% or some number. It will be updated as soon as a new event is appended and processed
WritesInProgressThe number of write requests to emitted streams currently in progress, these writes can be batches of events
ReadsInProgressThe number of read requests currently in progress
PartitionsCachedThe number of cached projection partitions
PositionThe Position of the last processed event
LastCheckpointThe Position of the last checkpoint of this projection
EventsProcessedAfterRestartThe number of events processed since the last restart of this projection
BufferedEventsThe number of events in the projection read buffer
WritePendingEventsBeforeCheckpointThe number of events waiting to be appended to emitted streams before the pending checkpoint can be written
WritePendingEventsAfterCheckpointThe number of events to be appended to emitted streams since the last checkpoint
VersionThis is used internally, the version is increased when the projection is edited or reset
EpochThis is used internally, the epoch is increased when the projection is reset

The Status string is a combination of the following values. The first 3 are the most common one, as the other one are transient values while the projection is initialised or stopped

ValueDescription
RunningThe projection is running and processing events
StoppedThe projection is stopped and is no longer processing new events
FaultedAn error occured in the projection, StateReason will give the fault details, the projection is not processing events
InitialThis is the initial state, before the projection is fully initialised
SuspendedThe projection is suspended and will not process events, this happens while stopping the projection
LoadStateRequestedThe state of the projection is being retrieved, this happens while the projection is starting
StateLoadedThe state of the projection is loaded, this happens while the projection is starting
SubscribedThe projection has successfully subscribed to its readers, this happens while the projection is starting
FaultedStoppingThis happens before the projection is stopped due to an error in the projection
StoppingThe projection is being stopped
CompletingPhaseThis happens while the projection is stopping
PhaseCompletedThis happens while the projection is stopping
Last Updated: 9/22/2021, 1:59:50 PM
Contributors: Mathew McLoughlin