Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/DomainBlocks.MongoDB.Sequencing/MongoSequenceInitializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using MongoDB.Bson;
using MongoDB.Driver;

namespace DomainBlocks.MongoDB.Sequencing;

/// <summary>
/// Utility for initializing sequence counters prior to use with
/// <see cref="MongoSequencedAppender{TDocument,TContext}"/>.
/// </summary>
public static class MongoSequenceInitializer
{
/// <summary>
/// Initializes a sequence counter so that the first document appended will be assigned
/// <paramref name="initialSequenceNumber"/>. Has no effect if the counter document already exists.
/// </summary>
/// <returns>
/// <see langword="true"/> if the sequence counter was created; <see langword="false"/> if it already existed.
/// </returns>
public static async Task<bool> TryInitializeAsync(
IMongoClient mongoClient,
CollectionNamespace sequenceCollectionNamespace,
string sequenceId,
long initialSequenceNumber,
CancellationToken cancellationToken = default)
{
ArgumentOutOfRangeException.ThrowIfNegative(initialSequenceNumber);

var collection = mongoClient
.GetDatabase(sequenceCollectionNamespace.DatabaseNamespace.DatabaseName)
.GetCollection<BsonDocument>(sequenceCollectionNamespace.CollectionName)
.WithWriteConcern(WriteConcern.WMajority.With(journal: true));

var filter = Builders<BsonDocument>.Filter.Eq("_id", sequenceId);

// SetOnInsert ensures this is a no-op if the counter document already exists.
var update = Builders<BsonDocument>.Update.SetOnInsert(SequenceFieldNames.Next, initialSequenceNumber);

var result = await collection
.UpdateOneAsync(filter, update, new UpdateOptions { IsUpsert = true }, cancellationToken)
.ConfigureAwait(false);

return result.UpsertedId is not null;
}
}
8 changes: 3 additions & 5 deletions src/DomainBlocks.MongoDB.Sequencing/MongoSequencedAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,23 +339,21 @@ void CompleteAndRemoveFromBatch(Exception exception)

private async Task<long> ClaimSequenceAsync(IClientSessionHandle session, long count, CancellationToken ct)
{
const string nextFieldName = "next";

var filter = Builders<BsonDocument>.Filter.Eq("_id", _sequenceId);
var update = Builders<BsonDocument>.Update.Inc(nextFieldName, count);
var update = Builders<BsonDocument>.Update.Inc(SequenceFieldNames.Next, count);

var options = new FindOneAndUpdateOptions<BsonDocument>
{
IsUpsert = true,
Projection = Builders<BsonDocument>.Projection.Include(nextFieldName),
Projection = Builders<BsonDocument>.Projection.Include(SequenceFieldNames.Next),
ReturnDocument = ReturnDocument.Before
};

var result = await _sequenceCollection
.FindOneAndUpdateAsync(session, filter, update, options, ct)
.ConfigureAwait(false);

var start = result?[nextFieldName].ToInt64() ?? 0;
var start = result?[SequenceFieldNames.Next].ToInt64() ?? 0;

return start;
}
Expand Down
6 changes: 6 additions & 0 deletions src/DomainBlocks.MongoDB.Sequencing/SequenceFieldNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace DomainBlocks.MongoDB.Sequencing;

internal static class SequenceFieldNames
{
public const string Next = "next";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using NUnit.Framework;
using Shouldly;

namespace DomainBlocks.MongoDB.Sequencing.Tests.Integration;

public class MongoSequenceInitializerTests
{
private const string TestDbPrefix = "seq_init_test_";
private const string SequenceId = "test_seq";

private MongoClient _mongoClient = null!;
private string _databaseName = null!;
private CollectionNamespace _seqNs = null!;

[SetUp]
public void SetUp()
{
_mongoClient = new MongoClient(MongoReplicaSetFixture.ConnectionString);
_databaseName = $"{TestDbPrefix}{Guid.NewGuid():N}";
_seqNs = new CollectionNamespace(_databaseName, "sequences");
}

[TearDown]
public async Task TearDown()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

try
{
await _mongoClient.DropDatabaseAsync(_databaseName, cts.Token);
}
catch (OperationCanceledException)
{
// best-effort
}
finally
{
cts.Dispose();

#if !MONGO_DRIVER_V2
_mongoClient.Dispose();
#endif
}
}

[Test]
public async Task InitializeAsync_WhenSequenceDoesNotExist_ReturnsTrueAndSeedsCounter()
{
var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 1);

result.ShouldBeTrue();

var next = await ReadNextAsync();
next.ShouldBe(1);
}

[Test]
public async Task InitializeAsync_WhenSequenceAlreadyExists_ReturnsFalseAndLeavesCounterUnchanged()
{
await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 1);

