# Subscription basics

Subscriptions allow you to subscribe to a stream and receive notifications whenever new events are appended to it.

You provide an event handler and an optional starting point to the subscription. The handler is called for each event from the starting point onward.

If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.
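
The two phases described above (catching up on existing events, then receiving live ones) can be sketched with a small in-memory model. This is only an illustration in TypeScript, not the EventStoreDB client API: `InMemoryStream`, `StoredEvent`, and `Handler` are hypothetical names, and revisions are plain numbers.

```typescript
// Illustrative in-memory model; not the EventStoreDB client API.
type StoredEvent = { revision: number; type: string };
type Handler = (event: StoredEvent) => void;

class InMemoryStream {
  private readonly events: StoredEvent[] = [];
  private readonly liveHandlers: Handler[] = [];

  append(type: string): void {
    const event: StoredEvent = { revision: this.events.length, type };
    this.events.push(event);
    // Live phase: push the new event to every active subscriber.
    for (const handler of this.liveHandlers) handler(event);
  }

  subscribe(handler: Handler): void {
    // Catch-up phase: replay the existing events one by one, in order.
    for (const event of this.events) handler(event);
    // From here on, the handler is notified as new events are appended.
    this.liveHandlers.push(handler);
  }
}

const stream = new InMemoryStream();
stream.append("OrderPlaced");
stream.append("OrderPaid");

const seen: string[] = [];
stream.subscribe((e) => seen.push(`${e.revision}:${e.type}`));
stream.append("OrderShipped");

console.log(seen.join(", ")); // 0:OrderPlaced, 1:OrderPaid, 2:OrderShipped
```

The handler sees the two historical events first and the live one last, without needing to know which phase it is in.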

TIP

See the instructions on connecting to EventStoreDB to learn how to configure and use the client SDK.

# Subscribing to a stream

The simplest stream subscription looks like the following:

    // C#
    await client.SubscribeToStreamAsync("some-stream",
    	async (subscription, evnt, cancellationToken) => {
    		Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
    		await HandleEvent(evnt);
    	});

    // Java
    SubscriptionListener listener = new SubscriptionListener() {
        @Override
        public void onEvent(Subscription subscription, ResolvedEvent event) {
            System.out.println("Received event "
                    + event.getOriginalEvent().getStreamRevision()
                    + "@" + event.getOriginalEvent().getStreamId());
            HandleEvent(event);
        }
    };
    client.subscribeToStream("some-stream", listener);

    // JavaScript
    const subscription = client.subscribeToStream("some-stream");
    
    for await (const resolvedEvent of subscription) {
      console.log(
        `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
      );
    
      await handleEvent(resolvedEvent);
    }

    // Rust
    let mut stream = client
        .subscribe_to_stream("some-stream", &Default::default())
        .await?;
    
    while let Some(event) = stream.try_next().await? {
        if let SubEvent::EventAppeared(event) = event {
            // Handles the event...
        }
    }

    // TypeScript
    const subscription =
      client.subscribeToStream<SomeStreamEvents>("some-stream");
    
    for await (const resolvedEvent of subscription) {
      console.log(
        `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
      );
      await handleEvent(resolvedEvent);
    }

The provided handler will be called for every event in the stream.

# Subscribing to $all

Subscribing to $all is much the same as subscribing to a single stream. The handler will be called for every event appended after the starting position.

      // C#
      await client.SubscribeToAllAsync(
      	async (subscription, evnt, cancellationToken) => {
      		Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
      		await HandleEvent(evnt);
      	});

      // Java
      SubscriptionListener listener = new SubscriptionListener() {
          @Override
          public void onEvent(Subscription subscription, ResolvedEvent event) {
              System.out.println("Received event "
                      + event.getOriginalEvent().getStreamRevision().getValueUnsigned()
                      + "@" + event.getOriginalEvent().getStreamId());
              HandleEvent(event);
          }
      };
      client.subscribeToAll(listener);

      // JavaScript
      const subscription = client.subscribeToAll();
      
      for await (const resolvedEvent of subscription) {
        console.log(
          `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
        );
      
        await handleEvent(resolvedEvent);
      }

      // Rust
      let mut stream = client.subscribe_to_all(&Default::default()).await?;
      
      while let Some(event) = stream.try_next().await? {
          if let SubEvent::EventAppeared(event) = event {
              // Handles the event...
          }
      }

      // TypeScript
      const subscription = client.subscribeToAll();
      
      for await (const resolvedEvent of subscription) {
        console.log(
          `Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
        );
        await handleEvent(resolvedEvent);
      }

# Subscribing from a specific position

The previous examples subscribe to the stream from the beginning. The handler is called for every event in the stream, and the subscription then waits for new events.

Both the stream and $all subscriptions accept a starting position if you want to read from a specific point onward. If events already exist at the position you subscribe to, they will be read on the server side and sent to the subscription.

Once caught up, the server will push any new events received on the streams to the client. On the client side, there is no difference between catching up and receiving live events.

WARNING

The positions provided to the subscriptions are exclusive. You will only receive the next event after the subscribed position.

# Subscribing to a stream

To subscribe to a stream from a specific position, you need to provide a stream position. This can be Start, End or a big int (unsigned 64-bit integer) position.

The following subscribes to the stream some-stream at position 20, which means that events 21 and onward will be handled:

        // C#
        await client.SubscribeToStreamAsync(
        	"some-stream",
        	StreamPosition.FromInt64(20),
        	EventAppeared);

        // Java
        client.subscribeToStream(
                "some-stream",
                listener,
                SubscribeToStreamOptions.get()
                        .fromRevision(20)
        );

        // JavaScript
        const subscription = client.subscribeToStream("some-stream", {
          fromRevision: BigInt(20),
        });
        
        // Rust
        let options = SubscribeToStreamOptions::default().position(StreamPosition::Position(20));
        
        client.subscribe_to_stream("some-stream", &options).await?;

        // TypeScript
        const subscription = client.subscribeToStream<SomeStreamEvents>(
          "some-stream",
          {
            fromRevision: BigInt(20),
          }
        );
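
To make the exclusive start position concrete, the following TypeScript sketch models which existing revisions a subscription would deliver for each kind of start position. It is an in-memory illustration rather than the client API: `StartFrom` and `eventsDelivered` are hypothetical names, and revisions are plain numbers here instead of 64-bit integers.

```typescript
// Illustrative model only; not the EventStoreDB client API.
type StartFrom = "start" | "end" | number;

function eventsDelivered(existing: number[], from: StartFrom): number[] {
  if (from === "start") return existing.slice();
  if (from === "end") return []; // only events appended later would arrive
  // Exclusive: the event at `from` itself is skipped.
  return existing.filter((revision) => revision > from);
}

// A stream that already holds revisions 0..24.
const revisions: number[] = [];
for (let i = 0; i < 25; i++) revisions.push(i);

console.log(eventsDelivered(revisions, 20)); // [ 21, 22, 23, 24 ]
```

Subscribing at position 20 therefore starts with event 21, which matches the behaviour described for the examples above.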

# Subscribing to $all

Subscribing to the $all stream is much like subscribing to a regular stream. The only difference is how you specify the stream position. For the $all stream, you have to provide a Position structure instead, which consists of two big integers: the prepare and commit positions. The Position value can be Start, End or a Position created from a commit and prepare position.

The corresponding $all subscription will subscribe from the event after the one at commit position 1056 and prepare position 1056.

Note that this position must be a valid position in $all.

          // C#
          await client.SubscribeToAllAsync(
          	new Position(1056, 1056),
          	EventAppeared);

          // Java
          client.subscribeToAll(
                  listener,
                  SubscribeToAllOptions.get()
                          .fromPosition(new Position(1056, 1056))
          );

          // JavaScript
          const subscription = client.subscribeToAll({
            fromPosition: {
              commit: BigInt(1056),
              prepare: BigInt(1056),
            },
          });
          
          // Rust
          let options = SubscribeToAllOptions::default().position(StreamPosition::Position(Position {
              commit: 1_056,
              prepare: 1_056,
          }));
          
          client.subscribe_to_all(&options).await?;

          // TypeScript
          const subscription = client.subscribeToAll({
            fromPosition: {
              commit: BigInt(1056),
              prepare: BigInt(1056),
            },
          });
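
Because an $all position has two parts, "the event after this position" needs an ordering over pairs. The sketch below assumes positions are ordered by commit position first and prepare position second; that ordering, the `Position` shape, and the helper names are assumptions made for illustration, and plain numbers stand in for the big integers used by the real clients.

```typescript
// Hypothetical helper types for illustration; the real clients ship their own Position type.
type Position = { commit: number; prepare: number };

// Assumed ordering: compare commit positions first, then prepare positions.
function comparePositions(a: Position, b: Position): number {
  if (a.commit !== b.commit) return a.commit < b.commit ? -1 : 1;
  if (a.prepare !== b.prepare) return a.prepare < b.prepare ? -1 : 1;
  return 0;
}

// Exclusive start: deliver only events strictly after `from`.
function eventsAfter(log: Position[], from: Position): Position[] {
  return log.filter((p) => comparePositions(p, from) > 0);
}

const log: Position[] = [
  { commit: 512, prepare: 512 },
  { commit: 1056, prepare: 1056 },
  { commit: 1480, prepare: 1480 },
];

console.log(eventsAfter(log, { commit: 1056, prepare: 1056 }));
// [ { commit: 1480, prepare: 1480 } ]
```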

# Subscribing to a stream for live updates

You can subscribe to a stream to get live updates by subscribing to the end of the stream:

            // C#
            await client.SubscribeToStreamAsync(
            	"some-stream",
            	StreamPosition.End,
            	EventAppeared);

            // Java
            client.subscribeToStream(
                    "some-stream",
                    listener,
                    SubscribeToStreamOptions.get()
                            .fromEnd()
            );

            // JavaScript
            const subscription = client.subscribeToStream("some-stream", {
              fromRevision: END,
            });
            
            // Rust
            let options = SubscribeToStreamOptions::default().position(StreamPosition::End);
            client.subscribe_to_stream("some-stream", &options).await?;

            // TypeScript
            const subscription = client.subscribeToStream<SomeStreamEvents>(
              "some-stream",
              {
                fromRevision: END,
              }
            );

And the same works with $all:

              // C#
              await client.SubscribeToAllAsync(
              	Position.End,
              	EventAppeared);

              // Java
              client.subscribeToAll(
                      listener,
                      SubscribeToAllOptions.get()
                              .fromEnd()
              );

              // JavaScript
              const subscription = client.subscribeToAll({
                fromPosition: END,
              });
              
              // Rust
              let options = SubscribeToAllOptions::default().position(StreamPosition::End);
              client.subscribe_to_all(&options).await?;

              // TypeScript
              const subscription = client.subscribeToAll({
                fromPosition: END,
              });

This won't read through the history of the stream, but will rather notify the handler when a new event appears in the respective stream.

Keep in mind that when you subscribe to a stream from a certain position, as described above, you will also get live updates after your subscription catches up (processes all the historical events).

# Resolving link-to's

Link-to events point to events in other streams in EventStoreDB. These are generally created by projections such as the $by_event_type projection, which links events of the same event type into the same stream. This makes it easier to look up all events of a certain type.

TIP

Filtered subscriptions make it easier and faster to subscribe to all events of a certain type or matching a prefix.

When reading a stream, you can specify whether to resolve link-to's. By default, link-to events are not resolved. You can change this behaviour by setting the resolveLinkTos parameter to true:

                // C#
                await client.SubscribeToStreamAsync(
                	"$et-myEventType",
                	StreamPosition.Start,
                	EventAppeared,
                	resolveLinkTos: true);

                // Java
                client.subscribeToStream(
                        "$et-myEventType",
                        listener,
                        SubscribeToStreamOptions.get()
                                .fromStart()
                                .resolveLinkTos()
                );

                // JavaScript
                const subscription = client.subscribeToStream("$et-myEventType", {
                  fromRevision: START,
                  resolveLinkTos: true,
                });
                
                // Rust
                let options = SubscribeToStreamOptions::default()
                    .position(StreamPosition::Start)
                    .resolve_link_tos();
                
                client
                    .subscribe_to_stream("$et-myEventType", &options)
                    .await?;

                // TypeScript
                const subscription = client.subscribeToStream<SomeStreamEvents>(
                  "$et-myEventType",
                  {
                    fromRevision: START,
                    resolveLinkTos: true,
                  }
                );
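
As a rough picture of what resolving a link-to means, the TypeScript sketch below models a link event that points at an event in another stream. It is an in-memory illustration, not the client API: the types and the `resolve` function are hypothetical, and it assumes a link event has type `$>` with data of the form `<revision>@<streamId>`.

```typescript
// Illustrative in-memory model; the types and `resolve` are hypothetical, not the client API.
type RecordedEvent = { streamId: string; revision: number; type: string; data: string };
type ResolvedEvent = { event?: RecordedEvent; link?: RecordedEvent };

const original: RecordedEvent = {
  streamId: "order-1", revision: 0, type: "myEventType", data: "{}",
};
// Assumption: a link event has type "$>" and data "<revision>@<streamId>".
const link: RecordedEvent = {
  streamId: "$et-myEventType", revision: 0, type: "$>", data: "0@order-1",
};
const streams: Record<string, RecordedEvent[]> = {
  "order-1": [original],
  "$et-myEventType": [link],
};

function resolve(recorded: RecordedEvent, resolveLinkTos: boolean): ResolvedEvent {
  if (recorded.type !== "$>" || !resolveLinkTos) {
    return { event: recorded }; // delivered as-is
  }
  const separator = recorded.data.indexOf("@");
  const revision = Number(recorded.data.slice(0, separator));
  const targetStream = streams[recorded.data.slice(separator + 1)] ?? [];
  return { event: targetStream[revision], link: recorded };
}

console.log(resolve(link, false).event?.streamId); // $et-myEventType
console.log(resolve(link, true).event?.streamId); // order-1
```

Without resolution the handler receives the link itself; with resolution it receives the event the link points to, alongside the link.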

# Dropped subscriptions

When a subscription stops or experiences an error, it will be dropped. The subscription provides a subscriptionDropped callback, which is called when the subscription breaks.

The subscriptionDropped callback allows you to inspect the reason why the subscription dropped, as well as any exceptions that occurred.

The possible reasons for a subscription to drop are:

| Reason | Why it might happen |
|--------|---------------------|
| Disposed | The subscription got cancelled or disposed by the client. |
| SubscriberError | An error occurred while handling an event in the subscription handler. |
| ServerError | An error occurred on the server, and the server closed the subscription. Check the server logs for more information. |

Bear in mind that a subscription can also drop because it is slow. In live processing mode, the server tries to push all live events to the subscription. If the subscription's read buffer overflows and it cannot acknowledge the buffer, the subscription will break.

# Handling subscription drops

An application that hosts the subscription can go offline for a period of time for various reasons: a crash, an infrastructure failure, or a new version deployment. As you rarely want to reprocess all the events again, you need to store the current position of the subscription somewhere, and then use it to restore the subscription from the point where it dropped off:

                  // C#
                  var checkpoint = StreamPosition.Start;
                  await client.SubscribeToStreamAsync(
                  	"some-stream",
                  	checkpoint,
                  	eventAppeared: async (subscription, evnt, cancellationToken) => {
                  		await HandleEvent(evnt);
                  		checkpoint = evnt.OriginalEventNumber;
                  	},
                  	subscriptionDropped: ((subscription, reason, exception) => {
                  		Console.WriteLine($"Subscription was dropped due to {reason}. {exception}");
                  		if (reason != SubscriptionDroppedReason.Disposed) {
                  			// Resubscribe if the client didn't stop the subscription
                  			Resubscribe(checkpoint);
                  		}
                  	}));

                  // Java
                  final StreamRevision[] checkpoint = {StreamRevision.START};
                  
                  client.subscribeToStream(
                          "some-stream",
                          new SubscriptionListener() {
                              @Override
                              public void onEvent(Subscription subscription, ResolvedEvent event) {
                                  HandleEvent(event);
                                  checkpoint[0] = event.getOriginalEvent().getStreamRevision();
                              }
                  
                              @Override
                              public void onError(Subscription subscription, Throwable throwable) {
                                  System.out.println("Subscription was dropped due to " + throwable.getMessage());
                                  Resubscribe(checkpoint[0]);
                              }
                          },
                          SubscribeToStreamOptions.get()
                                  .fromRevision(checkpoint[0])
                  );

                  // JavaScript
                  let checkpoint = START;
                  
                  const subscription = client
                    .subscribeToStream("some-stream", {
                      fromRevision: checkpoint,
                    })
                    .on("data", (resolvedEvent) => {
                      handleEvent(resolvedEvent);
                      checkpoint = resolvedEvent.event?.revision ?? checkpoint;
                    });

                  // Rust
                  let retry = RetryOptions::default().retry_forever();
                  let options = SubscribeToStreamOptions::default().retry_options(retry);
                  let mut stream = client.subscribe_to_stream("some-stream", &options).await?;
                  
                  while let Some(event) = stream.try_next().await? {
                      if let SubEvent::EventAppeared(event) = event {
                          // Handles the event...
                      }
                  }

                  // TypeScript
                  let checkpoint: ReadRevision = START;
                  
                  const subscription = client
                    .subscribeToStream<SomeStreamEvents>("some-stream", {
                      fromRevision: checkpoint,
                    })
                    .on("data", (resolvedEvent) => {
                      handleEvent(resolvedEvent);
                      checkpoint = resolvedEvent.event?.revision ?? checkpoint;
                    });
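
The checkpoint in the examples above lives in a local variable, so it would be lost when the process restarts. One way to picture the "store the position somewhere" step is the TypeScript sketch below. It is a simplified in-memory model, not the client API: `CheckpointStore`, `InMemoryCheckpointStore`, and `runSubscription` are hypothetical names, and a real application would back the store with durable storage.

```typescript
// Illustrative sketch; CheckpointStore and runSubscription are hypothetical names.
interface CheckpointStore {
  load(): number | undefined;
  save(revision: number): void;
}

class InMemoryCheckpointStore implements CheckpointStore {
  private revision: number | undefined;
  load(): number | undefined { return this.revision; }
  save(revision: number): void { this.revision = revision; }
}

// Processes events after the stored checkpoint; `failAt` simulates a drop.
function runSubscription(
  events: string[],
  store: CheckpointStore,
  handled: string[],
  failAt?: number
): void {
  const checkpoint = store.load();
  const first = checkpoint === undefined ? 0 : checkpoint + 1; // positions are exclusive
  for (let revision = first; revision < events.length; revision++) {
    if (revision === failAt) throw new Error("subscription dropped");
    handled.push(events[revision]);
    store.save(revision); // checkpoint only after the event was handled
  }
}

const store = new InMemoryCheckpointStore();
const handled: string[] = [];
const events = ["a", "b", "c", "d"];

try {
  runSubscription(events, store, handled, 2); // drops before handling "c"
} catch {
  runSubscription(events, store, handled); // resumes after the checkpoint
}

console.log(handled.join("")); // abcd
```

Saving the checkpoint only after the handler succeeds means an event may be handled again after a crash between the two steps, but it is never skipped.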

When subscribed to $all, you want to keep the position of the event in the $all stream. As mentioned previously, the $all stream position consists of two big integers (prepare and commit positions), not one:

                    
                    // C#
                    var checkpoint = Position.Start;
                    await client.SubscribeToAllAsync(
                    	checkpoint,
                    	eventAppeared: async (subscription, evnt, cancellationToken) => {
                    		await HandleEvent(evnt);
                    		checkpoint = evnt.OriginalPosition.Value;
                    	},
                    	subscriptionDropped: ((subscription, reason, exception) => {
                    		Console.WriteLine($"Subscription was dropped due to {reason}. {exception}");
                    		if (reason != SubscriptionDroppedReason.Disposed) {
                    			// Resubscribe if the client didn't stop the subscription
                    			Resubscribe(checkpoint);
                    		}
                    	}));

                    // Java
                    final Position[] checkpoint = {Position.START};
                    
                    client.subscribeToAll(
                            new SubscriptionListener() {
                                @Override
                                public void onEvent(Subscription subscription, ResolvedEvent event) {
                                    HandleEvent(event);
                                    checkpoint[0] = event.getOriginalEvent().getPosition();
                                }
                    
                                @Override
                                public void onError(Subscription subscription, Throwable throwable) {
                                    System.out.println("Subscription was dropped due to " + throwable.getMessage());
                                    Resubscribe(checkpoint[0]);
                                }
                            },
                            SubscribeToAllOptions.get()
                                    .fromPosition(checkpoint[0])
                    );

                    // JavaScript
                    let checkpoint = START;
                    
                    const subscription = client
                      .subscribeToAll({
                        fromPosition: checkpoint,
                      })
                      .on("data", (resolvedEvent) => {
                        handleEvent(resolvedEvent);
                        checkpoint = resolvedEvent.event?.position ?? checkpoint;
                      });

                    // Rust
                    let retry = RetryOptions::default().retry_forever();
                    let options = SubscribeToAllOptions::default().retry_options(retry);
                    let mut stream = client.subscribe_to_all(&options).await?;
                    
                    while let Some(event) = stream.try_next().await? {
                        if let SubEvent::EventAppeared(event) = event {
                            // Handles the event...
                        }
                    }

                    // TypeScript
                    let checkpoint: ReadPosition = START;
                    
                    const subscription = client
                      .subscribeToAll({
                        fromPosition: checkpoint,
                      })
                      .on("data", (resolvedEvent) => {
                        handleEvent(resolvedEvent);
                        checkpoint = resolvedEvent.event?.position ?? checkpoint;
                      });

# Filter options

Subscriptions to $all can include a filter option. A filtered subscription will only invoke the event handler if the event matches the provided filter.

A simple stream prefix filter looks like this:

                      // C#
                      var prefixStreamFilter = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));
                      await client.SubscribeToAllAsync(
                      	EventAppeared,
                      	filterOptions: prefixStreamFilter);

                      // Java
                      SubscriptionFilter filter = SubscriptionFilter.newBuilder()
                              .withStreamNamePrefix("test-")
                              .build();
                      
                      SubscribeToAllOptions options = SubscribeToAllOptions.get()
                              .filter(filter);
                      
                      client.subscribeToAll(
                              listener,
                              options);

                      // JavaScript
                      const subscription = client.subscribeToAll({
                        filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
                      });
                      
                      // Rust
                      let filter = SubscriptionFilter::on_stream_name().add_prefix("test-");
                      let options = SubscribeToAllOptions::default().filter(filter);
                      
                      client.subscribe_to_all(&options).await?;

                      // TypeScript
                      const subscription = client.subscribeToAll({
                        filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
                      });
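
The effect of a stream-name prefix filter can be sketched as a simple predicate over stream names. The TypeScript below is a local model for illustration only: `matchesAnyPrefix` is a hypothetical helper, and the real filter is passed to the subscription as shown above rather than applied by hand.

```typescript
// Illustrative local model of a stream-name prefix filter; not the client API.
function matchesAnyPrefix(streamId: string, prefixes: string[]): boolean {
  return prefixes.some((prefix) => streamId.startsWith(prefix));
}

const appended = ["test-1", "other-42", "orders-7", "test-2"];
const delivered = appended.filter((id) => matchesAnyPrefix(id, ["test-", "other-"]));

console.log(delivered); // [ 'test-1', 'other-42', 'test-2' ]
```

Only events from streams whose names start with one of the prefixes reach the handler; events from `orders-7` are skipped.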

The filtering API is described in more depth in the filtering section.

# User credentials

The user creating a subscription must have read access to the stream it's subscribing to, and only admin users may subscribe to $all or create filtered subscriptions.

The code below shows how you can provide user credentials for a subscription. Credentials specified explicitly for a subscription override the default credentials set for the client. If you don't specify any credentials for the subscription, the client uses its default credentials, if any were configured.

                        // C#
                        await client.SubscribeToAllAsync(
                        	EventAppeared,
                        	userCredentials: new UserCredentials("admin", "changeit"));

                        // Java
                        UserCredentials credentials = new UserCredentials("admin", "changeit");
                        
                        SubscribeToAllOptions options = SubscribeToAllOptions.get()
                                .authenticated(credentials);
                        
                        client.subscribeToAll(
                                listener,
                                options);

                        // JavaScript
                        const subscription = client.subscribeToStream("some-stream", {
                          credentials: {
                            username: "admin",
                            password: "changeit",
                          },
                        });
                        
                        // Rust
                        let options =
                            SubscribeToAllOptions::default().authenticated(Credentials::new("admin", "changeit"));
                        client.subscribe_to_all(&options).await?;

                        // TypeScript
                        const subscription = client.subscribeToStream<SomeStreamEvents>(
                          "some-stream",
                          {
                            credentials: {
                              username: "admin",
                              password: "changeit",
                            },
                          }
                        );
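
The precedence rule for credentials can be summarised in a few lines. The TypeScript sketch below only illustrates the rule stated in this section; `effectiveCredentials` is a hypothetical helper, the `reader` account is made up for the example, and the real clients resolve credentials internally.

```typescript
type Credentials = { username: string; password: string };

// Hypothetical helper: per-subscription credentials win over the client's defaults.
function effectiveCredentials(
  perCall: Credentials | undefined,
  clientDefault: Credentials | undefined
): Credentials | undefined {
  return perCall ?? clientDefault;
}

const clientDefault: Credentials = { username: "reader", password: "secret" };
const admin: Credentials = { username: "admin", password: "changeit" };

console.log(effectiveCredentials(admin, clientDefault)?.username); // admin
console.log(effectiveCredentials(undefined, clientDefault)?.username); // reader
console.log(effectiveCredentials(undefined, undefined)); // undefined
```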
Last Updated: 7/30/2021, 8:48:57 AM