From efe350b8381f9bcc209322ee553b26ff52107c66 Mon Sep 17 00:00:00 2001 From: Sam Matthews Date: Wed, 3 Sep 2025 10:26:50 +0100 Subject: [PATCH 1/2] Add server side event filtering to "All" subscriptions --- .../KurrentDbSubscriber.cs | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs index d06b65b..543ae5e 100644 --- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs +++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs @@ -15,7 +15,7 @@ namespace PharmaxoScientific.MessageDispatch.KurrentDB; /// Subscriber for event store. /// public class KurrentDbSubscriber -{ +{ /// /// Setting this to 100 as 3200 records seems like a sensible balance between checking too often and too infrequently /// https://docs.kurrent.io/clients/grpc/subscriptions.html#updating-checkpoints-at-regular-intervals @@ -47,15 +47,23 @@ private KurrentDbSubscriber( IDispatcher dispatcher, string streamName, ILogger logger, - ulong? startingPosition) - => Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + ulong? startingPosition, + IEventFilter eventFilter = null) + => Init( + kurrentDbClient, + dispatcher, + streamName, + logger, + startingPosition: startingPosition, + eventFilter: eventFilter); private KurrentDbSubscriber( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, string streamName, - string checkpointFilePath) + string checkpointFilePath, + IEventFilter eventFilter = null) { _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, -1); var initialCheckpointPosition = _checkpoint.Read(); @@ -66,7 +74,7 @@ private KurrentDbSubscriber( startingPosition = (ulong)initialCheckpointPosition; } - Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition, eventFilter: eventFilter); } private KurrentDbSubscriber( @@ -150,17 +158,20 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition( /// KurrentDB connection. /// Dispatcher. /// Logger. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, - ILogger logger) + ILogger logger, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, AllStreamName, logger, - null); + null, + eventFilter: eventFilter); /// /// Creates an KurrentDB catchup subscription that is subscribed to all from a position. @@ -169,18 +180,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll( /// Dispatcher. /// Logger. /// Starting Position. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, - ulong? startingPosition) + ulong? startingPosition, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, AllStreamName, logger, - startingPosition); + startingPosition, + eventFilter); /// /// Creates an KurrentDB catchup subscription subscribed to all using a checkpoint file. @@ -189,18 +203,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPo /// Dispatcher. /// Logger. /// Path of the checkpoint file. + /// A filter for server side event filtering. /// A new KurrentDbSubscriber object. public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( KurrentDBClient kurrentDbClient, IDispatcher dispatcher, ILogger logger, - string checkpointFilePath) + string checkpointFilePath, + IEventFilter eventFilter = null) => new KurrentDbSubscriber( kurrentDbClient, dispatcher, logger, AllStreamName, - checkpointFilePath); + checkpointFilePath, + eventFilter); /// /// Start the subscriber. @@ -308,13 +325,13 @@ private StreamSubscriptionResult CreateSubscription() /// public void ShutDown() => _cts.Cancel(); - private void Init( - KurrentDBClient connection, + private void Init(KurrentDBClient connection, IDispatcher dispatcher, string streamName, ILogger logger, ulong? startingPosition = null, - bool liveOnly = false) + bool liveOnly = false, + IEventFilter eventFilter = null) { _logger = logger; _startingPosition = startingPosition; @@ -333,7 +350,8 @@ private void Init( Direction.Backwards, Position.End, maxCount: 1, - resolveLinkTos: false) + resolveLinkTos: false, + eventFilter: eventFilter) .ToListAsync(); _actualEndOfStreamPosition = lastEventFromStream.First().OriginalEvent.Position.CommitPosition; From 3552b4935e5800379a5e012ee3f8410557c6b666 Mon Sep 17 00:00:00 2001 From: Sam Matthews Date: Wed, 3 Sep 2025 15:52:46 +0100 Subject: [PATCH 2/2] Correctly use event filter in subscription --- src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs index 543ae5e..f9095d5 100644 --- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs +++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs @@ -33,7 +33,7 @@ public class KurrentDbSubscriber private CancellationTokenSource _cts; private DateTime _lastStreamPositionTimestamp; private Func _setLastPositions; - + private IEventFilter _eventFilter; private IDispatcher _dispatcher; private ILogger _logger; @@ -294,7 +294,7 @@ public async void Start() private StreamSubscriptionResult CreateSubscription() { - var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval); + var filterOptions = new SubscriptionFilterOptions(_eventFilter ?? EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval); const bool resolveLinkTos = true; @@ -333,6 +333,7 @@ private void Init(KurrentDBClient connection, bool liveOnly = false, IEventFilter eventFilter = null) { + _eventFilter = eventFilter; _logger = logger; _startingPosition = startingPosition; _lastProcessedEventPosition = startingPosition;