diff --git a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec index 121c51d..7c75280 100644 --- a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec +++ b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec @@ -18,7 +18,6 @@ - diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs index 0274a05..9df3fa5 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs @@ -43,7 +43,7 @@ public void Ack(IMessage message) try { #if NETSTANDARD2_0 - sbClient.CompleteAsync(lockToken).Wait(); + sbClient.CompleteAsync(lockToken).GetAwaiter().GetResult(); #else sbClient.Complete(Guid.Parse(lockToken)); #endif @@ -64,7 +64,7 @@ public IMessage CreateMessage(object mqResponse) return null; var msgBody = msg.Body.FromMessageBody(); #else - if (!(mqResponse is BrokeredMessage msg)) + if (!(mqResponse is BrokeredMessage msg)) return null; var msgBody = msg.GetBody().FromMessageBody(); #endif @@ -140,7 +140,7 @@ public override void Dispose() if (!task.IsCompleted) throw new TimeoutException("Reached timeout while getting message from client"); - } else + } else { await task; } @@ -172,7 +172,7 @@ public void Notify(string queueName, IMessage message) { if (parentFactory.MqServer.DisableNotifyMessages) return; - + Publish(queueName, message); } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs index 6813f07..26711a2 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs @@ -5,6 +5,8 @@ using System.Collections.Concurrent; #if NETSTANDARD2_0 using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Management; + #else using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; @@ -18,13 +20,16 @@ public class ServiceBusMqMessageFactory : IMessageFactory protected internal readonly string address; #if !NETSTANDARD2_0 protected internal readonly NamespaceManager namespaceManager; +#else + protected internal readonly ManagementClient managementClient; #endif internal Dictionary handlerMap; Dictionary queueMap; // A list of all Service Bus QueueClients - one per type & queue (priorityq, inq, outq, and dlq) - private static readonly ConcurrentDictionary sbClients = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary sbClients = + new ConcurrentDictionary(); public ServiceBusMqServer MqServer { get; } @@ -34,6 +39,8 @@ public ServiceBusMqMessageFactory(ServiceBusMqServer mqServer, string address) this.address = address; #if !NETSTANDARD2_0 this.namespaceManager = NamespaceManager.CreateFromConnectionString(address); +#else + this.managementClient = new ManagementClient(address); #endif } @@ -49,7 +56,6 @@ public IMessageQueueClient CreateMessageQueueClient() public void Dispose() { - } protected internal void StartQueues(Dictionary handlerMap) @@ -59,7 +65,7 @@ protected internal void StartQueues(Dictionary han queueMap = new Dictionary(); - var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" }; + var mqSuffixes = new[] {".inq", ".outq", ".priorityq", ".dlq"}; foreach (var type in this.handlerMap.Keys) { foreach (var mqSuffix in mqSuffixes) @@ -69,10 +75,17 @@ protected internal void StartQueues(Dictionary han if (!queueMap.ContainsKey(queueName)) queueMap.Add(queueName, type); -#if !NETSTANDARD2_0 + var mqDesc = new QueueDescription(queueName); +#if !NETSTANDARD2_0 if (!namespaceManager.QueueExists(queueName)) namespaceManager.CreateQueue(mqDesc); +#else + // Prefer GetAwaiter().GetResult() so that the StackTrace + // is easier to use, see: + // https://stackoverflow.com/a/36427080 + if (!managementClient.QueueExistsAsync(mqDesc.Path).GetAwaiter().GetResult()) + managementClient.CreateQueueAsync(mqDesc).GetAwaiter().GetResult(); #endif } @@ -91,23 +104,23 @@ private void AddQueueHandler(string queueName) var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient); sbClient.RegisterMessageHandler(sbWorker.HandleMessageAsync, new MessageHandlerOptions( - (eventArgs) => Task.CompletedTask) - { + (eventArgs) => Task.CompletedTask) + { MaxConcurrentCalls = 1, AutoComplete = false }); #else + var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock); var options = new OnMessageOptions { - // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would + // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would // normally release the BrokeredMessage back to the Azure Service Bus queue, which we don't actually want - AutoComplete = false, + AutoComplete = false, //AutoRenewTimeout = new TimeSpan() MaxConcurrentCalls = 1 }; - var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock); var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient); sbClient.OnMessage(sbWorker.HandleMessage, options); #endif @@ -127,25 +140,37 @@ protected internal void StopQueues() protected internal QueueClient GetOrCreateClient(string queueName) { queueName = queueName.SafeQueueName(); - + if (sbClients.ContainsKey(queueName)) return sbClients[queueName]; -#if !NETSTANDARD2_0 - // Create queue on ServiceBus namespace if it doesn't exist var qd = new QueueDescription(queueName); - if (!namespaceManager.QueueExists(queueName)) - namespaceManager.CreateQueue(qd); -#endif -#if NETSTANDARD2_0 - var sbClient = new QueueClient(address, queueName); -#else +#if !NETSTANDARD2_0 +// Create queue on ServiceBus namespace if it doesn't exist + if (!namespaceManager.QueueExists(queueName)) + { + try + { + namespaceManager.CreateQueue(qd); + } + catch (MessagingEntityAlreadyExistsException) { /* ignore */ } + } var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path); -#endif +#else + if (!managementClient.QueueExistsAsync(queueName).GetAwaiter().GetResult()) + { + try + { + managementClient.CreateQueueAsync(qd).GetAwaiter().GetResult(); + } + catch (MessagingEntityAlreadyExistsException) { /* ignore */ } + } + var sbClient = new QueueClient(address, queueName); +#endif sbClient = sbClients.GetOrAdd(queueName, sbClient); return sbClient; } } -} +} \ No newline at end of file diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs index 4073536..e9cec48 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs @@ -67,7 +67,7 @@ public virtual void Publish(string queueName, IMessage message) }; sbClient.SendAsync(msg).Wait(); #else - var msg = new BrokeredMessage(msgBody) {MessageId = message.Id.ToString()}; + var msg = new BrokeredMessage(msgBody) { MessageId = message.Id.ToString() }; sbClient.Send(msg); #endif @@ -86,7 +86,7 @@ protected MessageReceiver GetOrCreateMessageReceiver(string queueName) parentFactory.address, queueName, ReceiveMode.ReceiveAndDelete); //should be ReceiveMode.PeekLock, but it does not delete messages from queue on CompleteAsync() - + sbReceivers.Add(queueName, messageReceiver); return messageReceiver; } diff --git a/src/ServiceStack.Azure/ServiceStack.Azure.csproj b/src/ServiceStack.Azure/ServiceStack.Azure.csproj index 5d2f431..97852c3 100644 --- a/src/ServiceStack.Azure/ServiceStack.Azure.csproj +++ b/src/ServiceStack.Azure/ServiceStack.Azure.csproj @@ -47,7 +47,6 @@ - diff --git a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj index 6443409..aab2c07 100644 --- a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj +++ b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj @@ -53,9 +53,7 @@ - - - +