diff --git a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs index a608852..0a98f12 100644 --- a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs +++ b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs @@ -42,11 +42,11 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can var sw = Stopwatch.StartNew(); try { - await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription()).AnyContext(); + await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription(), cancellationToken).AnyContext(); } catch (MessagingEntityAlreadyExistsException) { } // Look into message factory with multiple receivers so more than one connection is made and managed.... - _subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, ReceiveMode.ReceiveAndDelete, _options.SubscriptionRetryPolicy); + _subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, _options.SubscriptionReceiveMode, _options.SubscriptionRetryPolicy); _subscriptionClient.RegisterMessageHandler(OnMessageAsync, new MessageHandlerOptions(MessageHandlerException) { /* AutoComplete = true, // Don't run with receive and delete */ MaxConcurrentCalls = 6 /* calculate this based on the the thread count. */ }); if (_options.PrefetchCount.HasValue) @@ -59,7 +59,7 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can private Task OnMessageAsync(Microsoft.Azure.ServiceBus.Message brokeredMessage, CancellationToken cancellationToken) { if (_subscribers.IsEmpty) return Task.CompletedTask; - + _logger.LogTrace("OnMessageAsync({messageId})", brokeredMessage.MessageId); var message = new Message(() => DeserializeMessageBody(brokeredMessage.ContentType, brokeredMessage.Body)) { Data = brokeredMessage.Body, @@ -141,7 +141,7 @@ private TopicDescription CreateTopicDescription() { if (!String.IsNullOrEmpty(_options.TopicUserMetadata)) td.UserMetadata = _options.TopicUserMetadata; - + return td; } diff --git a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs index 6428a27..7e7a919 100644 --- a/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs +++ b/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs @@ -143,16 +143,18 @@ public class AzureServiceBusMessageBusOptions : SharedMessageBusOptions { public string SubscriptionUserMetadata { get; set; } public RetryPolicy SubscriptionRetryPolicy { get; set; } + + public ReceiveMode SubscriptionReceiveMode { get; set; } = ReceiveMode.ReceiveAndDelete; } public class AzureServiceBusMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder< AzureServiceBusMessageBusOptions, AzureServiceBusMessageBusOptionsBuilder> { - + public AzureServiceBusMessageBusOptionsBuilder ConnectionString(string connectionString) { Target.ConnectionString = connectionString; return this; } - + public AzureServiceBusMessageBusOptionsBuilder PrefetchCount(int prefetchCount) { Target.PrefetchCount = prefetchCount; return this; @@ -292,5 +294,10 @@ public AzureServiceBusMessageBusOptionsBuilder SubscriptionRetryPolicy(RetryPoli Target.SubscriptionRetryPolicy = subscriptionRetryPolicy ?? throw new ArgumentNullException(nameof(subscriptionRetryPolicy)); return this; } + + public AzureServiceBusMessageBusOptionsBuilder SubscriptionReceiveMode(ReceiveMode receiveMode) { + Target.SubscriptionReceiveMode = receiveMode; + return this; + } } } \ No newline at end of file