diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs index d06b65b..f9095d5 100644 --- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs +++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs @@ -15,7 +15,7 @@ namespace PharmaxoScientific.MessageDispatch.KurrentDB; /// Subscriber for event store. /// public class KurrentDbSubscriber -{ +{ /// /// Setting this to 100 as 3200 records seems like a sensible balance between checking too often and too infrequently /// https://docs.kurrent.io/clients/grpc/subscriptions.html#updating-checkpoints-at-regular-intervals @@ -33,7 +33,7 @@ public class KurrentDbSubscriber private CancellationTokenSource _cts; private DateTime _lastStreamPositionTimestamp; private Func _setLastPositions; - + private IEventFilter _eventFilter; private IDispatcher _dispatcher; private ILogger _logger; @@ -47,15 +47,23 @@ private KurrentDbSubscriber( IDispatcher dispatcher, string streamName, ILogger logger, - ulong? startingPosition) - => Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + ulong? startingPosition, + IEventFilter eventFilter = null) + => Init( + kurrentDbClient, + dispatcher, + streamName, + logger, + startingPosition: startingPosition, + eventFilter: eventFilter); private KurrentDbSubscriber( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, string streamName, - string checkpointFilePath) + string checkpointFilePath, + IEventFilter eventFilter = null) { _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, -1); var initialCheckpointPosition = _checkpoint.Read(); @@ -66,7 +74,7 @@ private KurrentDbSubscriber( startingPosition = (ulong)initialCheckpointPosition; } - Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition, eventFilter: eventFilter); } private KurrentDbSubscriber( @@ -150,17 +158,20 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition( /// KurrentDB connection. /// Dispatcher. /// Logger. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, - ILogger logger) + ILogger logger, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, AllStreamName, logger, - null); + null, + eventFilter: eventFilter); /// /// Creates an KurrentDB catchup subscription that is subscribed to all from a position. @@ -169,18 +180,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll( /// Dispatcher. /// Logger. /// Starting Position. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, - ulong? startingPosition) + ulong? startingPosition, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, AllStreamName, logger, - startingPosition); + startingPosition, + eventFilter); /// /// Creates an KurrentDB catchup subscription subscribed to all using a checkpoint file. @@ -189,18 +203,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPo /// Dispatcher. /// Logger. /// Path of the checkpoint file. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, - string checkpointFilePath) + string checkpointFilePath, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, logger, AllStreamName, - checkpointFilePath); + checkpointFilePath, + eventFilter); /// /// Start the subscriber. @@ -277,7 +294,7 @@ public async void Start() private StreamSubscriptionResult CreateSubscription() { - var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval); + var filterOptions = new SubscriptionFilterOptions(_eventFilter ?? EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval); const bool resolveLinkTos = true; @@ -308,14 +325,15 @@ private StreamSubscriptionResult CreateSubscription() /// public void ShutDown() => _cts.Cancel(); - private void Init( - KurrentDBClient connection, + private void Init(KurrentDBClient connection, IDispatcher dispatcher, string streamName, ILogger logger, ulong? startingPosition = null, - bool liveOnly = false) + bool liveOnly = false, + IEventFilter eventFilter = null) { + _eventFilter = eventFilter; _logger = logger; _startingPosition = startingPosition; _lastProcessedEventPosition = startingPosition; @@ -333,7 +351,8 @@ private void Init( Direction.Backwards, Position.End, maxCount: 1, - resolveLinkTos: false) + resolveLinkTos: false, + eventFilter: eventFilter) .ToListAsync(); _actualEndOfStreamPosition = lastEventFromStream.First().OriginalEvent.Position.CommitPosition;