diff --git a/LICENSE b/LICENSE index 9d0eab8..855d86f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,9 +1,9 @@ -Copyright (c) 2019, Corsham Science +Copyright (c) 2025, Pharmaxo Scientific All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. -Neither the name of Corsham Science nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. +Neither the name of Pharmaxo Scientific nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md index 1cd6e2d..7d37bc9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# CorshamScience.MessageDispatch.EventStore -A package to use EventStore with CorshamScience.MessageDispatch. +# PharmaxoScientific.MessageDispatch.KurrentDB +A package to use KurrentDB with PharmaxoScientific.MessageDispatch.KurrentDB diff --git a/build.cmd b/build.cmd index d401c47..ca0b654 100644 --- a/build.cmd +++ b/build.cmd @@ -1,10 +1,15 @@ @echo off SET VERSION=0.0.0 -IF NOT [%1]==[] (set VERSION=%1) +IF NOT [%1]==[] (SET VERSION=%1) SET TAG=0.0.0 -IF NOT [%2]==[] (set TAG=%2) +IF NOT [%2]==[] (SET TAG=%2) SET TAG=%TAG:tags/=% -dotnet pack .\src\eventstore\eventstore.csproj -o .\dist -p:Version="%VERSION%" -p:PackageVersion="%VERSION%" -p:Tag="%TAG%" -c Release \ No newline at end of file +dotnet restore .\src\MessageDispatch.KurrentDB.sln -PackagesDirectory .\src\packages -Verbosity detailed + +dotnet format .\src\MessageDispatch.KurrentDB.sln --severity warn --verify-no-changes -v diag +IF %errorlevel% neq 0 EXIT /B %errorlevel% + +dotnet pack .\src\MessageDispatch.KurrentDB\MessageDispatch.KurrentDB.csproj -o .\dist -p:Version="%VERSION%" -p:PackageVersion="%VERSION%" -p:Tag="%TAG%" -c Release \ No newline at end of file diff --git a/build.sh b/build.sh index 534e996..7b920af 100644 --- a/build.sh +++ b/build.sh @@ -9,4 +9,4 @@ if [ -n "$2" ]; then tag="$2" fi tag=${tag/tags\//} -dotnet pack .\\src\\eventstore\\eventstore.csproj -o .\\dist -p:Version="$version" -p:PackageVersion="$version" -p:Tag="$tag" -c Release \ No newline at end of file +dotnet pack .\\src\\MessageDispatch.KurrentDB\\MessageDispatch.KurrentDB.csproj -o .\\dist -p:Version="$version" -p:PackageVersion="$version" -p:Tag="$tag" -c Release \ No newline at end of file diff --git a/src/.editorconfig b/src/.editorconfig new file mode 100644 index 0000000..8799067 --- /dev/null +++ b/src/.editorconfig @@ -0,0 +1,212 @@ +# editorconfig.org + +# Pharmaxo Scientific .net Codestyles editorconfig v1.0 + +# top-most EditorConfig file +root = true + +# Default settings: +# A newline ending every file +# Use 4 spaces as indentation +[*] +insert_final_newline = true +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true + +[project.json] +indent_size = 2 + +# C# and Visual Basic files +[*.{cs,vb}] +charset = utf-8-bom + +# Analyzers +dotnet_analyzer_diagnostic.category-Security.severity = warning +dotnet_code_quality.ca1802.api_surface = private, internal + +# Miscellaneous style rules +dotnet_sort_system_directives_first = true +dotnet_style_predefined_type_for_locals_parameters_members = true:warning +dotnet_style_predefined_type_for_member_access = true:warning + +# avoid this. unless absolutely necessary +dotnet_style_qualification_for_field = false:warning +dotnet_style_qualification_for_property = false:warning +dotnet_style_qualification_for_method = false:warning +dotnet_style_qualification_for_event = false:warning + +# name all constant fields using PascalCase +dotnet_naming_rule.constant_fields_should_be_pascal_case.severity = warning +dotnet_naming_rule.constant_fields_should_be_pascal_case.symbols = constant_fields +dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style +dotnet_naming_symbols.constant_fields.applicable_kinds = field +dotnet_naming_symbols.constant_fields.required_modifiers = const +dotnet_naming_style.pascal_case_style.capitalization = pascal_case + +# static fields should have _ prefix +dotnet_naming_rule.static_fields_should_have_prefix.severity = warning +dotnet_naming_rule.static_fields_should_have_prefix.symbols = static_fields +dotnet_naming_rule.static_fields_should_have_prefix.style = static_prefix_style +dotnet_naming_symbols.static_fields.applicable_kinds = field +dotnet_naming_symbols.static_fields.required_modifiers = static +dotnet_naming_symbols.static_fields.applicable_accessibilities = private, internal, private_protected +dotnet_naming_style.static_prefix_style.required_prefix = _ +dotnet_naming_style.static_prefix_style.capitalization = camel_case + +# internal and private fields should be _camelCase +dotnet_naming_rule.camel_case_for_private_internal_fields.severity = warning +dotnet_naming_rule.camel_case_for_private_internal_fields.symbols = private_internal_fields +dotnet_naming_rule.camel_case_for_private_internal_fields.style = camel_case_underscore_style +dotnet_naming_symbols.private_internal_fields.applicable_kinds = field +dotnet_naming_symbols.private_internal_fields.applicable_accessibilities = private, internal +dotnet_naming_style.camel_case_underscore_style.required_prefix = _ +dotnet_naming_style.camel_case_underscore_style.capitalization = camel_case + +# Code quality +dotnet_style_readonly_field = true:warning +dotnet_code_quality_unused_parameters = non_public:warning + +# Expression-level preferences +dotnet_style_object_initializer = true:suggestion +dotnet_style_collection_initializer = true:suggestion +dotnet_style_explicit_tuple_names = true:suggestion +dotnet_style_coalesce_expression = true:suggestion +dotnet_style_null_propagation = true:suggestion +dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion +dotnet_style_prefer_inferred_tuple_names = true:suggestion +dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion +dotnet_style_prefer_auto_properties = true:suggestion +dotnet_style_prefer_conditional_expression_over_assignment = true:suggestion +dotnet_style_prefer_conditional_expression_over_return = true:suggestion + +# CA2208: Instantiate argument exceptions correctly +dotnet_diagnostic.CA2208.severity = error + +# C# files +[*.cs] +# New line preferences +csharp_new_line_before_open_brace = all +csharp_new_line_before_else = true +csharp_new_line_before_catch = true +csharp_new_line_before_finally = true +csharp_new_line_before_members_in_object_initializers = true +csharp_new_line_before_members_in_anonymous_types = true +csharp_new_line_between_query_expression_clauses = true + +# Experimental New line rules + +csharp_style_allow_blank_lines_between_consecutive_braces_experimental = false:warning +dotnet_style_allow_multiple_blank_lines_experimental = false:warning + +# Indentation preferences +csharp_indent_block_contents = true +csharp_indent_braces = false +csharp_indent_case_contents = true +csharp_indent_case_contents_when_block = true +csharp_indent_switch_labels = true +csharp_indent_labels = one_less_than_current + +# Modifier preferences +csharp_preferred_modifier_order = public,private,protected,internal,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,volatile,async:warning + +# Code style defaults +csharp_using_directive_placement = outside_namespace:warning +csharp_prefer_braces = true:warning +csharp_preserve_single_line_blocks = true:warning +csharp_preserve_single_line_statements = false:warning +csharp_prefer_static_local_function = true:suggestion +csharp_prefer_simple_using_statement = false:none +csharp_style_prefer_switch_expression = true:suggestion + +# Expression-bodied members +csharp_style_expression_bodied_methods = when_on_single_line:suggestion +csharp_style_expression_bodied_constructors = when_on_single_line:suggestion +csharp_style_expression_bodied_operators = when_on_single_line:suggestion +csharp_style_expression_bodied_properties = when_on_single_line:suggestion +csharp_style_expression_bodied_indexers = when_on_single_line:suggestion +csharp_style_expression_bodied_accessors = when_on_single_line:suggestion +csharp_style_expression_bodied_lambdas = when_on_single_line:suggestion +csharp_style_expression_bodied_local_functions = when_on_single_line:suggestion + +# Pattern matching +csharp_style_pattern_matching_over_is_with_cast_check = true:suggestion +csharp_style_pattern_matching_over_as_with_null_check = true:suggestion +csharp_style_inlined_variable_declaration = true:suggestion + +# Expression-level preferences +csharp_prefer_simple_default_expression = true:suggestion + +# Null checking preferences +csharp_style_throw_expression = true:suggestion +csharp_style_conditional_delegate_call = true:suggestion + +# Other features +csharp_style_prefer_index_operator = false:none +csharp_style_prefer_range_operator = false:none +csharp_style_pattern_local_over_anonymous_function = false:none + +# Space preferences +csharp_space_after_cast = false +csharp_space_after_colon_in_inheritance_clause = true +csharp_space_after_comma = true +csharp_space_after_dot = false +csharp_space_after_keywords_in_control_flow_statements = true +csharp_space_after_semicolon_in_for_statement = true +csharp_space_around_binary_operators = before_and_after +csharp_space_around_declaration_statements = do_not_ignore +csharp_space_before_colon_in_inheritance_clause = true +csharp_space_before_comma = false +csharp_space_before_dot = false +csharp_space_before_open_square_brackets = false +csharp_space_before_semicolon_in_for_statement = false +csharp_space_between_empty_square_brackets = false +csharp_space_between_method_call_empty_parameter_list_parentheses = false +csharp_space_between_method_call_name_and_opening_parenthesis = false +csharp_space_between_method_call_parameter_list_parentheses = false +csharp_space_between_method_declaration_empty_parameter_list_parentheses = false +csharp_space_between_method_declaration_name_and_open_parenthesis = false +csharp_space_between_method_declaration_parameter_list_parentheses = false +csharp_space_between_parentheses = false +csharp_space_between_square_brackets = false + +# Namespace preference +csharp_style_namespace_declarations = file_scoped:warning +dotnet_style_namespace_match_folder = true:suggestion + +# Types: Will suggest var in all instances, but does not enforce it. +csharp_style_var_for_built_in_types = true:suggestion +csharp_style_var_when_type_is_apparent = false:none +csharp_style_var_elsewhere = true:suggestion + +# License header (second line required to set severity level for this) +file_header_template = Copyright (c) Pharmaxo. All rights reserved. +dotnet_diagnostic.IDE0073.severity = warning + +# Xml project files +[*.{csproj,vbproj,vcxproj,vcxproj.filters,proj,nativeproj,locproj}] +indent_size = 2 + +# Xml build files +[*.builds] +indent_size = 2 + +# Xml files +[*.{xml,stylecop,resx,ruleset}] +indent_size = 2 + +# Xml config files +[*.{props,targets,config,nuspec}] +indent_size = 2 + +# Shell scripts +[*.sh] +end_of_line = lf + +[*.{cmd, bat}] +end_of_line = crlf + +# Markdown files +[*.md] + # Double trailing spaces can be used for BR tags, and other instances are enforced by Markdownlint +trim_trailing_whitespace = false diff --git a/src/Directory.Build.targets b/src/Directory.Build.targets new file mode 100644 index 0000000..f1bdc6a --- /dev/null +++ b/src/Directory.Build.targets @@ -0,0 +1,5 @@ + + + true + + diff --git a/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj new file mode 100644 index 0000000..126a79b --- /dev/null +++ b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj @@ -0,0 +1,30 @@ + + + + enable + enable + + false + true + net8.0;net481 + latest + + + + + + + + + + + + + + + + + + + + diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs new file mode 100644 index 0000000..92d8bb3 --- /dev/null +++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs @@ -0,0 +1,526 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Text; +using System.Text.Json; +using CorshamScience.MessageDispatch.Core; +using DotNet.Testcontainers.Builders; +using KurrentDB.Client; +using Microsoft.Extensions.Logging.Abstractions; +using PharmaxoScientific.MessageDispatch.KurrentDB; +using Testcontainers.EventStoreDb; + +namespace MessageDispatch.KurrentDB.Tests; + +public class SubscriberTests +{ + private const string StreamName = "stream1"; + private string _connectionString; + private KurrentDBClient _kurrentDbClient; + private AwaitableDispatcherSpy _dispatcher; + private KurrentDbSubscriber? _subscriber; + + [SetUp] + public async Task Setup() + { + const int eventStoreHostPort = 1234; + const string eventStoreVersion = "24.10.5"; + + var eventStoreImageName = RuntimeInformation.OSArchitecture == Architecture.Arm64 + ? $"ghcr.io/eventstore/eventstore:{eventStoreVersion}-alpha-arm64v8" + : $"eventstore/eventstore:{eventStoreVersion}-bookworm-slim"; + + var eventStoreContainer = BuildEventStoreContainer(eventStoreImageName, eventStoreHostPort); + await eventStoreContainer.StartAsync(); + + var mappedHostPort = eventStoreContainer.GetMappedPublicPort(eventStoreHostPort); + _connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=true&tlsVerifyCert=false"; + + _kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString)); + _dispatcher = new AwaitableDispatcherSpy(); + } + + [TearDown] + public async Task TearDown() + { + await _kurrentDbClient.DisposeAsync(); + _subscriber?.ShutDown(); + } + + [Test] + public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded_DispatchesEventsAndBecomesLive() + { + _subscriber = KurrentDbSubscriber.CreateLiveSubscription( + _kurrentDbClient, + _dispatcher, + StreamName, + new NullLogger()); + + _subscriber.Start(); + + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List events = [event1, event2, event3]; + + await AppendEventsToStreamAsync(event1, event2, event3); + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(events)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEventsAdded_DispatchesNewEventsAndBecomesLive() + { + _subscriber = KurrentDbSubscriber.CreateLiveSubscription( + _kurrentDbClient, + _dispatcher, + StreamName, + new NullLogger()); + + var oldEvent1 = SimpleEvent.Create(); + var oldEvent2 = SimpleEvent.Create(); + + await AppendEventsToStreamAsync(oldEvent1, oldEvent2); + + _subscriber.Start(); + + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List events = [event1, event2, event3]; + + await AppendEventsToStreamAsync(event1, event2, event3); + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(events)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_DispatchesEventsAndBecomesLive() + { + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll( + _kurrentDbClient, + _dispatcher, + new NullLogger()); + + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List events = [event1, event2, event3]; + + await AppendEventsToStreamAsync(event1, event2, event3); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(events)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStreamGivenNewEvents_DispatchesEventsAndBecomesLive() + { + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll( + _kurrentDbClient, + _dispatcher, + new NullLogger()); + + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List events = [event1, event2, event3]; + + _subscriber.Start(); + + await AppendEventsToStreamAsync(event1, event2, event3); + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(events)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive() + { + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition( + _kurrentDbClient, + _dispatcher, + StreamName, + new NullLogger(), + 1); + + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List events = [event3]; + + await AppendEventsToStreamAsync(event1, event2, event3); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(events)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionSubscribedToAllFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive() + { + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List eventsExpectedToBeDispatched = [event3]; + + await AppendEventsToStreamAsync(event1); + var startingPosition = await AppendEventsToStreamAsync(event2); + await AppendEventsToStreamAsync(event3); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllFromPosition( + _kurrentDbClient, + _dispatcher, + new NullLogger(), + startingPosition.LogPosition.CommitPosition); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive() + { + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List eventsExpectedToBeDispatched = [event1, event2, event3]; + + await AppendEventsToStreamAsync(event1); + await AppendEventsToStreamAsync(event2); + await AppendEventsToStreamAsync(event3); + + var checkpointFileName = Path.GetRandomFileName(); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( + _kurrentDbClient, + _dispatcher, + new NullLogger(), + checkpointFileName); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive() + { + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List eventsExpectedToBeDispatched = [event3]; + + await AppendEventsToStreamAsync(event1); + var startingPosition = await AppendEventsToStreamAsync(event2); + await AppendEventsToStreamAsync(event3); + + var checkpointFileName = Path.GetRandomFileName(); + var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName); + checkpoint.Write((long)startingPosition.LogPosition.CommitPosition); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( + _kurrentDbClient, + _dispatcher, + new NullLogger(), + checkpointFileName); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive() + { + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List eventsExpectedToBeDispatched = [event1, event2, event3]; + + await AppendEventsToStreamAsync(event1); + await AppendEventsToStreamAsync(event2); + await AppendEventsToStreamAsync(event3); + + var checkpointFileName = Path.GetRandomFileName(); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint( + _kurrentDbClient, + _dispatcher, + StreamName, + new NullLogger(), + checkpointFileName); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive() + { + var event1 = SimpleEvent.Create(); + var event2 = SimpleEvent.Create(); + var event3 = SimpleEvent.Create(); + + List eventsExpectedToBeDispatched = [event3]; + + await AppendEventsToStreamAsync(event1); + await AppendEventsToStreamAsync(event2); + await AppendEventsToStreamAsync(event3); + + var checkpointFileName = Path.GetRandomFileName(); + + var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName); + checkpoint.Write(1); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint( + _kurrentDbClient, + _dispatcher, + StreamName, + new NullLogger(), + checkpointFileName); + + _subscriber.Start(); + + await _dispatcher.WaitForEventsToBeDispatched(event3); + + var deserializedDispatchedEvents = + _dispatcher.DispatchedEvents.Select(DeserializeEventData); + + Assert.Multiple(() => + { + Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched)); + Assert.That(_subscriber.IsLive); + }); + } + + [Test] + public async Task IsLive_WhenCatchingUpUsingLinkedEventsGivenMissingLinkedEvent_ReturnsTrueOnceCaughtUp() + { + await AppendEventsToStreamAsync(SimpleEvent.Create(), SimpleEvent.Create()); + + const string linkedStream = "non-system"; + var event1LinkedData = new EventData( + Uuid.NewUuid(), + SystemEventTypes.LinkTo, + Encoding.UTF8.GetBytes($"0@{StreamName}") + ); + + var event2LinkedData = new EventData( + Uuid.NewUuid(), + SystemEventTypes.LinkTo, + Encoding.UTF8.GetBytes($"1@{StreamName}") + ); + + var deletedLinkData = new EventData( + Uuid.NewUuid(), + SystemEventTypes.LinkTo, + Encoding.UTF8.GetBytes($"2@{StreamName}") + ); + + await _kurrentDbClient.AppendToStreamAsync( + linkedStream, + StreamState.NoStream, + [event1LinkedData, event2LinkedData, deletedLinkData]); + + _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition( + _kurrentDbClient, + _dispatcher, + linkedStream, + new NullLogger(), + null); + + _subscriber.Start(); + + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < TimeSpan.FromSeconds(5)) + { + if (_subscriber.IsLive) + { + break; + } + + Thread.Sleep(TimeSpan.FromMilliseconds(100)); + } + + Assert.That(_subscriber.IsLive, "Subscriber was not live"); + } + + private class SimpleEvent + { + // ReSharper disable once UnusedAutoPropertyAccessor.Local + // ReSharper disable once MemberCanBePrivate.Local + public Guid Id { get; } + + // ReSharper disable once MemberCanBePrivate.Local + public SimpleEvent(Guid id) => Id = id; + + public static SimpleEvent Create() => new(Guid.NewGuid()); + + public override bool Equals(object? obj) => obj is SimpleEvent other && Id.Equals(other.Id); + + public override int GetHashCode() => Id.GetHashCode(); + + public override string ToString() => Id.ToString(); + } + + private class AwaitableDispatcherSpy : IDispatcher + { + public List DispatchedEvents { get; } = []; + + public void Dispatch(ResolvedEvent message) => DispatchedEvents.Add(message); + + public Task WaitForEventsToBeDispatched(params object[] events) + { + if (events.Length == 0) + { + return Task.CompletedTask; + } + + var iterations = 0; + while (DispatchedEvents.Count != events.Length) + { + Thread.Sleep(100); + iterations++; + + if (iterations > 10) + { + throw new TimeoutException("Expected events weren't dispatched within the allotted time."); + } + } + + return Task.CompletedTask; + } + } + + private static T? DeserializeEventData(ResolvedEvent message) => + JsonSerializer.Deserialize(Encoding.UTF8.GetString(message.Event.Data.Span.ToArray())); + + private static EventStoreDbContainer BuildEventStoreContainer(string imageName, int hostPort) => + new EventStoreDbBuilder() + .WithImage(imageName) + .WithCleanUp(true) + .WithCreateParameterModifier(cmd => cmd.User = "root") + .WithPortBinding(hostPort, true) + .WithEnvironment(new Dictionary + { + { "EVENTSTORE_DEV", "true" }, + { "EVENTSTORE_INSECURE", "false" }, + { "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" }, + { "EVENTSTORE_HTTP_PORT", hostPort.ToString() }, + { "EVENTSTORE_RUN_PROJECTIONS", "All" }, + }) + .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(hostPort)) + .Build(); + + private static EventData ToEventData(object data, JsonSerializerOptions? options = null) + { + var metaData = new { ClrType = data.GetType().AssemblyQualifiedName, }; + + var type = data.GetType().Name; + + return new EventData( + Uuid.NewUuid(), + type, + Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data, options)), + Encoding.UTF8.GetBytes(JsonSerializer.Serialize(metaData, options))); + } + + private async Task AppendEventsToStreamAsync(params object[] events) + { + var eventData = events.Select(e => ToEventData(e)); + var client = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString)); + + return await client.AppendToStreamAsync(StreamName, StreamState.Any, eventData); + } +} diff --git a/src/CR.MessageDispatch.EventStore.sln b/src/MessageDispatch.KurrentDB.sln similarity index 53% rename from src/CR.MessageDispatch.EventStore.sln rename to src/MessageDispatch.KurrentDB.sln index 3abbc73..b502ac7 100644 --- a/src/CR.MessageDispatch.EventStore.sln +++ b/src/MessageDispatch.KurrentDB.sln @@ -1,9 +1,11 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.27130.2010 +# Visual Studio Version 17 +VisualStudioVersion = 17.12.35931.192 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "eventstore", "eventstore\eventstore.csproj", "{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageDispatch.KurrentDB", "MessageDispatch.KurrentDB\MessageDispatch.KurrentDB.csproj", "{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessageDispatch.KurrentDB.Tests", "MessageDispatch.KurrentDB.Tests\MessageDispatch.KurrentDB.Tests.csproj", "{10045A11-D589-4A7F-BC17-C4CE508B04F4}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -15,6 +17,10 @@ Global {8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Debug|Any CPU.Build.0 = Debug|Any CPU {8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.ActiveCfg = Release|Any CPU {8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.Build.0 = Release|Any CPU + {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/MessageDispatch.KurrentDB/CatchupProgress.cs b/src/MessageDispatch.KurrentDB/CatchupProgress.cs new file mode 100644 index 0000000..fe59e1d --- /dev/null +++ b/src/MessageDispatch.KurrentDB/CatchupProgress.cs @@ -0,0 +1,82 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// Class to handle calculating catchup progress. +/// +public class CatchupProgress +{ + /// + /// Initializes a new instance of the class. + /// + /// The last processed event position. + /// The name of the stream which is being caught up on (or $all if this is subscribed to all). + /// The end of the stream position (stream position for stream subscription of commit position for all subscription). + /// The starting position (Event number for stream subscription or commit position for all subscription). + /// Whether the subscriber is subscribed to all. + public CatchupProgress( + ulong lastProcessedEventPosition, + string streamName, + ulong endOfStreamPosition, + ulong startPosition, + bool subscribeToAll) + { + IsAllSubscription = subscribeToAll; + LastProcessedEventPosition = lastProcessedEventPosition; + StartPosition = startPosition; + StreamName = streamName; + EndOfStreamPosition = endOfStreamPosition; + } + + /// + /// Gets a value indicating whether the subscriber is subscribed to all. + /// + public bool IsAllSubscription { get; } + + /// + /// Gets the name of the stream ($all if this is an all subscription). + /// + public string StreamName { get; } + + /// + /// Gets the starting position (Event number for stream subscription or commit position for all subscription). + /// + public ulong StartPosition { get; } + + /// + /// Gets the last processed event (stream position for stream subscription of commit position for all subscription). + /// + public ulong LastProcessedEventPosition { get; } + + /// + /// Gets the end of the stream position (stream position for stream subscription of commit position for all subscription). + /// + public ulong EndOfStreamPosition { get; } + + /// + /// Gets the percentage of events in the stream which have been processed (either by number of events or position in the transaction log). + /// + public decimal OverallPercentage => + LastProcessedEventPosition == 0 || EndOfStreamPosition == 0 + ? 0.0m + : (decimal)LastProcessedEventPosition / EndOfStreamPosition * 100; + + /// + /// Gets the percentage of events in the stream which require catching up on, which have been processed (either by number of events or position in the transaction log). + /// + public decimal CatchupPercentage => + LastProcessedEventPosition - StartPosition == 0 || EndOfStreamPosition - StartPosition == 0 + ? 0.0m + : ((decimal)LastProcessedEventPosition - StartPosition) / (EndOfStreamPosition - StartPosition) * 100; + + /// + /// Generates a string describing the state of the stream catch up progress. + /// + /// A string describing the state of the stream catch up progress. + public override string ToString() + { + return + $"[{StreamName}] Overall Pos: {OverallPercentage:0.#}% ({LastProcessedEventPosition}/{EndOfStreamPosition}), Caught up: {CatchupPercentage:0.#}% ({LastProcessedEventPosition - StartPosition}/{EndOfStreamPosition - StartPosition})"; + } +} diff --git a/src/MessageDispatch.KurrentDB/CheckpointingWrappingDispatcher.cs b/src/MessageDispatch.KurrentDB/CheckpointingWrappingDispatcher.cs new file mode 100644 index 0000000..2535270 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/CheckpointingWrappingDispatcher.cs @@ -0,0 +1,76 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Text; +using CorshamScience.MessageDispatch.Core; +using KurrentDB.Client; +using Newtonsoft.Json.Linq; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// A wrapping event dispatcher which keeps track of a checkpoint, and whether the dispatched event has been previously processed or not. +/// +public class CheckpointingWrappingDispatcher : KurrentDbAggregateEventDispatcher +{ + private readonly WriteThroughFileCheckpoint _checkpoint; + private readonly long _startupCheckpointValue; + + /// + /// Initializes a new instance of the class. + /// + /// The file to write a checkpoint to. + /// The initial value to write. + /// The handler methods for processing messages with. + /// The metadata key. + public CheckpointingWrappingDispatcher( + string checkpointFilePath, + long initValue, + IMessageHandlerLookup handlers, + string metadataKey = null) + : base(handlers, null, metadataKey) + { + _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, initValue); + _startupCheckpointValue = _checkpoint.Read(); + } + + /// + public override void Dispatch(ResolvedEvent message) + { + base.Dispatch(message); + + var previouslyProcessed = message.OriginalEventNumber.ToInt64() <= _startupCheckpointValue; + + if (previouslyProcessed) + { + return; + } + + _checkpoint.Write(message.OriginalEventNumber.ToInt64()); + } + + /// + protected override bool TryDeserialize( + Type messageType, + ResolvedEvent rawMessage, + out object deserialized) + { + deserialized = null!; + + var previouslyProcessed = rawMessage.OriginalEventNumber.ToInt64() <= _startupCheckpointValue; + + try + { + var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span.ToArray()); + var @event = JObject.Parse(jsonString); + + deserialized = new EventWrapper(@event, previouslyProcessed); + + return deserialized != null; + } + catch (Exception) + { + return false; + } + } +} diff --git a/src/MessageDispatch.KurrentDB/EventWrapper.cs b/src/MessageDispatch.KurrentDB/EventWrapper.cs new file mode 100644 index 0000000..0e4abb2 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/EventWrapper.cs @@ -0,0 +1,35 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using Newtonsoft.Json.Linq; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// Represents a wrapper for an event. +/// +public class EventWrapper +{ + /// + /// Initializes a new instance of the class. + /// + /// The event to wrap. + /// A value indicating whether the event has been previously processed. + public EventWrapper(JObject @event, bool previouslyProcessed) + { + Event = @event; + PreviouslyProcessed = previouslyProcessed; + } + + /// + /// Gets a value indicating whether this event has been previously processed before. + /// + public bool PreviouslyProcessed { get; } + + /// + /// Gets the event data. + /// + public JObject Event { get; } = null!; + + /// + public override string ToString() => $"Event: {Event}, PreviouslyProcessed: {PreviouslyProcessed}"; +} diff --git a/src/eventstore/GlobalSuppressions.cs b/src/MessageDispatch.KurrentDB/GlobalSuppressions.cs similarity index 69% rename from src/eventstore/GlobalSuppressions.cs rename to src/MessageDispatch.KurrentDB/GlobalSuppressions.cs index 758d81e..ac35db8 100644 --- a/src/eventstore/GlobalSuppressions.cs +++ b/src/MessageDispatch.KurrentDB/GlobalSuppressions.cs @@ -1,5 +1,3 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// +// Copyright (c) Pharmaxo. All rights reserved. [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1633:File should have header", Justification = "The file was taken from the eventstore codebase so it is not valid to add a copyright header.", Scope = "namespace", Target = "~N:CorshamScience.MessageDispatch.EventStore")] diff --git a/src/MessageDispatch.KurrentDB/KurrentDbAggregateEventDispatcher.cs b/src/MessageDispatch.KurrentDB/KurrentDbAggregateEventDispatcher.cs new file mode 100644 index 0000000..5556ba1 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/KurrentDbAggregateEventDispatcher.cs @@ -0,0 +1,124 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Text; +using CorshamScience.MessageDispatch.Core; +using KurrentDB.Client; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// +/// A deserializing event dispatcher for events produced by CorshamScience.AggregatRepository. +/// +// ReSharper disable once UnusedMember.Global +public class KurrentDbAggregateEventDispatcher : DeserializingMessageDispatcher +{ + private readonly JsonSerializerSettings _serializerSettings; + + private readonly Dictionary _typeCache = new(); + private readonly string _metadataKey; + +#pragma warning disable SA1648 // inheritdoc should be used with inheriting class + /// + /// + /// Initializes a new instance of the class. + /// + /// The handler methods for processing messages with. + /// Determines the settings for the JSON serialization of events. + /// Optional parameter for the metadata key. Default is "ClrType" + // ReSharper disable once UnusedMember.Global + public KurrentDbAggregateEventDispatcher( + IMessageHandlerLookup handlers, + JsonSerializerSettings serializerSettings = null, + string metadataKey = null) + : base(handlers) + { + _serializerSettings = serializerSettings ?? new JsonSerializerSettings(); + _metadataKey = metadataKey ?? "ClrType"; + } +#pragma warning restore SA1648 // inheritdoc should be used with inheriting class + + /// + protected override bool TryGetMessageType(ResolvedEvent rawMessage, out Type type) + { + type = null; + + // optimization: don't even bother trying to deserialize metadata for system events + if (rawMessage.Event.EventType.StartsWith("$") || rawMessage.Event.Metadata.Length == 0) + { + return false; + } + + try + { + IDictionary metadata = JObject.Parse(Encoding.UTF8.GetString(rawMessage.Event.Metadata.Span.ToArray())); + + if (!metadata.ContainsKey(_metadataKey)) + { + return false; + } + + var typeString = (string)metadata[_metadataKey]; + + if (!_typeCache.TryGetValue(typeString, out var cached)) + { + try + { + cached = Type.GetType( + typeString, + (assemblyName) => + { + assemblyName.Version = null; + return System.Reflection.Assembly.Load(assemblyName); + }, + null, + true, + true); + } + catch (Exception) + { + cached = typeof(TypeNotFound); + } + + _typeCache.Add(typeString, cached); + } + + if (cached?.Name.Equals("TypeNotFound") ?? false) + { + return false; + } + + type = cached; + return true; + } + catch (Exception) + { + return false; + } + } + + /// + protected override bool TryDeserialize(Type messageType, ResolvedEvent rawMessage, out object deserialized) + { + deserialized = null; + + try + { + var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span.ToArray()); + deserialized = JsonConvert.DeserializeObject(jsonString, messageType, _serializerSettings); + return deserialized != null; + } + catch (Exception) + { + return false; + } + } + + private class TypeNotFound + { + } +} diff --git a/src/MessageDispatch.KurrentDB/KurrentDbJObjectDispatcher.cs b/src/MessageDispatch.KurrentDB/KurrentDbJObjectDispatcher.cs new file mode 100644 index 0000000..bf37432 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/KurrentDbJObjectDispatcher.cs @@ -0,0 +1,53 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Text; +using CorshamScience.MessageDispatch.Core; +using KurrentDB.Client; +using Newtonsoft.Json.Linq; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// +/// A message dispatcher that deserializes messages to a JObject upon dispatch. +/// +// ReSharper disable once UnusedMember.Global +public class KurrentDbjObjectDispatcher : DeserializingMessageDispatcher +{ +#pragma warning disable SA1648 // inheritdoc should be used with inheriting class + /// + /// + /// Initializes a new instance of the class. + /// + /// Lookups for the handlers which the class can use to process messages. + // ReSharper disable once UnusedMember.Global + public KurrentDbjObjectDispatcher(IMessageHandlerLookup handlers) + : base(handlers) + { + } +#pragma warning restore SA1648 // inheritdoc should be used with inheriting class + + /// + protected override bool TryGetMessageType(ResolvedEvent rawMessage, out string type) + { + type = rawMessage.Event.EventType; + return true; + } + + /// + protected override bool TryDeserialize(string messageType, ResolvedEvent rawMessage, out object deserialized) + { + deserialized = null; + + try + { + deserialized = JObject.Parse(Encoding.UTF8.GetString(rawMessage.Event.Data.Span.ToArray())); + return true; + } + catch (Exception) + { + return false; + } + } +} diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs new file mode 100644 index 0000000..8038ef5 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs @@ -0,0 +1,420 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using CorshamScience.MessageDispatch.Core; +using KurrentDB.Client; +using Microsoft.Extensions.Logging; +using static KurrentDB.Client.KurrentDBClient; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// Subscriber for event store. +/// +public class KurrentDbSubscriber +{ + /// + /// Setting this to 100 as 3200 records seems like a sensible balance between checking too often and too infrequently + /// https://docs.kurrent.io/clients/grpc/subscriptions.html#updating-checkpoints-at-regular-intervals + /// + private const uint CheckpointInterval = 100; + private const string AllStreamName = "$all"; + private readonly WriteThroughFileCheckpoint _checkpoint; + private KurrentDBClient _kurrentDbClient; + private ulong? _startingPosition; + private string _streamName; + private bool _liveOnly; + private bool _subscribeToAll; + private ulong? _lastProcessedEventPosition; + private ulong _actualEndOfStreamPosition; + private CancellationTokenSource _cts; + private DateTime _lastStreamPositionTimestamp; + private Func _setLastPositions; + + private IDispatcher _dispatcher; + private ILogger _logger; + + /// + /// Gets a value indicating whether the view model is ready or not. + /// + public bool IsLive; + + private KurrentDbSubscriber( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + string streamName, + ILogger logger, + ulong? startingPosition) + => Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + + private KurrentDbSubscriber( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + ILogger logger, + string streamName, + string checkpointFilePath) + { + _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, -1); + var initialCheckpointPosition = _checkpoint.Read(); + ulong? startingPosition = null; + + if (initialCheckpointPosition != -1) + { + startingPosition = (ulong)initialCheckpointPosition; + } + + Init(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + } + + private KurrentDbSubscriber( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + string streamName, + ILogger logger) + => Init(kurrentDbClient, dispatcher, streamName, logger, liveOnly: true); + + /// + /// Gets a new catchup progress object. + /// + // ReSharper disable once UnusedMember.Global + public CatchupProgress CatchupProgress + { + get + { + var lastStreamPosition = GetEndOfStreamPosition().Result; + + return new CatchupProgress( + _lastProcessedEventPosition ?? 0, + _streamName, + lastStreamPosition, + _startingPosition ?? 0, + _subscribeToAll); + } + } + + /// + /// Creates a live KurrentDB subscription. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Stream name to push events into. + /// Logger. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateLiveSubscription( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + string streamName, + ILogger logger) + => new KurrentDbSubscriber(kurrentDbClient, dispatcher, streamName, logger); + + /// + /// Creates an KurrentDB catchup subscription using a checkpoint file. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Stream name to push events into. + /// Logger. + /// Path of the checkpoint file. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateCatchupSubscriptionUsingCheckpoint( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + string streamName, + ILogger logger, + string checkpointFilePath) + => new KurrentDbSubscriber(kurrentDbClient, dispatcher, logger, streamName, checkpointFilePath); + + /// + /// Creates an KurrentDB catchup subscription from a position. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Stream name to push events into. + /// Logger. + /// Starting Position. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + string streamName, + ILogger logger, + ulong? startingPosition) + => new KurrentDbSubscriber(kurrentDbClient, dispatcher, streamName, logger, startingPosition); + + /// + /// Creates an KurrentDB catchup subscription that is subscribed to all from the start. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Logger. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + ILogger logger) + => new KurrentDbSubscriber( + kurrentDbClient, + dispatcher, + AllStreamName, + logger, + null); + + /// + /// Creates an KurrentDB catchup subscription that is subscribed to all from a position. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Logger. + /// Starting Position. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + ILogger logger, + ulong? startingPosition) + => new KurrentDbSubscriber( + kurrentDbClient, + dispatcher, + AllStreamName, + logger, + startingPosition); + + /// + /// Creates an KurrentDB catchup subscription subscribed to all using a checkpoint file. + /// + /// KurrentDB connection. + /// Dispatcher. + /// Logger. + /// Path of the checkpoint file. + /// A new KurrentDbSubscriber object. + public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( + KurrentDBClient kurrentDbClient, + IDispatcher dispatcher, + ILogger logger, + string checkpointFilePath) + => new KurrentDbSubscriber( + kurrentDbClient, + dispatcher, + logger, + AllStreamName, + checkpointFilePath); + + /// + /// Start the subscriber. + /// + public async void Start() + { + _cts = new CancellationTokenSource(); + + while (true) + { + try + { + var subscription = CreateSubscription(); + _logger.LogInformation("Subscribed to '{StreamName}'", _streamName); + + await foreach (var message in subscription.Messages) + { + switch (message) + { + case StreamMessage.Event(var @event): + ProcessEvent(@event); + + var lastProcessedEventPosition = GetLastProcessedPosition(@event); + + if (_liveOnly && _lastProcessedEventPosition is null) + { + _startingPosition = lastProcessedEventPosition; + } + + _lastProcessedEventPosition = lastProcessedEventPosition; + break; + case StreamMessage.AllStreamCheckpointReached(var allPosition): + _lastProcessedEventPosition = allPosition.CommitPosition; + WriteCheckpoint((ulong)_lastProcessedEventPosition); + break; + case StreamMessage.CaughtUp: + _logger.LogInformation("Stream caught up: {0}", _streamName); + IsLive = true; + break; + case StreamMessage.FellBehind: + _logger.LogWarning("Stream falling behind: {0}", _streamName); + IsLive = false; + break; + } + } + } + // User initiated drop, do not resubscribe + catch (OperationCanceledException ex) + { + IsLive = false; + _logger.LogInformation(ex, "Event Store subscription dropped {0}", SubscriptionDroppedReason.Disposed); + break; + } + // User initiated drop, do not resubscribe + catch (ObjectDisposedException ex) + { + IsLive = false; + _logger.LogInformation(ex, "Event Store subscription dropped {0}", SubscriptionDroppedReason.Disposed); + break; + } + catch (Exception ex) + { + IsLive = false; + _logger.LogError(ex, "Event Store subscription dropped {0}", SubscriptionDroppedReason.SubscriberError); + Console.WriteLine(ex); + } + + // Sleep between reconnections to not flood the database or not kill the CPU with infinite loop + // Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time + await Task.Delay(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000)); + } + } + + private StreamSubscriptionResult CreateSubscription() + { + var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents(), checkpointInterval: CheckpointInterval); + + const bool resolveLinkTos = true; + + if (_subscribeToAll) + { + var subscriptionStart = FromAll.End; + if (!_liveOnly) + { + subscriptionStart = _startingPosition.HasValue ? FromAll.After(new Position(_startingPosition.Value, _startingPosition.Value)) : FromAll.Start; + } + + return _kurrentDbClient.SubscribeToAll(subscriptionStart, resolveLinkTos, filterOptions, cancellationToken: _cts.Token); + } + else + { + var subscriptionStart = FromStream.End; + if (!_liveOnly) + { + subscriptionStart = _startingPosition.HasValue ? FromStream.After(new StreamPosition(_startingPosition.Value)) : FromStream.Start; + } + + return _kurrentDbClient.SubscribeToStream(_streamName, subscriptionStart, resolveLinkTos, cancellationToken: _cts.Token); + } + } + + /// + /// Shut down the subscription. + /// + public void ShutDown() => _cts.Cancel(); + + private void Init( + KurrentDBClient connection, + IDispatcher dispatcher, + string streamName, + ILogger logger, + ulong? startingPosition = null, + bool liveOnly = false) + { + _logger = logger; + _startingPosition = startingPosition; + _lastProcessedEventPosition = startingPosition; + _dispatcher = dispatcher; + _streamName = streamName; + _kurrentDbClient = connection; + _liveOnly = liveOnly; + _subscribeToAll = streamName == AllStreamName; + _lastStreamPositionTimestamp = DateTime.MinValue; + + _setLastPositions = _subscribeToAll + ? async () => + { + var lastEventFromStream = await _kurrentDbClient.ReadAllAsync( + Direction.Backwards, + Position.End, + maxCount: 1, + resolveLinkTos: false) + .ToListAsync(); + + _actualEndOfStreamPosition = lastEventFromStream.First().OriginalEvent.Position.CommitPosition; + } + : async () => + { + var lastEventFromStream = await _kurrentDbClient.ReadStreamAsync( + Direction.Backwards, + _streamName, + StreamPosition.End, + maxCount: 1, + resolveLinkTos: false) + .ToListAsync(); + + _actualEndOfStreamPosition = lastEventFromStream.First().OriginalEventNumber.ToUInt64(); + }; + } + + private void ProcessEvent(ResolvedEvent resolvedEvent) + { + // ReSharper disable once ConditionIsAlwaysTrueOrFalse - the linked event could be null if the original event was deleted. + if (resolvedEvent.Event is null || resolvedEvent.Event.EventType.StartsWith("$")) + { + return; + } + + try + { + _dispatcher.Dispatch(resolvedEvent); + + var checkpointNumber = GetLastProcessedPosition(resolvedEvent); + + WriteCheckpoint(checkpointNumber); + _logger.LogTrace( + "Event dispatched from subscriber ({0}/{1})", + resolvedEvent.Event.EventStreamId, + resolvedEvent.Event.EventNumber); + } + catch (Exception ex) + { + _logger.LogError( + ex, + "Error dispatching event from subscriber ({0}/{1})", + resolvedEvent.Event.EventStreamId, + resolvedEvent.Event.EventNumber); + } + } + + private ulong GetLastProcessedPosition(ResolvedEvent resolvedEvent) => + _subscribeToAll + ? resolvedEvent.OriginalEvent.Position.CommitPosition + : resolvedEvent.OriginalEventNumber.ToUInt64(); + + private void WriteCheckpoint(ulong checkpointNumber) + { + if (_checkpoint == null) + { + return; + } + + if (checkpointNumber > long.MaxValue) + { + _logger.LogError( + "Value is too large to be checkpointed. Checkpoint number {CheckpointNumber}", + checkpointNumber); + return; + } + + _checkpoint.Write((long)checkpointNumber); + _logger.LogTrace("Checkpoint written. Checkpoint number {CheckpointNumber}", checkpointNumber); + } + + private async Task GetEndOfStreamPosition() + { + var streamPositionIsStale = (DateTime.UtcNow - _lastStreamPositionTimestamp) > TimeSpan.FromSeconds(10); + + if (!_cts.Token.IsCancellationRequested && streamPositionIsStale) + { + await _setLastPositions(); + _lastStreamPositionTimestamp = DateTime.UtcNow; + } + + return _actualEndOfStreamPosition; + } +} diff --git a/src/eventstore/Logo.png b/src/MessageDispatch.KurrentDB/Logo.png similarity index 100% rename from src/eventstore/Logo.png rename to src/MessageDispatch.KurrentDB/Logo.png diff --git a/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj new file mode 100644 index 0000000..09f98b3 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj @@ -0,0 +1,50 @@ + + + + net8.0;net481 + latest + PharmaxoScientific.MessageDispatch.KurrentDB + PharmaxoScientific.MessageDispatch.KurrentDB + Pharmaxo Scientific + Pharmaxo Scientific + Pharmaxo Scientific + https://github.com/qphl/MessageDispatch.KurrentDB/blob/master/LICENSE + https://github.com/qphl/MessageDispatch.KurrentDB + https://github.com/qphl/MessageDispatch.KurrentDB + Message Dispatching, Event Sourcing, KurrentDB + PharmaxoScientific.MessageDispatch.KurrentDB + PharmaxoScientific.MessageDispatch.KurrentDB + A package to use KurrentDB to get Events to Dispatch using PharmaxoScientific.MessageDispatch.KurrentDB. + https://GitHub.com/qphl/MessageDispatch.KurrentDB/releases/tag/$(Tag) + BSD-3-Clause + Logo.png + + + + bin\Debug\net8.0\PharmaxoScientific.MessageDispatch.KurrentDB.xml + + + + bin\Release\net8.0\PharmaxoScientific.MessageDispatch.KurrentDB.xml + true + + + + + + + + + + + + + True + + + + + + + + diff --git a/src/MessageDispatch.KurrentDB/NoSynchronizationContextScope.cs b/src/MessageDispatch.KurrentDB/NoSynchronizationContextScope.cs new file mode 100644 index 0000000..965a64c --- /dev/null +++ b/src/MessageDispatch.KurrentDB/NoSynchronizationContextScope.cs @@ -0,0 +1,34 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Threading; + +/* This file is taken from Event Store codebase + https://github.com/EventStore/samples/blob/main/CQRS_Flow/.NET/Core/Core/Threading/NoSynchronizationContextScope.cs + As such we should not add a Pharmaxo Scientific copyright file header */ + +// ReSharper disable InconsistentNaming +#pragma warning disable CS8632, SA1600, SX1309 + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +internal static class NoSynchronizationContextScope +{ + public static Disposable Enter() + { + var context = SynchronizationContext.Current; + SynchronizationContext.SetSynchronizationContext(null); + return new Disposable(context); + } + + public struct Disposable : IDisposable + { + private readonly SynchronizationContext? _synchronizationContext; + + public Disposable(SynchronizationContext? synchronizationContext) => _synchronizationContext = synchronizationContext; + + public void Dispose() => + SynchronizationContext.SetSynchronizationContext(_synchronizationContext); + } +} +#pragma warning restore CS8632, SA1600, SX1309 diff --git a/src/MessageDispatch.KurrentDB/SimpleKurrentDbDispatcher.cs b/src/MessageDispatch.KurrentDB/SimpleKurrentDbDispatcher.cs new file mode 100644 index 0000000..8ee6867 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/SimpleKurrentDbDispatcher.cs @@ -0,0 +1,61 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Text; +using CorshamScience.MessageDispatch.Core; +using KurrentDB.Client; +using Newtonsoft.Json; + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// +/// A simple event store dispatcher. +/// +public class SimpleKurrentDbDispatcher : DeserializingMessageDispatcher +{ + private readonly Dictionary _eventTypeMapping; + private readonly JsonSerializerSettings _serializerSettings; + +#pragma warning disable SA1648 // inheritdoc should be used with inheriting class + /// + /// + /// Initializes a new instance of the class. + /// + /// Message handler lookup of a type. + /// Event Type Map. + /// Json Serializer settings. + // ReSharper disable once UnusedMember.Global + public SimpleKurrentDbDispatcher(IMessageHandlerLookup handlers, Dictionary eventTypeMapping, JsonSerializerSettings serializerSettings = null) + : base(handlers) + { + _eventTypeMapping = eventTypeMapping; + _serializerSettings = serializerSettings ?? new JsonSerializerSettings(); + } +#pragma warning restore SA1648 // inheritdoc should be used with inheriting class + + /// + protected override bool TryGetMessageType(ResolvedEvent rawMessage, out Type type) + { + var eventType = rawMessage.Event.EventType; + return _eventTypeMapping.TryGetValue(eventType, out type); + } + + /// + protected override bool TryDeserialize(Type messageType, ResolvedEvent rawMessage, out object deserialized) + { + deserialized = null; + + try + { + var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span.ToArray()); + deserialized = JsonConvert.DeserializeObject(jsonString, messageType, _serializerSettings); + return deserialized != null; + } + catch (Exception) + { + return false; + } + } +} diff --git a/src/MessageDispatch.KurrentDB/WriteThroughFileCheckpoint.cs b/src/MessageDispatch.KurrentDB/WriteThroughFileCheckpoint.cs new file mode 100644 index 0000000..6590451 --- /dev/null +++ b/src/MessageDispatch.KurrentDB/WriteThroughFileCheckpoint.cs @@ -0,0 +1,77 @@ +// Copyright (c) Pharmaxo. All rights reserved. + +using System.IO; + +#pragma warning disable CA1001 + +namespace PharmaxoScientific.MessageDispatch.KurrentDB; + +/// +/// Writes a checkpoint to a file pulled from event store. +/// +internal class WriteThroughFileCheckpoint +{ + private readonly object _streamLock = new(); + private readonly FileStream _fileStream; + private readonly BinaryReader _reader; + private readonly BinaryWriter _writer; + private long? _lastWritten; + + /// + /// Initializes a new instance of the class. + /// + /// The file to write a checkpoint to. + /// The initial value to write. + public WriteThroughFileCheckpoint(string filePath, long initValue = 0L) + { + var alreadyExisted = File.Exists(filePath); + + _fileStream = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite); + + if (_fileStream.Length != sizeof(long)) + { + _fileStream.SetLength(8); + } + + _reader = new BinaryReader(_fileStream); + _writer = new BinaryWriter(_fileStream); + + if (!alreadyExisted) + { + Write(initValue); + } + } + + /// + /// Reads the current checkpoint. + /// + /// The current checkpoint. + public long Read() + { + lock (_streamLock) + { + if (_lastWritten.HasValue) + { + return _lastWritten.Value; + } + + _fileStream.Seek(0, SeekOrigin.Begin); + return _reader.ReadInt64(); + } + } + + /// + /// Writes the checkpoint value and optionally flushes the underlying stream. + /// + /// The checkpoint value to write. + public void Write(long checkpoint) + { + lock (_streamLock) + { + _fileStream.Seek(0, SeekOrigin.Begin); + _writer.Write(checkpoint); + _lastWritten = checkpoint; + _fileStream.Flush(flushToDisk: true); + } + } +} diff --git a/src/eventstore/CatchupProgress.cs b/src/eventstore/CatchupProgress.cs deleted file mode 100644 index 3da9850..0000000 --- a/src/eventstore/CatchupProgress.cs +++ /dev/null @@ -1,85 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - /// - /// Class to handle calculating catchup progress. - /// - public class CatchupProgress - { - /// - /// Initializes a new instance of the class. - /// - /// The last processed event position. - /// The name of the stream which is being caught up on (or $all if this is subscribed to all). - /// The end of the stream position (stream position for stream subscription of commit position for all subscription). - /// The starting position (Event number for stream subscription or commit position for all subscription). - /// Whether the subscriber is subscribed to all. - public CatchupProgress( - ulong lastProcessedEventPosition, - string streamName, - ulong endOfStreamPosition, - ulong startPosition, - bool subscribeToAll) - { - IsAllSubscription = subscribeToAll; - LastProcessedEventPosition = lastProcessedEventPosition; - StartPosition = startPosition; - StreamName = streamName; - EndOfStreamPosition = endOfStreamPosition; - } - - /// - /// Gets a value indicating whether the subscriber is subscribed to all. - /// - public bool IsAllSubscription { get; } - - /// - /// Gets the name of the stream ($all if this is an all subscription). - /// - public string StreamName { get; } - - /// - /// Gets the starting position (Event number for stream subscription or commit position for all subscription). - /// - public ulong StartPosition { get; } - - /// - /// Gets the last processed event (stream position for stream subscription of commit position for all subscription). - /// - public ulong LastProcessedEventPosition { get; } - - /// - /// Gets the end of the stream position (stream position for stream subscription of commit position for all subscription). - /// - public ulong EndOfStreamPosition { get; } - - /// - /// Gets the percentage of events in the stream which have been processed (either by number of events or position in the transaction log). - /// - public decimal OverallPercentage => - LastProcessedEventPosition == 0 || EndOfStreamPosition == 0 - ? 0.0m - : (decimal)LastProcessedEventPosition / EndOfStreamPosition * 100; - - /// - /// Gets the percentage of events in the stream which require catching up on, which have been processed (either by number of events or position in the transaction log). - /// - public decimal CatchupPercentage => - LastProcessedEventPosition - StartPosition == 0 || EndOfStreamPosition - StartPosition == 0 - ? 0.0m - : ((decimal)LastProcessedEventPosition - StartPosition) / (EndOfStreamPosition - StartPosition) * 100; - - /// - /// Generates a string describing the state of the stream catch up progress. - /// - /// A string describing the state of the stream catch up progress. - public override string ToString() - { - return - $"[{StreamName}] Overall Pos: {OverallPercentage:0.#}% ({LastProcessedEventPosition}/{EndOfStreamPosition}), Caught up: {CatchupPercentage:0.#}% ({LastProcessedEventPosition - StartPosition}/{EndOfStreamPosition - StartPosition})"; - } - } -} diff --git a/src/eventstore/CheckpointingWrappingDispatcher.cs b/src/eventstore/CheckpointingWrappingDispatcher.cs deleted file mode 100644 index 1b074f5..0000000 --- a/src/eventstore/CheckpointingWrappingDispatcher.cs +++ /dev/null @@ -1,79 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Text; - using CorshamScience.MessageDispatch.Core; - using global::EventStore.Client; - using Newtonsoft.Json.Linq; - - /// - /// A wrapping event dispatcher which keeps track of a checkpoint, and whether the dispatched event has been previously processed or not. - /// - public class CheckpointingWrappingDispatcher : EventStoreAggregateEventDispatcher - { - private readonly WriteThroughFileCheckpoint _checkpoint; - private readonly long _startupCheckpointValue; - - /// - /// Initializes a new instance of the class. - /// - /// The file to write a checkpoint to. - /// The initial value to write. - /// The handler methods for processing messages with. - /// The metadata key. - public CheckpointingWrappingDispatcher( - string checkpointFilePath, - long initValue, - IMessageHandlerLookup handlers, - string metadataKey = null) - : base(handlers, null, metadataKey) - { - _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, initValue); - _startupCheckpointValue = _checkpoint.Read(); - } - - /// - public override void Dispatch(ResolvedEvent message) - { - base.Dispatch(message); - - var previouslyProcessed = message.OriginalEventNumber.ToInt64() <= _startupCheckpointValue; - - if (previouslyProcessed) - { - return; - } - - _checkpoint.Write(message.OriginalEventNumber.ToInt64()); - } - - /// - protected override bool TryDeserialize( - Type messageType, - ResolvedEvent rawMessage, - out object deserialized) - { - deserialized = null!; - - var previouslyProcessed = rawMessage.OriginalEventNumber.ToInt64() <= _startupCheckpointValue; - - try - { - var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span); - var @event = JObject.Parse(jsonString); - - deserialized = new EventWrapper(@event, previouslyProcessed); - - return deserialized != null; - } - catch (Exception) - { - return false; - } - } - } -} diff --git a/src/eventstore/EventStoreAggregateEventDispatcher.cs b/src/eventstore/EventStoreAggregateEventDispatcher.cs deleted file mode 100644 index 234f676..0000000 --- a/src/eventstore/EventStoreAggregateEventDispatcher.cs +++ /dev/null @@ -1,125 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Collections.Generic; - using System.Text; - using CorshamScience.MessageDispatch.Core; - using global::EventStore.Client; - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - - /// - /// - /// A deserializing event dispatcher for events produced by CorshamScience.AggregatRepository. - /// - // ReSharper disable once UnusedMember.Global - public class EventStoreAggregateEventDispatcher : DeserializingMessageDispatcher - { - private readonly JsonSerializerSettings _serializerSettings; - private readonly Dictionary _typeCache = new Dictionary(); - private readonly string _metadataKey; - -#pragma warning disable SA1648 // inheritdoc should be used with inheriting class - /// - /// - /// Initializes a new instance of the class. - /// - /// The handler methods for processing messages with. - /// Determines the settings for the JSON serialization of events. - // ReSharper disable once UnusedMember.Global - public EventStoreAggregateEventDispatcher( - IMessageHandlerLookup handlers, - JsonSerializerSettings serializerSettings = null, - string metadataKey = null) - : base(handlers) - { - _serializerSettings = serializerSettings ?? new JsonSerializerSettings(); - _metadataKey = metadataKey ?? "ClrType"; - } -#pragma warning restore SA1648 // inheritdoc should be used with inheriting class - - /// - protected override bool TryGetMessageType(ResolvedEvent rawMessage, out Type type) - { - type = null; - - // optimization: don't even bother trying to deserialize metadata for system events - if (rawMessage.Event.EventType.StartsWith("$") || rawMessage.Event.Metadata.Length == 0) - { - return false; - } - - try - { - IDictionary metadata = JObject.Parse(Encoding.UTF8.GetString(rawMessage.Event.Metadata.Span)); - - if (!metadata.ContainsKey(_metadataKey)) - { - return false; - } - - string typeString = (string)metadata[_metadataKey]; - - if (!_typeCache.TryGetValue(typeString, out var cached)) - { - try - { - cached = Type.GetType( - typeString, - (assemblyName) => - { - assemblyName.Version = null; - return System.Reflection.Assembly.Load(assemblyName); - }, - null, - true, - true); - } - catch (Exception) - { - cached = typeof(TypeNotFound); - } - - _typeCache.Add(typeString, cached); - } - - if (cached?.Name.Equals("TypeNotFound") ?? false) - { - return false; - } - - type = cached; - return true; - } - catch (Exception) - { - return false; - } - } - - /// - protected override bool TryDeserialize(Type messageType, ResolvedEvent rawMessage, out object deserialized) - { - deserialized = null; - - try - { - var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span); - deserialized = JsonConvert.DeserializeObject(jsonString, messageType, _serializerSettings); - return deserialized != null; - } - catch (Exception) - { - return false; - } - } - - private class TypeNotFound - { - } - } -} diff --git a/src/eventstore/EventStoreJObjectDispatcher.cs b/src/eventstore/EventStoreJObjectDispatcher.cs deleted file mode 100644 index 83db131..0000000 --- a/src/eventstore/EventStoreJObjectDispatcher.cs +++ /dev/null @@ -1,56 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Text; - using CorshamScience.MessageDispatch.Core; - using global::EventStore.Client; - using Newtonsoft.Json.Linq; - - /// - /// - /// A message dispatcher that deserializes messages to a JObject upon dispatch. - /// - // ReSharper disable once UnusedMember.Global - public class EventStoreJObjectDispatcher : DeserializingMessageDispatcher - { -#pragma warning disable SA1648 // inheritdoc should be used with inheriting class - /// - /// - /// Initializes a new instance of the class. - /// - /// Lookups for the handlers which the class can use to process messages. - // ReSharper disable once UnusedMember.Global - public EventStoreJObjectDispatcher(IMessageHandlerLookup handlers) - : base(handlers) - { - } -#pragma warning restore SA1648 // inheritdoc should be used with inheriting class - - /// - protected override bool TryGetMessageType(ResolvedEvent rawMessage, out string type) - { - type = rawMessage.Event.EventType; - return true; - } - - /// - protected override bool TryDeserialize(string messageType, ResolvedEvent rawMessage, out object deserialized) - { - deserialized = null; - - try - { - deserialized = JObject.Parse(Encoding.UTF8.GetString(rawMessage.Event.Data.Span)); - return true; - } - catch (Exception) - { - return false; - } - } - } -} diff --git a/src/eventstore/EventStoreSubscriber.cs b/src/eventstore/EventStoreSubscriber.cs deleted file mode 100644 index 4d7030e..0000000 --- a/src/eventstore/EventStoreSubscriber.cs +++ /dev/null @@ -1,549 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using CorshamScience.MessageDispatch.Core; - using global::EventStore.Client; - using Microsoft.Extensions.Logging; - - /// - /// Subscriber for event store. - /// - public class EventStoreSubscriber - { - private const string AllStreamName = "$all"; - private readonly WriteThroughFileCheckpoint _checkpoint; - private readonly object _subscriptionLock = new object(); - - private EventStoreClient _eventStoreClient; - private ulong? _startingPosition; - private StreamSubscription _subscription; - private string _streamName; - private bool _liveOnly; - private bool _isSubscribed; - private bool _isSubscriptionLive; - private bool _subscribeToAll; - private ulong? _lastProcessedEventPosition; - private ulong _actualEndOfStreamPosition; - private ulong _liveEventThreshold; - private ulong _liveThresholdPosition; - private DateTime _lastStreamPositionTimestamp; - private Func _setLastPositions; - - private IDispatcher _dispatcher; - private ILogger _logger; - - private EventStoreSubscriber( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - string streamName, - ILogger logger, - ulong? startingPosition, - ulong liveEventThreshold) - => Init(eventStoreClient, dispatcher, streamName, logger, liveEventThreshold, startingPosition); - - private EventStoreSubscriber( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - ILogger logger, - string streamName, - string checkpointFilePath, - ulong liveEventThreshold) - { - _checkpoint = new WriteThroughFileCheckpoint(checkpointFilePath, -1); - var initialCheckpointPosition = _checkpoint.Read(); - ulong? startingPosition = null; - - if (initialCheckpointPosition != -1) - { - startingPosition = (ulong)initialCheckpointPosition; - } - - Init(eventStoreClient, dispatcher, streamName, logger, liveEventThreshold, startingPosition); - } - - private EventStoreSubscriber( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - string streamName, - ILogger logger, - ulong liveEventThreshold) - => Init(eventStoreClient, dispatcher, streamName, logger, liveEventThreshold, liveOnly: true); - - /// - /// Gets a new catchup progress object. - /// - // ReSharper disable once UnusedMember.Global - public CatchupProgress CatchupProgress - { - get - { - var lastStreamPosition = GetLastPositions().Result; - - return new CatchupProgress( - _lastProcessedEventPosition ?? 0, - _streamName, - lastStreamPosition.actualEndOfStreamPosition, - _startingPosition ?? 0, - _subscribeToAll); - } - } - - /// - /// Gets a value indicating whether the view model is ready or not. - /// - /// Returns true if catchup is within threshold. - public bool IsLive - { - get - { - // if we aren't subscribed, it doesn't count as live - if (!_isSubscribed) - { - return false; - } - - // if we are still subscribed, and we have ever been live, we are still live - if (_isSubscribed && _isSubscriptionLive) - { - return true; - } - - var lastStreamPosition = GetLastPositions().Result; - - _isSubscriptionLive = (_liveOnly && _lastProcessedEventPosition is null && _isSubscribed) || - _lastProcessedEventPosition >= lastStreamPosition.liveThresholdPosition; - return _isSubscriptionLive; - } - } - - /// - /// Creates a live eventstore subscription. - /// - /// Eventstore connection. - /// Dispatcher. - /// Stream name to push events into. - /// Logger. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateLiveSubscription( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - string streamName, - ILogger logger, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber(eventStoreClient, dispatcher, streamName, logger, liveEventThreshold); - - /// - /// Creates an eventstore catchup subscription using a checkpoint file. - /// - /// Eventstore connection. - /// Dispatcher. - /// Stream name to push events into. - /// Logger. - /// Path of the checkpoint file. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateCatchupSubscriptionUsingCheckpoint( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - string streamName, - ILogger logger, - string checkpointFilePath, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber(eventStoreClient, dispatcher, logger, streamName, checkpointFilePath, liveEventThreshold); - - /// - /// Creates an eventstore catchup subscription from a position. - /// - /// Eventstore connection. - /// Dispatcher. - /// Stream name to push events into. - /// Logger. - /// Starting Position. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateCatchupSubscriptionFromPosition( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - string streamName, - ILogger logger, - ulong? startingPosition, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber(eventStoreClient, dispatcher, streamName, logger, startingPosition, liveEventThreshold); - - /// - /// Creates an eventstore catchup subscription that is subscribed to all from the start. - /// - /// Eventstore connection. - /// Dispatcher. - /// Logger. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateCatchupSubscriptionSubscribedToAll( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - ILogger logger, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber( - eventStoreClient, - dispatcher, - AllStreamName, - logger, - liveEventThreshold); - - /// - /// Creates an eventstore catchup subscription that is subscribed to all from a position. - /// - /// Eventstore connection. - /// Dispatcher. - /// Logger. - /// Starting Position. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - ILogger logger, - ulong? startingPosition, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber( - eventStoreClient, - dispatcher, - AllStreamName, - logger, - startingPosition, - liveEventThreshold); - - /// - /// Creates an eventstore catchup subscription subscribed to all using a checkpoint file. - /// - /// Eventstore connection. - /// Dispatcher. - /// Logger. - /// Path of the checkpoint file. - /// Proximity to end of stream before subscription considered live. - /// A new EventStoreSubscriber object. - // ReSharper disable once UnusedMember.Global - public static EventStoreSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint( - EventStoreClient eventStoreClient, - IDispatcher dispatcher, - ILogger logger, - string checkpointFilePath, - ulong liveEventThreshold = 10) - => new EventStoreSubscriber( - eventStoreClient, - dispatcher, - logger, - AllStreamName, - checkpointFilePath, - liveEventThreshold); - - /// - /// Start the subscriber. - /// - // ReSharper disable once MemberCanBePrivate.Global - public void Start() - { - while (true) - { - _isSubscribed = false; - - try - { - Monitor.Enter(_subscriptionLock); - - KillSubscription(); - - // No synchronization context is needed to disable synchronization context. - // That enables running asynchronous method not causing deadlocks. - // As this is a background process then we don't need to have async context here. - using (NoSynchronizationContextScope.Enter()) - { - var filterOptions = new SubscriptionFilterOptions( - EventTypeFilter.ExcludeSystemEvents(), - checkpointReached: CheckpointReached); - const bool resolveLinkTos = true; - - Task Appeared( - StreamSubscription streamSubscription, - ResolvedEvent e, - CancellationToken cancellationToken) => - EventAppeared(e); - - switch (_liveOnly) - { - case true when !_subscribeToAll: - _subscription = _eventStoreClient.SubscribeToStreamAsync( - _streamName, - FromStream.End, - Appeared, - resolveLinkTos, - SubscriptionDropped).Result; - break; - case false when !_subscribeToAll: - { - var fromStream = _startingPosition.HasValue ? - FromStream.After(new StreamPosition(_startingPosition.Value)) : - FromStream.Start; - - _subscription = _eventStoreClient.SubscribeToStreamAsync( - _streamName, - fromStream, - Appeared, - resolveLinkTos, - SubscriptionDropped).Result; - break; - } - - case true when _subscribeToAll: - _subscription = _eventStoreClient.SubscribeToAllAsync( - FromAll.End, - Appeared, - resolveLinkTos, - SubscriptionDropped, - filterOptions) - .Result; - break; - case false when _subscribeToAll: - var fromAll = _startingPosition.HasValue ? - FromAll.After(new Position(_startingPosition.Value, _startingPosition.Value)) : - FromAll.Start; - - _subscription = _eventStoreClient.SubscribeToAllAsync( - fromAll, - Appeared, - resolveLinkTos, - SubscriptionDropped, - filterOptions) - .Result; - break; - } - } - - _isSubscribed = true; - _logger.LogInformation("Subscribed to '{StreamName}'", _streamName); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Failed to resubscribe to '{StreamName}' dropped with '{ExceptionMessage}{ExceptionStackTrace}'", _streamName, ex.Message, ex.StackTrace); - } - finally - { - Monitor.Exit(_subscriptionLock); - } - - if (_isSubscribed) - { - break; - } - - // Sleep between reconnections to not flood the database or not kill the CPU with infinite loop - // Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time - Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000)); - } - } - - /// - /// Shut down the subscription. - /// - // ReSharper disable once UnusedMember.Global - public void ShutDown() - { - lock (_subscriptionLock) - { - KillSubscription(); - } - } - - private void Init( - EventStoreClient connection, - IDispatcher dispatcher, - string streamName, - ILogger logger, - ulong liveEventThreshold, - ulong? startingPosition = null, - bool liveOnly = false) - { - _logger = logger; - _startingPosition = startingPosition; - _lastProcessedEventPosition = startingPosition; - _dispatcher = dispatcher; - _streamName = streamName; - _eventStoreClient = connection; - _liveOnly = liveOnly; - _subscribeToAll = streamName == AllStreamName; - _liveEventThreshold = liveEventThreshold; - _liveThresholdPosition = StreamPosition.End; - _lastStreamPositionTimestamp = DateTime.MinValue; - - _setLastPositions = _subscribeToAll - ? async () => - { - var eventsWithinThreshold = await _eventStoreClient.ReadAllAsync( - Direction.Backwards, - Position.End, - maxCount: (long)_liveEventThreshold) - .ToListAsync(); - - _liveThresholdPosition = eventsWithinThreshold.Last().OriginalEvent.Position.CommitPosition; - _actualEndOfStreamPosition = eventsWithinThreshold.First().OriginalEvent.Position.CommitPosition; - } - : async () => - { - var eventsWithinThreshold = await _eventStoreClient.ReadStreamAsync( - Direction.Backwards, - _streamName, - StreamPosition.End, - maxCount: (long)_liveEventThreshold, - resolveLinkTos: false) - .ToListAsync(); - - _liveThresholdPosition = eventsWithinThreshold.Last().OriginalEventNumber.ToUInt64(); - _actualEndOfStreamPosition = eventsWithinThreshold.First().OriginalEventNumber.ToUInt64(); - }; - } - - private void SubscriptionDropped(StreamSubscription eventStoreCatchUpSubscription, SubscriptionDroppedReason subscriptionDropReason, Exception ex) - { - if (ex != null) - { - _logger.LogInformation(ex, "Event Store subscription dropped {0}", subscriptionDropReason.ToString()); - } - else - { - _logger.LogInformation("Event Store subscription dropped {0}", subscriptionDropReason.ToString()); - } - - if (subscriptionDropReason == SubscriptionDroppedReason.Disposed) - { - _logger.LogInformation("Not attempting to restart subscription was disposed. Subscription is dead."); - return; - } - - _isSubscribed = false; - - // if the subscription drops, set its 'liveness' to false - _isSubscriptionLive = false; - _startingPosition = _lastProcessedEventPosition; - Start(); - } - - private Task EventAppeared(ResolvedEvent resolvedEvent) - { - ProcessEvent(resolvedEvent); - - var lastProcessedEventPosition = GetLastProcessedPosition(resolvedEvent); - - if (_liveOnly && _lastProcessedEventPosition is null) - { - _startingPosition = lastProcessedEventPosition; - } - - _lastProcessedEventPosition = lastProcessedEventPosition; - - return Task.CompletedTask; - } - - private void ProcessEvent(ResolvedEvent resolvedEvent) - { - if (resolvedEvent.Event == null || resolvedEvent.Event.EventType.StartsWith("$")) - { - return; - } - - try - { - _dispatcher.Dispatch(resolvedEvent); - - var checkpointNumber = GetLastProcessedPosition(resolvedEvent); - - WriteCheckpoint(checkpointNumber); - _logger.LogTrace( - "Event dispatched from Eventstore subscriber ({0}/{1})", - resolvedEvent.Event.EventStreamId, - resolvedEvent.Event.EventNumber); - } - catch (Exception ex) - { - _logger.LogError( - ex, - "Error dispatching event from Event Store subscriber ({0}/{1})", - resolvedEvent.Event.EventStreamId, - resolvedEvent.Event.EventNumber); - } - } - - private ulong GetLastProcessedPosition(ResolvedEvent resolvedEvent) - { - return _subscribeToAll - ? resolvedEvent.OriginalEvent.Position.CommitPosition - : resolvedEvent.OriginalEventNumber.ToUInt64(); - } - - private void WriteCheckpoint(ulong checkpointNumber) - { - if (_checkpoint == null) - { - return; - } - - if (checkpointNumber > long.MaxValue) - { - _logger.LogError( - "Value is too large to be checkpointed. Checkpoint number {CheckpointNumber}", - checkpointNumber); - return; - } - - _checkpoint.Write((long)checkpointNumber); - _logger.LogTrace("Checkpoint written. Checkpoint number {CheckpointNumber}", checkpointNumber); - } - - private async Task<(ulong liveThresholdPosition, ulong actualEndOfStreamPosition)> GetLastPositions() - { - var streamPositionIsStale = (DateTime.UtcNow - _lastStreamPositionTimestamp) > TimeSpan.FromSeconds(10); - - if (_isSubscribed && streamPositionIsStale) - { - await _setLastPositions(); - _lastStreamPositionTimestamp = DateTime.UtcNow; - } - - return (_liveThresholdPosition, _actualEndOfStreamPosition); - } - - private void KillSubscription() - { - if (_subscription != null) - { - _subscription.Dispose(); - _subscription = null; - } - - _isSubscribed = false; - } - - private Task CheckpointReached( - StreamSubscription streamSubscription, - Position position, - CancellationToken cancellationToken) - { - _lastProcessedEventPosition = position.CommitPosition; - WriteCheckpoint((ulong)_lastProcessedEventPosition); - - return Task.CompletedTask; - } - } -} diff --git a/src/eventstore/EventWrapper.cs b/src/eventstore/EventWrapper.cs deleted file mode 100644 index 5e7fb5f..0000000 --- a/src/eventstore/EventWrapper.cs +++ /dev/null @@ -1,38 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using Newtonsoft.Json.Linq; - - /// - /// Represents a wrapper for an event. - /// - public class EventWrapper - { - /// - /// Initializes a new instance of the class. - /// - /// The event to wrap. - /// A value indicating whether the event has been previously processed. - public EventWrapper(JObject @event, bool previouslyProcessed) - { - Event = @event; - PreviouslyProcessed = previouslyProcessed; - } - - /// - /// Gets a value indicating whether this event has been previously processed before. - /// - public bool PreviouslyProcessed { get; } - - /// - /// Gets the event data. - /// - public JObject Event { get; } = null!; - - /// - public override string ToString() => $"Event: {Event}, PreviouslyProcessed: {PreviouslyProcessed}"; - } -} diff --git a/src/eventstore/NoSynchronizationContextScope.cs b/src/eventstore/NoSynchronizationContextScope.cs deleted file mode 100644 index 2f3b05b..0000000 --- a/src/eventstore/NoSynchronizationContextScope.cs +++ /dev/null @@ -1,36 +0,0 @@ -/* This file is taken from Event Store codebase - https://github.com/EventStore/samples/blob/main/CQRS_Flow/.NET/Core/Core/Threading/NoSynchronizationContextScope.cs - As such we should not add a Corsham Science copyright file header */ - -// ReSharper disable InconsistentNaming -#pragma warning disable CS8632, SA1600, SX1309 - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Threading; - - internal static class NoSynchronizationContextScope - { - public static Disposable Enter() - { - var context = SynchronizationContext.Current; - SynchronizationContext.SetSynchronizationContext(null); - return new Disposable(context); - } - - public struct Disposable : IDisposable - { - private readonly SynchronizationContext? synchronizationContext; - - public Disposable(SynchronizationContext? synchronizationContext) - { - this.synchronizationContext = synchronizationContext; - } - - public void Dispose() => - SynchronizationContext.SetSynchronizationContext(synchronizationContext); - } - } -} -#pragma warning restore CS8632, SA1600, SX1309 diff --git a/src/eventstore/SimpleEventStoreDispatcher.cs b/src/eventstore/SimpleEventStoreDispatcher.cs deleted file mode 100644 index 6574917..0000000 --- a/src/eventstore/SimpleEventStoreDispatcher.cs +++ /dev/null @@ -1,64 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System; - using System.Collections.Generic; - using System.Text; - using CorshamScience.MessageDispatch.Core; - using global::EventStore.Client; - using Newtonsoft.Json; - - /// - /// - /// A simple event store dispatcher. - /// - public class SimpleEventStoreDispatcher : DeserializingMessageDispatcher - { - private readonly Dictionary _eventTypeMapping; - private readonly JsonSerializerSettings _serializerSettings; - -#pragma warning disable SA1648 // inheritdoc should be used with inheriting class - /// - /// - /// Initializes a new instance of the class. - /// - /// Message handler lookup of a type. - /// Event Type Map. - /// Json Serializer settings. - // ReSharper disable once UnusedMember.Global - public SimpleEventStoreDispatcher(IMessageHandlerLookup handlers, Dictionary eventTypeMapping, JsonSerializerSettings serializerSettings = null) - : base(handlers) - { - _eventTypeMapping = eventTypeMapping; - _serializerSettings = serializerSettings ?? new JsonSerializerSettings(); - } -#pragma warning restore SA1648 // inheritdoc should be used with inheriting class - - /// - protected override bool TryGetMessageType(ResolvedEvent rawMessage, out Type type) - { - var eventType = rawMessage.Event.EventType; - return _eventTypeMapping.TryGetValue(eventType, out type); - } - - /// - protected override bool TryDeserialize(Type messageType, ResolvedEvent rawMessage, out object deserialized) - { - deserialized = null; - - try - { - var jsonString = Encoding.UTF8.GetString(rawMessage.Event.Data.Span); - deserialized = JsonConvert.DeserializeObject(jsonString, messageType, _serializerSettings); - return deserialized != null; - } - catch (Exception) - { - return false; - } - } - } -} diff --git a/src/eventstore/WriteThroughFileCheckpoint.cs b/src/eventstore/WriteThroughFileCheckpoint.cs deleted file mode 100644 index 5da49b8..0000000 --- a/src/eventstore/WriteThroughFileCheckpoint.cs +++ /dev/null @@ -1,79 +0,0 @@ -// -// Copyright (c) Corsham Science. All rights reserved. -// -#pragma warning disable CA1001 - -namespace CorshamScience.MessageDispatch.EventStore -{ - using System.IO; - - /// - /// Writes a checkpoint to a file pulled from event store. - /// - internal class WriteThroughFileCheckpoint - { - private readonly object _streamLock = new (); - private readonly FileStream _fileStream; - private readonly BinaryReader _reader; - private readonly BinaryWriter _writer; - private long? _lastWritten; - - /// - /// Initializes a new instance of the class. - /// - /// The file to write a checkpoint to. - /// The initial value to write. - public WriteThroughFileCheckpoint(string filePath, long initValue = 0L) - { - var alreadyExisted = File.Exists(filePath); - - _fileStream = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite); - - if (_fileStream.Length != sizeof(long)) - { - _fileStream.SetLength(8); - } - - _reader = new BinaryReader(_fileStream); - _writer = new BinaryWriter(_fileStream); - - if (!alreadyExisted) - { - Write(initValue); - } - } - - /// - /// Reads the current checkpoint. - /// - /// The current checkpoint. - public long Read() - { - lock (_streamLock) - { - if (_lastWritten.HasValue) - { - return _lastWritten.Value; - } - - _fileStream.Seek(0, SeekOrigin.Begin); - return _reader.ReadInt64(); - } - } - - /// - /// Writes the checkpoint value and optionally flushes the underlying stream. - /// - /// The checkpoint value to write. - public void Write(long checkpoint) - { - lock (_streamLock) - { - _fileStream.Seek(0, SeekOrigin.Begin); - _writer.Write(checkpoint); - _lastWritten = checkpoint; - _fileStream.Flush(flushToDisk: true); - } - } - } -} diff --git a/src/eventstore/eventstore.csproj b/src/eventstore/eventstore.csproj deleted file mode 100644 index 594fdc0..0000000 --- a/src/eventstore/eventstore.csproj +++ /dev/null @@ -1,48 +0,0 @@ - - - - net6.0 - CorshamScience.MessageDispatch.EventStore - CorshamScience.MessageDispatch.EventStore - Corsham Science - Corsham Science - Corsham Science 2019 - https://github.com/qphl/MessageDispatch.EventStore/blob/master/LICENSE - https://github.com/qphl/MessageDispatch.EventStore - https://github.com/qphl/MessageDispatch.EventStore - Message Dispatching, Event Sourcing - CorshamScience.MessageDispatch.EventStore - CorshamScience.MessageDispatch - A package to use EventStore to get Events to Dispatch using CorshamScience.MessageDispatch. - https://GitHub.com/qphl/MessageDispatch.EventStore/releases/tag/$(Tag) - BSD-3-Clause - Logo.png - - - - bin\Debug\net6.0\CorshamScience.MessageDispatch.EventStore.xml - - - - bin\Release\net6.0\CorshamScience.MessageDispatch.EventStore.xml - true - - - - - - All - - - - - - - - - - True - - - -