Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can

var sw = Stopwatch.StartNew();
try {
await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription()).AnyContext();
await _managementClient.CreateSubscriptionAsync(CreateSubscriptionDescription(), cancellationToken).AnyContext();
} catch (MessagingEntityAlreadyExistsException) { }

// Look into message factory with multiple receivers so more than one connection is made and managed....
_subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, ReceiveMode.ReceiveAndDelete, _options.SubscriptionRetryPolicy);
_subscriptionClient = new SubscriptionClient(_options.ConnectionString, _options.Topic, _subscriptionName, _options.SubscriptionReceiveMode, _options.SubscriptionRetryPolicy);
_subscriptionClient.RegisterMessageHandler(OnMessageAsync, new MessageHandlerOptions(MessageHandlerException) {
/* AutoComplete = true, // Don't run with receive and delete */ MaxConcurrentCalls = 6 /* calculate this based on the the thread count. */ });
if (_options.PrefetchCount.HasValue)
Expand All @@ -59,7 +59,7 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can
private Task OnMessageAsync(Microsoft.Azure.ServiceBus.Message brokeredMessage, CancellationToken cancellationToken) {
if (_subscribers.IsEmpty)
return Task.CompletedTask;

_logger.LogTrace("OnMessageAsync({messageId})", brokeredMessage.MessageId);
var message = new Message(() => DeserializeMessageBody(brokeredMessage.ContentType, brokeredMessage.Body)) {
Data = brokeredMessage.Body,
Expand Down Expand Up @@ -141,7 +141,7 @@ private TopicDescription CreateTopicDescription() {

if (!String.IsNullOrEmpty(_options.TopicUserMetadata))
td.UserMetadata = _options.TopicUserMetadata;

return td;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,18 @@ public class AzureServiceBusMessageBusOptions : SharedMessageBusOptions {
public string SubscriptionUserMetadata { get; set; }

public RetryPolicy SubscriptionRetryPolicy { get; set; }

public ReceiveMode SubscriptionReceiveMode { get; set; } = ReceiveMode.ReceiveAndDelete;
}

public class AzureServiceBusMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder<
AzureServiceBusMessageBusOptions, AzureServiceBusMessageBusOptionsBuilder> {

public AzureServiceBusMessageBusOptionsBuilder ConnectionString(string connectionString) {
Target.ConnectionString = connectionString;
return this;
}

public AzureServiceBusMessageBusOptionsBuilder PrefetchCount(int prefetchCount) {
Target.PrefetchCount = prefetchCount;
return this;
Expand Down Expand Up @@ -292,5 +294,10 @@ public AzureServiceBusMessageBusOptionsBuilder SubscriptionRetryPolicy(RetryPoli
Target.SubscriptionRetryPolicy = subscriptionRetryPolicy ?? throw new ArgumentNullException(nameof(subscriptionRetryPolicy));
return this;
}

public AzureServiceBusMessageBusOptionsBuilder SubscriptionReceiveMode(ReceiveMode receiveMode) {
Target.SubscriptionReceiveMode = receiveMode;
return this;
}
}
}