Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace PharmaxoScientific.MessageDispatch.KurrentDB;
/// Subscriber for event store.
/// </summary>
public class KurrentDbSubscriber
{
{
/// <summary>
/// Setting this to 100 as 3200 records seems like a sensible balance between checking too often and too infrequently
/// https://docs.kurrent.io/clients/grpc/subscriptions.html#updating-checkpoints-at-regular-intervals
Expand All @@ -33,7 +33,7 @@ public class KurrentDbSubscriber
private CancellationTokenSource _cts;
private DateTime _lastStreamPositionTimestamp;
private Func<Task> _setLastPositions;

private IEventFilter _eventFilter;
private IDispatcher<ResolvedEvent> _dispatcher;
private ILogger _logger;

Expand All @@ -47,15 +47,23 @@ private KurrentDbSubscriber(
IDispatcher<ResolvedEvent> dispatcher,
string streamName,
ILogger logger,
ulong? startingPosition)
=> Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition);
ulong? startingPosition,
IEventFilter eventFilter = null)
=> Init(
kurrentDbClient,
dispatcher,
streamName,
logger,
startingPosition: startingPosition,
eventFilter: eventFilter);

private KurrentDbSubscriber(
KurrentDBClient kurrentDbClient,
IDispatcher<ResolvedEvent> dispatcher,
ILogger logger,
string streamName,
string checkpointFilePath)
string checkpointFilePath,
IEventFilter eventFilter = null)
{
_checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, -1);
var initialCheckpointPosition = _checkpoint.Read();
Expand All @@ -66,7 +74,7 @@ private KurrentDbSubscriber(
startingPosition = (ulong)initialCheckpointPosition;
}

Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition);
Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition, eventFilter: eventFilter);
}

private KurrentDbSubscriber(
Expand Down Expand Up @@ -150,17 +158,20 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition(
/// <param name="kurrentDbClient">KurrentDB connection.</param>
/// <param name="dispatcher">Dispatcher.</param>
/// <param name="logger">Logger.</param>
/// <param name="eventFilter">A filter for server side event filtering.</param>
/// <returns>A new KurrentDbSubscriber object.</returns>
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
KurrentDBClient kurrentDbClient,
IDispatcher<ResolvedEvent> dispatcher,
ILogger logger)
ILogger logger,
IEventFilter eventFilter = null)
=> new KurrentDbSubscriber(
kurrentDbClient,
dispatcher,
AllStreamName,
logger,
null);
null,
eventFilter: eventFilter);

/// <summary>
/// Creates an KurrentDB catchup subscription that is subscribed to all from a position.
Expand All @@ -169,18 +180,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
/// <param name="dispatcher">Dispatcher.</param>
/// <param name="logger">Logger.</param>
/// <param name="startingPosition">Starting Position.</param>
/// <param name="eventFilter">A filter for server side event filtering.</param>
/// <returns>A new KurrentDbSubscriber object.</returns>
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition(
KurrentDBClient kurrentDbClient,
IDispatcher<ResolvedEvent> dispatcher,
ILogger logger,
ulong? startingPosition)
ulong? startingPosition,
IEventFilter eventFilter = null)
=> new KurrentDbSubscriber(
kurrentDbClient,
dispatcher,
AllStreamName,
logger,
startingPosition);
startingPosition,
eventFilter);

/// <summary>
/// Creates an KurrentDB catchup subscription subscribed to all using a checkpoint file.
Expand All @@ -189,18 +203,21 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPo
/// <param name="dispatcher">Dispatcher.</param>
/// <param name="logger">Logger.</param>
/// <param name="checkpointFilePath">Path of the checkpoint file.</param>
/// <param name="eventFilter">A filter for server side event filtering.</param>
/// <returns>A new KurrentDbSubscriber object.</returns>
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
KurrentDBClient kurrentDbClient,
IDispatcher<ResolvedEvent> dispatcher,
ILogger logger,
string checkpointFilePath)
string checkpointFilePath,
IEventFilter eventFilter = null)
=> new KurrentDbSubscriber(
kurrentDbClient,
dispatcher,
logger,
AllStreamName,
checkpointFilePath);
checkpointFilePath,
eventFilter);

/// <summary>
/// Start the subscriber.
Expand Down Expand Up @@ -277,7 +294,7 @@ public async void Start()

private StreamSubscriptionResult CreateSubscription()
{
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval);
var filterOptions = new SubscriptionFilterOptions(_eventFilter ?? EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval);

const bool resolveLinkTos = true;

Expand Down Expand Up @@ -308,14 +325,15 @@ private StreamSubscriptionResult CreateSubscription()
/// </summary>
public void ShutDown() => _cts.Cancel();

private void Init(
KurrentDBClient connection,
private void Init(KurrentDBClient connection,
IDispatcher<ResolvedEvent> dispatcher,
string streamName,
ILogger logger,
ulong? startingPosition = null,
bool liveOnly = false)
bool liveOnly = false,
IEventFilter eventFilter = null)
{
_eventFilter = eventFilter;
_logger = logger;
_startingPosition = startingPosition;
_lastProcessedEventPosition = startingPosition;
Expand All @@ -333,7 +351,8 @@ private void Init(
Direction.Backwards,
Position.End,
maxCount: 1,
resolveLinkTos: false)
resolveLinkTos: false,
eventFilter: eventFilter)
.ToListAsync();

_actualEndOfStreamPosition = lastEventFromStream.First().OriginalEvent.Position.CommitPosition;
Expand Down