var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 100);

result.ShouldBeFalse();

var next = await ReadNextAsync();
next.ShouldBe(1, "counter must not be overwritten by second call");
}

[Test]
public async Task InitializeAsync_WithZeroInitialSequenceNumber_Succeeds()
{
var result = await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 0);

result.ShouldBeTrue();

var next = await ReadNextAsync();
next.ShouldBe(0);
}

[Test]
public void InitializeAsync_WithNegativeInitialSequenceNumber_Throws()
{
Should.Throw<ArgumentOutOfRangeException>(() =>
MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, -1));
}

[Test]
[CancelAfter(30_000)]
public async Task InitializeAsync_FirstAppend_StartsFromInitialSequenceNumber(CancellationToken ct)
{
await MongoSequenceInitializer.TryInitializeAsync(_mongoClient, _seqNs, SequenceId, 10, ct);

var targetNs = new CollectionNamespace(_databaseName, "targets");
var db = _mongoClient.GetDatabase(_databaseName);
await db.CreateCollectionAsync(targetNs.CollectionName, cancellationToken: ct);

var targetCollection = db.GetCollection<TargetDoc>(targetNs.CollectionName);

await targetCollection.Indexes.CreateOneAsync(
new CreateIndexModel<TargetDoc>(
Builders<TargetDoc>.IndexKeys.Ascending(x => x.Sequence),
new CreateIndexOptions { Unique = true }),
cancellationToken: ct);

var binding = new MongoSequenceBinding<TargetDoc>(
sequenceCollectionNamespace: _seqNs,
sequenceId: SequenceId,
targetCollectionNamespace: targetNs,
targetField: new ExpressionFieldDefinition<TargetDoc, long>(x => x.Sequence));

await using var appender = new MongoSequencedAppender<TargetDoc, object>(_mongoClient, binding);

await appender.AppendAsync(
[
new TargetDoc { Value = "a" },
new TargetDoc { Value = "b" }
],
new object(),
cancellationToken: ct);

var docs = await targetCollection
.Find(FilterDefinition<TargetDoc>.Empty)
.Sort(Builders<TargetDoc>.Sort.Ascending(x => x.Sequence))
.ToListAsync(ct);

docs[0].Sequence.ShouldBe(10);
docs[1].Sequence.ShouldBe(11);
}

private async Task<long> ReadNextAsync()
{
var collection = _mongoClient
.GetDatabase(_seqNs.DatabaseNamespace.DatabaseName)
.GetCollection<BsonDocument>(_seqNs.CollectionName);

var doc = await collection
.Find(Builders<BsonDocument>.Filter.Eq("_id", SequenceId))
.FirstOrDefaultAsync();

doc.ShouldNotBeNull("sequence counter document should exist");
return doc["next"].ToInt64();
}

// ReSharper disable all
private record TargetDoc
{
public ObjectId Id { get; init; }
public required string Value { get; init; }

[BsonElement("seq")]
public long Sequence { get; init; }
}
// ReSharper restore all
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class MongoSequencedAppenderBenchmarkTests
{
private const int TestTimeoutMillis = 30 * 1_000;
private const string TestDbPrefix = "seq_bench_";
private const string SequenceId = "bench_seq";

private MongoClient _mongoClient = null!;
private string _databaseName = null!;
Expand Down Expand Up @@ -221,7 +222,7 @@ private MongoSequencedAppender<BenchmarkDoc, object> CreateAppender(int index =
{
var binding = new MongoSequenceBinding<BenchmarkDoc>(
sequenceCollectionNamespace: _seqNs,
sequenceId: "bench_seq",
sequenceId: SequenceId,
targetCollectionNamespace: _targetNs,
targetField: new ExpressionFieldDefinition<BenchmarkDoc, long>(x => x.Sequence));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class MongoSequencedAppenderConcurrencyTests
private const int TestTimeoutMillis = 10 * 1_000;
private const int AppenderCount = 5;
private const string TestDbPrefix = "seq_test_";
private const string SequenceId = "test_seq";

private MongoClient _mongoClient = null!;
private string _databaseName = null!;
Expand Down Expand Up @@ -241,7 +242,7 @@ private MongoSequencedAppender<TargetDoc, object> CreateAppender(int index)
{
var binding = new MongoSequenceBinding<TargetDoc>(
sequenceCollectionNamespace: _seqNs,
sequenceId: "test_seq",
sequenceId: SequenceId,
targetCollectionNamespace: _targetNs,
targetField: new ExpressionFieldDefinition<TargetDoc, long>(x => x.Nested!.Sequence));

Expand Down
Loading