diff --git a/src/DomainBlocks.MongoDB.Sequencing/MongoSequenceInitializer.cs b/src/DomainBlocks.MongoDB.Sequencing/MongoSequenceInitializer.cs new file mode 100644 index 0000000..489cb8c --- /dev/null +++ b/src/DomainBlocks.MongoDB.Sequencing/MongoSequenceInitializer.cs @@ -0,0 +1,44 @@ +using MongoDB.Bson; +using MongoDB.Driver; + +namespace DomainBlocks.MongoDB.Sequencing; + +/// +/// Utility for initializing sequence counters prior to use with +/// . +/// +public static class MongoSequenceInitializer +{ + /// + /// Initializes a sequence counter so that the first document appended will be assigned + /// . Has no effect if the counter document already exists. + /// + /// + /// if the sequence counter was created; if it already existed. + /// + public static async Task TryInitializeAsync( + IMongoClient mongoClient, + CollectionNamespace sequenceCollectionNamespace, + string sequenceId, + long initialSequenceNumber, + CancellationToken cancellationToken = default) + { + ArgumentOutOfRangeException.ThrowIfNegative(initialSequenceNumber); + + var collection = mongoClient + .GetDatabase(sequenceCollectionNamespace.DatabaseNamespace.DatabaseName) + .GetCollection(sequenceCollectionNamespace.CollectionName) + .WithWriteConcern(WriteConcern.WMajority.With(journal: true)); + + var filter = Builders.Filter.Eq("_id", sequenceId); + + // SetOnInsert ensures this is a no-op if the counter document already exists. + var update = Builders.Update.SetOnInsert(SequenceFieldNames.Next, initialSequenceNumber); + + var result = await collection + .UpdateOneAsync(filter, update, new UpdateOptions { IsUpsert = true }, cancellationToken) + .ConfigureAwait(false); + + return result.UpsertedId is not null; + } +} \ No newline at end of file diff --git a/src/DomainBlocks.MongoDB.Sequencing/MongoSequencedAppender.cs b/src/DomainBlocks.MongoDB.Sequencing/MongoSequencedAppender.cs index adb98ff..a8ad053 100644 --- a/src/DomainBlocks.MongoDB.Sequencing/MongoSequencedAppender.cs +++ b/src/DomainBlocks.MongoDB.Sequencing/MongoSequencedAppender.cs @@ -339,15 +339,13 @@ void CompleteAndRemoveFromBatch(Exception exception) private async Task ClaimSequenceAsync(IClientSessionHandle session, long count, CancellationToken ct) { - const string nextFieldName = "next"; - var filter = Builders.Filter.Eq("_id", _sequenceId); - var update = Builders.Update.Inc(nextFieldName, count); + var update = Builders.Update.Inc(SequenceFieldNames.Next, count); var options = new FindOneAndUpdateOptions { IsUpsert = true, - Projection = Builders.Projection.Include(nextFieldName), + Projection = Builders.Projection.Include(SequenceFieldNames.Next), ReturnDocument = ReturnDocument.Before }; @@ -355,7 +353,7 @@ private async Task ClaimSequenceAsync(IClientSessionHandle session, long c .FindOneAndUpdateAsync(session, filter, update, options, ct) .ConfigureAwait(false); - var start = result?[nextFieldName].ToInt64() ?? 0; + var start = result?[SequenceFieldNames.Next].ToInt64() ?? 0; return start; } diff --git a/src/DomainBlocks.MongoDB.Sequencing/SequenceFieldNames.cs b/src/DomainBlocks.MongoDB.Sequencing/SequenceFieldNames.cs new file mode 100644 index 0000000..ff3de9d --- /dev/null +++ b/src/DomainBlocks.MongoDB.Sequencing/SequenceFieldNames.cs @@ -0,0 +1,6 @@ +namespace DomainBlocks.MongoDB.Sequencing; + +internal static class SequenceFieldNames +{ + public const string Next = "next"; +} \ No newline at end of file diff --git a/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequenceInitializerTests.cs b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequenceInitializerTests.cs new file mode 100644 index 0000000..d8ae4e5 --- /dev/null +++ b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequenceInitializerTests.cs @@ -0,0 +1,158 @@ +using MongoDB.Bson; +using MongoDB.Bson.Serialization.Attributes; +using MongoDB.Driver; +using NUnit.Framework; +using Shouldly; + +namespace DomainBlocks.MongoDB.Sequencing.Tests.Integration; + +public class MongoSequenceInitializerTests +{ + private const string TestDbPrefix = "seq_init_test_"; + private const string SequenceId = "test_seq"; + + private MongoClient _mongoClient = null!; + private string _databaseName = null!; + private CollectionNamespace _seqNs = null!; + + [SetUp] + public void SetUp() + { + _mongoClient = new MongoClient(MongoReplicaSetFixture.ConnectionString); + _databaseName = $"{TestDbPrefix}{Guid.NewGuid():N}"; + _seqNs = new CollectionNamespace(_databaseName, "sequences"); + } + + [TearDown] + public async Task TearDown() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + try + { + await _mongoClient.DropDatabaseAsync(_databaseName, cts.Token); + } + catch (OperationCanceledException) + { + // best-effort + } + finally + { + cts.Dispose(); + +#if !MONGO_DRIVER_V2 + _mongoClient.Dispose(); +#endif + } + } + + [Test] + public async Task InitializeAsync_WhenSequenceDoesNotExist_ReturnsTrueAndSeedsCounter() + { + var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 1); + + result.ShouldBeTrue(); + + var next = await ReadNextAsync(); + next.ShouldBe(1); + } + + [Test] + public async Task InitializeAsync_WhenSequenceAlreadyExists_ReturnsFalseAndLeavesCounterUnchanged() + { + await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 1); + + var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 100); + + result.ShouldBeFalse(); + + var next = await ReadNextAsync(); + next.ShouldBe(1, "counter must not be overwritten by second call"); + } + + [Test] + public async Task InitializeAsync_WithZeroInitialSequenceNumber_Succeeds() + { + var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 0); + + result.ShouldBeTrue(); + + var next = await ReadNextAsync(); + next.ShouldBe(0); + } + + [Test] + public void InitializeAsync_WithNegativeInitialSequenceNumber_Throws() + { + Should.Throw(() => + MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, -1)); + } + + [Test] + [CancelAfter(30_000)] + public async Task InitializeAsync_FirstAppend_StartsFromInitialSequenceNumber(CancellationToken ct) + { + await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 10, ct); + + var targetNs = new CollectionNamespace(_databaseName, "targets"); + var db = _mongoClient.GetDatabase(_databaseName); + await db.CreateCollectionAsync(targetNs.CollectionName, cancellationToken: ct); + + var targetCollection = db.GetCollection(targetNs.CollectionName); + + await targetCollection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.Sequence), + new CreateIndexOptions { Unique = true }), + cancellationToken: ct); + + var binding = new MongoSequenceBinding( + sequenceCollectionNamespace: _seqNs, + sequenceId: SequenceId, + targetCollectionNamespace: targetNs, + targetField: new ExpressionFieldDefinition(x => x.Sequence)); + + await using var appender = new MongoSequencedAppender(_mongoClient, binding); + + await appender.AppendAsync( + [ + new TargetDoc { Value = "a" }, + new TargetDoc { Value = "b" } + ], + new object(), + cancellationToken: ct); + + var docs = await targetCollection + .Find(FilterDefinition.Empty) + .Sort(Builders.Sort.Ascending(x => x.Sequence)) + .ToListAsync(ct); + + docs[0].Sequence.ShouldBe(10); + docs[1].Sequence.ShouldBe(11); + } + + private async Task ReadNextAsync() + { + var collection = _mongoClient + .GetDatabase(_seqNs.DatabaseNamespace.DatabaseName) + .GetCollection(_seqNs.CollectionName); + + var doc = await collection + .Find(Builders.Filter.Eq("_id", SequenceId)) + .FirstOrDefaultAsync(); + + doc.ShouldNotBeNull("sequence counter document should exist"); + return doc["next"].ToInt64(); + } + + // ReSharper disable all + private record TargetDoc + { + public ObjectId Id { get; init; } + public required string Value { get; init; } + + [BsonElement("seq")] + public long Sequence { get; init; } + } + // ReSharper restore all +} \ No newline at end of file diff --git a/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderBenchmarkTests.cs b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderBenchmarkTests.cs index 2270459..acf6680 100644 --- a/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderBenchmarkTests.cs +++ b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderBenchmarkTests.cs @@ -13,6 +13,7 @@ public class MongoSequencedAppenderBenchmarkTests { private const int TestTimeoutMillis = 30 * 1_000; private const string TestDbPrefix = "seq_bench_"; + private const string SequenceId = "bench_seq"; private MongoClient _mongoClient = null!; private string _databaseName = null!; @@ -221,7 +222,7 @@ private MongoSequencedAppender CreateAppender(int index = { var binding = new MongoSequenceBinding( sequenceCollectionNamespace: _seqNs, - sequenceId: "bench_seq", + sequenceId: SequenceId, targetCollectionNamespace: _targetNs, targetField: new ExpressionFieldDefinition(x => x.Sequence)); diff --git a/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderConcurrencyTests.cs b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderConcurrencyTests.cs index 620862a..688e255 100644 --- a/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderConcurrencyTests.cs +++ b/tests/DomainBlocks.MongoDB.Sequencing.Tests.Integration/MongoSequencedAppenderConcurrencyTests.cs @@ -12,6 +12,7 @@ public class MongoSequencedAppenderConcurrencyTests private const int TestTimeoutMillis = 10 * 1_000; private const int AppenderCount = 5; private const string TestDbPrefix = "seq_test_"; + private const string SequenceId = "test_seq"; private MongoClient _mongoClient = null!; private string _databaseName = null!; @@ -241,7 +242,7 @@ private MongoSequencedAppender CreateAppender(int index) { var binding = new MongoSequenceBinding( sequenceCollectionNamespace: _seqNs, - sequenceId: "test_seq", + sequenceId: SequenceId, targetCollectionNamespace: _targetNs, targetField: new ExpressionFieldDefinition(x => x.Nested!.Sequence));