From 5a4764c40caa325d1c95fa3aa5f7571b312b50ec Mon Sep 17 00:00:00 2001 From: Deon Heyns Date: Wed, 4 Jul 2018 07:37:11 -0300 Subject: [PATCH] Use new Azure ServiceBus library that can create queues --- .../servicestack.azure.core.nuspec | 7 +- .../Messaging/QueueClientExtensions.cs | 10 ++- .../Messaging/ServiceBusMqClient.cs | 6 +- .../Messaging/ServiceBusMqMessageFactory.cs | 68 ++++++++++++------- .../Messaging/ServiceBusMqMessageProducer.cs | 6 +- .../Messaging/ServiceBusMqServer.cs | 6 +- .../Messaging/ServiceBusMqWorker.cs | 2 - .../ServiceStack.Azure.csproj | 3 +- .../ServiceStack.Azure.Tests.csproj | 3 +- 9 files changed, 60 insertions(+), 51 deletions(-) diff --git a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec index 86cb0d0..4c4feeb 100644 --- a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec +++ b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec @@ -4,7 +4,7 @@ ServiceStack.Azure.Core ServiceStack.Azure .NET Standard 2.0 5.0.0 - ServiceStack + ServiceStack ServiceStack .NET Standard 2.0 version of ServiceStack.Azure @@ -13,12 +13,11 @@ https://servicestack.net/terms https://servicestack.net/img/logo-32.png ServiceStack Azure WebServices ServiceBus Cache CacheClient - en-US + en-US ServiceStack and contributors - - + diff --git a/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs b/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs index d3f79a6..431907b 100644 --- a/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs +++ b/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs @@ -1,11 +1,9 @@ using System.Threading.Tasks; using System; using System.Text; -using ServiceStack.Text; namespace ServiceStack.Azure.Messaging { - public static class QueueClientExtensions { #if NETSTANDARD2_0 @@ -26,11 +24,11 @@ public static class QueueClientExtensions public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message message) { var strMessage = Encoding.UTF8.GetString(message.Body); - + //Windows Azure Client is not wire-compatible with .NET Core client - //we check if the message comes from Windows client and cut off + //we check if the message comes from Windows client and cut off //64 header chars and 2 footer chars - //see https://github.com/Azure/azure-service-bus-dotnet/issues/239 + //see https://github.com/Azure/azure-service-bus-dotnet/issues/239 if (strMessage.StartsWith("@\u0006string", StringComparison.Ordinal)) { strMessage = strMessage.Substring(64, strMessage.Length - 66); @@ -43,5 +41,5 @@ public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message messa internal static string SafeQueueName(this string queueName) => queueName?.Replace(":", "."); } - + } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs index 464bead..7ef7b54 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs @@ -2,8 +2,6 @@ using ServiceStack.Text; using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading.Tasks; #if NETSTANDARD2_0 using Microsoft.Azure.ServiceBus; @@ -138,7 +136,7 @@ public override void Dispose() if (!task.IsCompleted) throw new TimeoutException("Reached timeout while getting message from client"); - } else + } else { await task; } @@ -170,7 +168,7 @@ public void Notify(string queueName, IMessage message) { if (parentFactory.MqServer.DisableNotifyMessages) return; - + Publish(queueName, message); } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs index 6813f07..8a9bc8e 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; #if NETSTANDARD2_0 using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Management; #else using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; @@ -18,6 +19,8 @@ public class ServiceBusMqMessageFactory : IMessageFactory protected internal readonly string address; #if !NETSTANDARD2_0 protected internal readonly NamespaceManager namespaceManager; +#else + protected internal readonly ManagementClient managementClient; #endif internal Dictionary handlerMap; @@ -34,6 +37,8 @@ public ServiceBusMqMessageFactory(ServiceBusMqServer mqServer, string address) this.address = address; #if !NETSTANDARD2_0 this.namespaceManager = NamespaceManager.CreateFromConnectionString(address); +#else + this.managementClient = new ManagementClient(address); #endif } @@ -59,7 +64,7 @@ protected internal void StartQueues(Dictionary han queueMap = new Dictionary(); - var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" }; + var mqSuffixes = new[] { ".inq", ".outq", ".priorityq", ".dlq" }; foreach (var type in this.handlerMap.Keys) { foreach (var mqSuffix in mqSuffixes) @@ -69,11 +74,7 @@ protected internal void StartQueues(Dictionary han if (!queueMap.ContainsKey(queueName)) queueMap.Add(queueName, type); -#if !NETSTANDARD2_0 - var mqDesc = new QueueDescription(queueName); - if (!namespaceManager.QueueExists(queueName)) - namespaceManager.CreateQueue(mqDesc); -#endif + RegisterQueueByName(queueName); } var mqNames = new QueueNames(type); @@ -85,29 +86,27 @@ protected internal void StartQueues(Dictionary han private void AddQueueHandler(string queueName) { queueName = queueName.SafeQueueName(); - + var sbClient = GetOrCreateClient(queueName); #if NETSTANDARD2_0 - var sbClient = new QueueClient(address, queueName, ReceiveMode.PeekLock); var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient); sbClient.RegisterMessageHandler(sbWorker.HandleMessageAsync, new MessageHandlerOptions( - (eventArgs) => Task.CompletedTask) - { + (eventArgs) => Task.CompletedTask) + { MaxConcurrentCalls = 1, AutoComplete = false }); #else var options = new OnMessageOptions { - // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would + // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would // normally release the BrokeredMessage back to the Azure Service Bus queue, which we don't actually want - AutoComplete = false, + AutoComplete = false, //AutoRenewTimeout = new TimeSpan() MaxConcurrentCalls = 1 }; - var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock); var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient); sbClient.OnMessage(sbWorker.HandleMessage, options); #endif @@ -124,28 +123,49 @@ protected internal void StopQueues() sbClients.Clear(); } - protected internal QueueClient GetOrCreateClient(string queueName) + protected internal QueueClient GetOrCreateClient(string queueName, ReceiveMode receiveMode = ReceiveMode.PeekLock) { queueName = queueName.SafeQueueName(); - + if (sbClients.ContainsKey(queueName)) return sbClients[queueName]; -#if !NETSTANDARD2_0 - // Create queue on ServiceBus namespace if it doesn't exist - var qd = new QueueDescription(queueName); - if (!namespaceManager.QueueExists(queueName)) - namespaceManager.CreateQueue(qd); -#endif + var qd = RegisterQueueByName(queueName); #if NETSTANDARD2_0 - var sbClient = new QueueClient(address, queueName); + var sbClient = new QueueClient(address, qd.Path, receiveMode); #else - var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path); + var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path, receiveMode); #endif - sbClient = sbClients.GetOrAdd(queueName, sbClient); return sbClient; } + + protected internal QueueDescription RegisterQueueByName(string queueName) + { + var mqDesc = new QueueDescription(queueName); +#if !NETSTANDARD2_0 + if (!namespaceManager.QueueExists(queueName)) + namespaceManager.CreateQueue(mqDesc); +#else + try + { + managementClient.QueueExistsAsync(queueName) + .ContinueWith(async asc => + { + if (!asc.Result) + { + await managementClient.CreateQueueAsync(mqDesc) + .ConfigureAwait(continueOnCapturedContext: true); + } + }); + } + catch (AggregateException aex) + { + throw aex.Flatten(); + } +#endif + return mqDesc; + } } } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs index 4073536..3964276 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using ServiceStack.Messaging; using ServiceStack.Text; #if NETSTANDARD2_0 @@ -56,6 +55,7 @@ public virtual void Publish(string queueName, IMessage message) message.ReplyTo = message.ReplyTo.SafeQueueName(); var sbClient = parentFactory.GetOrCreateClient(queueName); + parentFactory.RegisterQueueByName(queueName); using (JsConfig.With(includeTypeInfo: true)) { var msgBody = JsonSerializer.SerializeToString(message, typeof(IMessage)); @@ -86,7 +86,7 @@ protected MessageReceiver GetOrCreateMessageReceiver(string queueName) parentFactory.address, queueName, ReceiveMode.ReceiveAndDelete); //should be ReceiveMode.PeekLock, but it does not delete messages from queue on CompleteAsync() - + sbReceivers.Add(queueName, messageReceiver); return messageReceiver; } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs index 6b12a78..e4c9766 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs @@ -2,8 +2,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace ServiceStack.Azure.Messaging { @@ -51,7 +49,7 @@ public ServiceBusMqServer(string connectionString) public List RegisteredTypes => handlerMap.Keys.ToList(); /// - /// Opt-in to only publish responses on this white list. + /// Opt-in to only publish responses on this white list. /// Publishes all responses by default. /// public string[] PublishResponsesWhitelist { get; set; } @@ -60,7 +58,7 @@ public bool DisablePublishingResponses { set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null; } - + /// /// Disable publishing .outq Messages for Responses with no return type /// diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs index 45cc9b2..a36fc36 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs @@ -3,8 +3,6 @@ using ServiceStack.Text; using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; #if NETSTANDARD2_0 diff --git a/src/ServiceStack.Azure/ServiceStack.Azure.csproj b/src/ServiceStack.Azure/ServiceStack.Azure.csproj index 20bfa95..5682217 100644 --- a/src/ServiceStack.Azure/ServiceStack.Azure.csproj +++ b/src/ServiceStack.Azure/ServiceStack.Azure.csproj @@ -46,8 +46,7 @@ - - + diff --git a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj index f010275..6863fcf 100644 --- a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj +++ b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj @@ -27,7 +27,6 @@ - @@ -55,7 +54,7 @@ - +