From d4079de4b92fbdf866e687ce599b94477d0047a0 Mon Sep 17 00:00:00 2001 From: divancode Date: Thu, 9 Apr 2026 20:33:00 +0300 Subject: [PATCH 1/2] feat(duely): kafka sasl auth --- Duely/docker-compose.yml | 1 + .../ExeshSubmissionStatusConsumer.cs | 8 ++++++++ .../Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs | 3 +++ .../TaskiSubmissionStatusConsumer.cs | 8 ++++++++ Duely/src/Duely/appsettings.Development.json | 3 ++- 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/Duely/docker-compose.yml b/Duely/docker-compose.yml index ba870532..7ec07634 100644 --- a/Duely/docker-compose.yml +++ b/Duely/docker-compose.yml @@ -65,6 +65,7 @@ services: Kafka__TaskiTopic: taski.testing Kafka__ExeshTopic: exesh.step-updates Kafka__BootstrapServers: kafka:9092 + Kafka__SaslAuth: false depends_on: postgres: condition: service_healthy diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs index 61f7cc0a..212c1b3e 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs @@ -30,6 +30,14 @@ public ExeshSubmissionStatusConsumer(IServiceScopeFactory scopeFactory, IOptions GroupId = kafkaOptions.Value.GroupId, AutoOffsetReset = AutoOffsetReset.Earliest }; + + if (kafkaOptions.Value.SaslAuth) + { + config.SecurityProtocol = SecurityProtocol.SaslPlaintext; + config.SaslMechanism = SaslMechanism.ScramSha512; + config.SaslUsername = kafkaOptions.Value.Username; + config.SaslPassword = kafkaOptions.Value.Password; + } _consumer = new ConsumerBuilder(config) .SetValueDeserializer(new KafkaValueDeserializer()) diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs index bf4aebe6..de0f45e4 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs @@ -8,4 +8,7 @@ public sealed class KafkaOptions public required string TaskiTopic { get; init; } public required string ExeshTopic { get; init; } public required string GroupId { get; init; } + public required bool SaslAuth { get; init; } + public string? Username { get; init; } + public string? Password { get; init; } } diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs index 4819f1cd..1da45bc5 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs @@ -28,6 +28,14 @@ public TaskiSubmissionStatusConsumer(IServiceScopeFactory scopeFactory, IOptions BootstrapServers = kafkaOptions.Value.BootstrapServers, GroupId = kafkaOptions.Value.GroupId }; + + if (kafkaOptions.Value.SaslAuth) + { + config.SecurityProtocol = SecurityProtocol.SaslPlaintext; + config.SaslMechanism = SaslMechanism.ScramSha512; + config.SaslUsername = kafkaOptions.Value.Username; + config.SaslPassword = kafkaOptions.Value.Password; + } _consumer = new ConsumerBuilder(config) .SetValueDeserializer(new KafkaValueDeserializer()) diff --git a/Duely/src/Duely/appsettings.Development.json b/Duely/src/Duely/appsettings.Development.json index 8cb619a9..16e6ef26 100644 --- a/Duely/src/Duely/appsettings.Development.json +++ b/Duely/src/Duely/appsettings.Development.json @@ -18,6 +18,7 @@ "BootstrapServers": "localhost:29092", "TaskiTopic": "taski.testing", "ExeshTopic": "exesh.step-updates", - "GroupId": "duely" + "GroupId": "duely", + "SaslAuth": false } } From 6a4c87c901cc9e01bff7c23440e5255f061c0003 Mon Sep 17 00:00:00 2001 From: divancode Date: Thu, 9 Apr 2026 22:09:44 +0300 Subject: [PATCH 2/2] fix naming --- .../ExeshSubmissionStatusConsumer.cs | 4 ++-- .../src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs | 4 ++-- .../TaskiSubmissionStatusConsumer.cs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs index 212c1b3e..b05744da 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/ExeshSubmissionStatusConsumer.cs @@ -35,8 +35,8 @@ public ExeshSubmissionStatusConsumer(IServiceScopeFactory scopeFactory, IOptions { config.SecurityProtocol = SecurityProtocol.SaslPlaintext; config.SaslMechanism = SaslMechanism.ScramSha512; - config.SaslUsername = kafkaOptions.Value.Username; - config.SaslPassword = kafkaOptions.Value.Password; + config.SaslUsername = kafkaOptions.Value.SaslUsername; + config.SaslPassword = kafkaOptions.Value.SaslPassword; } _consumer = new ConsumerBuilder(config) diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs index de0f45e4..83157478 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/KafkaOptions.cs @@ -9,6 +9,6 @@ public sealed class KafkaOptions public required string ExeshTopic { get; init; } public required string GroupId { get; init; } public required bool SaslAuth { get; init; } - public string? Username { get; init; } - public string? Password { get; init; } + public string? SaslUsername { get; init; } + public string? SaslPassword { get; init; } } diff --git a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs index 1da45bc5..d9c955a1 100644 --- a/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs +++ b/Duely/src/Duely.Infrastructure.MessageBus.Kafka/TaskiSubmissionStatusConsumer.cs @@ -33,8 +33,8 @@ public TaskiSubmissionStatusConsumer(IServiceScopeFactory scopeFactory, IOptions { config.SecurityProtocol = SecurityProtocol.SaslPlaintext; config.SaslMechanism = SaslMechanism.ScramSha512; - config.SaslUsername = kafkaOptions.Value.Username; - config.SaslPassword = kafkaOptions.Value.Password; + config.SaslUsername = kafkaOptions.Value.SaslUsername; + config.SaslPassword = kafkaOptions.Value.SaslPassword; } _consumer = new ConsumerBuilder(config)