diff --git a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec
index 86cb0d0..4c4feeb 100644
--- a/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec
+++ b/NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec
@@ -4,7 +4,7 @@
ServiceStack.Azure.Core
ServiceStack.Azure .NET Standard 2.0
5.0.0
- ServiceStack
+ ServiceStack
ServiceStack
.NET Standard 2.0 version of ServiceStack.Azure
@@ -13,12 +13,11 @@
https://servicestack.net/terms
https://servicestack.net/img/logo-32.png
ServiceStack Azure WebServices ServiceBus Cache CacheClient
- en-US
+ en-US
ServiceStack and contributors
-
-
+
diff --git a/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs b/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs
index d3f79a6..431907b 100644
--- a/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs
+++ b/src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs
@@ -1,11 +1,9 @@
using System.Threading.Tasks;
using System;
using System.Text;
-using ServiceStack.Text;
namespace ServiceStack.Azure.Messaging
{
-
public static class QueueClientExtensions
{
#if NETSTANDARD2_0
@@ -26,11 +24,11 @@ public static class QueueClientExtensions
public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message message)
{
var strMessage = Encoding.UTF8.GetString(message.Body);
-
+
//Windows Azure Client is not wire-compatible with .NET Core client
- //we check if the message comes from Windows client and cut off
+ //we check if the message comes from Windows client and cut off
//64 header chars and 2 footer chars
- //see https://github.com/Azure/azure-service-bus-dotnet/issues/239
+ //see https://github.com/Azure/azure-service-bus-dotnet/issues/239
if (strMessage.StartsWith("@\u0006string", StringComparison.Ordinal))
{
strMessage = strMessage.Substring(64, strMessage.Length - 66);
@@ -43,5 +41,5 @@ public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message messa
internal static string SafeQueueName(this string queueName) =>
queueName?.Replace(":", ".");
}
-
+
}
diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs
index 464bead..7ef7b54 100644
--- a/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs
+++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs
@@ -2,8 +2,6 @@
using ServiceStack.Text;
using System;
using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading.Tasks;
#if NETSTANDARD2_0
using Microsoft.Azure.ServiceBus;
@@ -138,7 +136,7 @@ public override void Dispose()
if (!task.IsCompleted)
throw new TimeoutException("Reached timeout while getting message from client");
- } else
+ } else
{
await task;
}
@@ -170,7 +168,7 @@ public void Notify(string queueName, IMessage message)
{
if (parentFactory.MqServer.DisableNotifyMessages)
return;
-
+
Publish(queueName, message);
}
diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
index 6813f07..8a9bc8e 100644
--- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
+++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
@@ -5,6 +5,7 @@
using System.Collections.Concurrent;
#if NETSTANDARD2_0
using Microsoft.Azure.ServiceBus;
+using Microsoft.Azure.ServiceBus.Management;
#else
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
@@ -18,6 +19,8 @@ public class ServiceBusMqMessageFactory : IMessageFactory
protected internal readonly string address;
#if !NETSTANDARD2_0
protected internal readonly NamespaceManager namespaceManager;
+#else
+ protected internal readonly ManagementClient managementClient;
#endif
internal Dictionary handlerMap;
@@ -34,6 +37,8 @@ public ServiceBusMqMessageFactory(ServiceBusMqServer mqServer, string address)
this.address = address;
#if !NETSTANDARD2_0
this.namespaceManager = NamespaceManager.CreateFromConnectionString(address);
+#else
+ this.managementClient = new ManagementClient(address);
#endif
}
@@ -59,7 +64,7 @@ protected internal void StartQueues(Dictionary han
queueMap = new Dictionary();
- var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" };
+ var mqSuffixes = new[] { ".inq", ".outq", ".priorityq", ".dlq" };
foreach (var type in this.handlerMap.Keys)
{
foreach (var mqSuffix in mqSuffixes)
@@ -69,11 +74,7 @@ protected internal void StartQueues(Dictionary han
if (!queueMap.ContainsKey(queueName))
queueMap.Add(queueName, type);
-#if !NETSTANDARD2_0
- var mqDesc = new QueueDescription(queueName);
- if (!namespaceManager.QueueExists(queueName))
- namespaceManager.CreateQueue(mqDesc);
-#endif
+ RegisterQueueByName(queueName);
}
var mqNames = new QueueNames(type);
@@ -85,29 +86,27 @@ protected internal void StartQueues(Dictionary han
private void AddQueueHandler(string queueName)
{
queueName = queueName.SafeQueueName();
-
+ var sbClient = GetOrCreateClient(queueName);
#if NETSTANDARD2_0
- var sbClient = new QueueClient(address, queueName, ReceiveMode.PeekLock);
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.RegisterMessageHandler(sbWorker.HandleMessageAsync,
new MessageHandlerOptions(
- (eventArgs) => Task.CompletedTask)
- {
+ (eventArgs) => Task.CompletedTask)
+ {
MaxConcurrentCalls = 1,
AutoComplete = false
});
#else
var options = new OnMessageOptions
{
- // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
+ // Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
// normally release the BrokeredMessage back to the Azure Service Bus queue, which we don't actually want
- AutoComplete = false,
+ AutoComplete = false,
//AutoRenewTimeout = new TimeSpan()
MaxConcurrentCalls = 1
};
- var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock);
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.OnMessage(sbWorker.HandleMessage, options);
#endif
@@ -124,28 +123,49 @@ protected internal void StopQueues()
sbClients.Clear();
}
- protected internal QueueClient GetOrCreateClient(string queueName)
+ protected internal QueueClient GetOrCreateClient(string queueName, ReceiveMode receiveMode = ReceiveMode.PeekLock)
{
queueName = queueName.SafeQueueName();
-
+
if (sbClients.ContainsKey(queueName))
return sbClients[queueName];
-#if !NETSTANDARD2_0
- // Create queue on ServiceBus namespace if it doesn't exist
- var qd = new QueueDescription(queueName);
- if (!namespaceManager.QueueExists(queueName))
- namespaceManager.CreateQueue(qd);
-#endif
+ var qd = RegisterQueueByName(queueName);
#if NETSTANDARD2_0
- var sbClient = new QueueClient(address, queueName);
+ var sbClient = new QueueClient(address, qd.Path, receiveMode);
#else
- var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path);
+ var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path, receiveMode);
#endif
-
sbClient = sbClients.GetOrAdd(queueName, sbClient);
return sbClient;
}
+
+ protected internal QueueDescription RegisterQueueByName(string queueName)
+ {
+ var mqDesc = new QueueDescription(queueName);
+#if !NETSTANDARD2_0
+ if (!namespaceManager.QueueExists(queueName))
+ namespaceManager.CreateQueue(mqDesc);
+#else
+ try
+ {
+ managementClient.QueueExistsAsync(queueName)
+ .ContinueWith(async asc =>
+ {
+ if (!asc.Result)
+ {
+ await managementClient.CreateQueueAsync(mqDesc)
+ .ConfigureAwait(continueOnCapturedContext: true);
+ }
+ });
+ }
+ catch (AggregateException aex)
+ {
+ throw aex.Flatten();
+ }
+#endif
+ return mqDesc;
+ }
}
}
diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs
index 4073536..3964276 100644
--- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs
+++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageProducer.cs
@@ -1,5 +1,4 @@
-using System;
-using System.Collections.Generic;
+using System.Collections.Generic;
using ServiceStack.Messaging;
using ServiceStack.Text;
#if NETSTANDARD2_0
@@ -56,6 +55,7 @@ public virtual void Publish(string queueName, IMessage message)
message.ReplyTo = message.ReplyTo.SafeQueueName();
var sbClient = parentFactory.GetOrCreateClient(queueName);
+ parentFactory.RegisterQueueByName(queueName);
using (JsConfig.With(includeTypeInfo: true))
{
var msgBody = JsonSerializer.SerializeToString(message, typeof(IMessage));
@@ -86,7 +86,7 @@ protected MessageReceiver GetOrCreateMessageReceiver(string queueName)
parentFactory.address,
queueName,
ReceiveMode.ReceiveAndDelete); //should be ReceiveMode.PeekLock, but it does not delete messages from queue on CompleteAsync()
-
+
sbReceivers.Add(queueName, messageReceiver);
return messageReceiver;
}
diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs
index 6b12a78..e4c9766 100644
--- a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs
+++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs
@@ -2,8 +2,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
namespace ServiceStack.Azure.Messaging
{
@@ -51,7 +49,7 @@ public ServiceBusMqServer(string connectionString)
public List RegisteredTypes => handlerMap.Keys.ToList();
///
- /// Opt-in to only publish responses on this white list.
+ /// Opt-in to only publish responses on this white list.
/// Publishes all responses by default.
///
public string[] PublishResponsesWhitelist { get; set; }
@@ -60,7 +58,7 @@ public bool DisablePublishingResponses
{
set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
}
-
+
///
/// Disable publishing .outq Messages for Responses with no return type
///
diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs
index 45cc9b2..a36fc36 100644
--- a/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs
+++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs
@@ -3,8 +3,6 @@
using ServiceStack.Text;
using System;
using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
#if NETSTANDARD2_0
diff --git a/src/ServiceStack.Azure/ServiceStack.Azure.csproj b/src/ServiceStack.Azure/ServiceStack.Azure.csproj
index 20bfa95..5682217 100644
--- a/src/ServiceStack.Azure/ServiceStack.Azure.csproj
+++ b/src/ServiceStack.Azure/ServiceStack.Azure.csproj
@@ -46,8 +46,7 @@
-
-
+
diff --git a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj
index f010275..6863fcf 100644
--- a/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj
+++ b/tests/ServiceStack.Azure.Tests/ServiceStack.Azure.Tests.csproj
@@ -27,7 +27,6 @@
-
@@ -55,7 +54,7 @@
-
+