Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions NuGet/ServiceStack.Azure.Core/servicestack.azure.core.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<id>ServiceStack.Azure.Core</id>
<title>ServiceStack.Azure .NET Standard 2.0</title>
<version>5.0.0</version>
<authors>ServiceStack</authors>
<authors>ServiceStack</authors>
<owners>ServiceStack</owners>
<description>
.NET Standard 2.0 version of ServiceStack.Azure
Expand All @@ -13,12 +13,11 @@
<licenseUrl>https://servicestack.net/terms</licenseUrl>
<iconUrl>https://servicestack.net/img/logo-32.png</iconUrl>
<tags>ServiceStack Azure WebServices ServiceBus Cache CacheClient</tags>
<language>en-US</language>
<language>en-US</language>
<copyright>ServiceStack and contributors</copyright>
<dependencies>
<group targetFramework=".netstandard2.0">
<dependency id="Microsoft.Azure.ServiceBus" version="[2.0.0, )" />
<dependency id="Microsoft.Azure.Management.ServiceBus" version="[1.2.0, )" />
<dependency id="Microsoft.Azure.ServiceBus" version="[3.1.0-preview, )" />
<dependency id="WindowsAzure.Storage" version="[9.1.1, )" />
<dependency id="ServiceStack.Core" version="5.0.0" />
</group>
Expand Down
10 changes: 4 additions & 6 deletions src/ServiceStack.Azure/Messaging/QueueClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using System.Threading.Tasks;
using System;
using System.Text;
using ServiceStack.Text;

namespace ServiceStack.Azure.Messaging
{

public static class QueueClientExtensions
{
#if NETSTANDARD2_0
Expand All @@ -26,11 +24,11 @@ public static class QueueClientExtensions
public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message message)
{
var strMessage = Encoding.UTF8.GetString(message.Body);

//Windows Azure Client is not wire-compatible with .NET Core client
//we check if the message comes from Windows client and cut off
//we check if the message comes from Windows client and cut off
//64 header chars and 2 footer chars
//see https://github.com/Azure/azure-service-bus-dotnet/issues/239
//see https://github.com/Azure/azure-service-bus-dotnet/issues/239
if (strMessage.StartsWith("@\u0006string", StringComparison.Ordinal))
{
strMessage = strMessage.Substring(64, strMessage.Length - 66);
Expand All @@ -43,5 +41,5 @@ public static string GetBodyString(this Microsoft.Azure.ServiceBus.Message messa
internal static string SafeQueueName(this string queueName) =>
queueName?.Replace(":", ".");
}

}
6 changes: 2 additions & 4 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using ServiceStack.Text;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
#if NETSTANDARD2_0
using Microsoft.Azure.ServiceBus;
Expand Down Expand Up @@ -138,7 +136,7 @@ public override void Dispose()

if (!task.IsCompleted)
throw new TimeoutException("Reached timeout while getting message from client");
} else
} else
{
await task;
}
Expand Down Expand Up @@ -170,7 +168,7 @@ public void Notify(string queueName, IMessage message)
{
if (parentFactory.MqServer.DisableNotifyMessages)
return;

Publish(queueName, message);
}

Expand Down
68 changes: 44 additions & 24 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
#if NETSTANDARD2_0
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
#else
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
Expand All @@ -18,6 +19,8 @@ public class ServiceBusMqMessageFactory : IMessageFactory
protected internal readonly string address;
#if !NETSTANDARD2_0
protected internal readonly NamespaceManager namespaceManager;
#else
protected internal readonly ManagementClient managementClient;
#endif

internal Dictionary<Type, IMessageHandlerFactory> handlerMap;
Expand All @@ -34,6 +37,8 @@ public ServiceBusMqMessageFactory(ServiceBusMqServer mqServer, string address)
this.address = address;
#if !NETSTANDARD2_0
this.namespaceManager = NamespaceManager.CreateFromConnectionString(address);
#else
this.managementClient = new ManagementClient(address);
#endif
}

Expand All @@ -59,7 +64,7 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han

queueMap = new Dictionary<string, Type>();

var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" };
var mqSuffixes = new[] { ".inq", ".outq", ".priorityq", ".dlq" };
foreach (var type in this.handlerMap.Keys)
{
foreach (var mqSuffix in mqSuffixes)
Expand All @@ -69,11 +74,7 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han

if (!queueMap.ContainsKey(queueName))
queueMap.Add(queueName, type);
#if !NETSTANDARD2_0
var mqDesc = new QueueDescription(queueName);
if (!namespaceManager.QueueExists(queueName))
namespaceManager.CreateQueue(mqDesc);
#endif
RegisterQueueByName(queueName);
}

var mqNames = new QueueNames(type);
Expand All @@ -85,29 +86,27 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han
private void AddQueueHandler(string queueName)
{
queueName = queueName.SafeQueueName();

var sbClient = GetOrCreateClient(queueName);
#if NETSTANDARD2_0
var sbClient = new QueueClient(address, queueName, ReceiveMode.PeekLock);
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.RegisterMessageHandler(sbWorker.HandleMessageAsync,
new MessageHandlerOptions(
(eventArgs) => Task.CompletedTask)
{
(eventArgs) => Task.CompletedTask)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
#else
var options = new OnMessageOptions
{
// Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
// Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
// normally release the BrokeredMessage back to the Azure Service Bus queue, which we don't actually want

AutoComplete = false,
AutoComplete = false,
//AutoRenewTimeout = new TimeSpan()
MaxConcurrentCalls = 1
};

var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock);
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.OnMessage(sbWorker.HandleMessage, options);
#endif
Expand All @@ -124,28 +123,49 @@ protected internal void StopQueues()
sbClients.Clear();
}

protected internal QueueClient GetOrCreateClient(string queueName)
protected internal QueueClient GetOrCreateClient(string queueName, ReceiveMode receiveMode = ReceiveMode.PeekLock)
{
queueName = queueName.SafeQueueName();

if (sbClients.ContainsKey(queueName))
return sbClients[queueName];

#if !NETSTANDARD2_0
// Create queue on ServiceBus namespace if it doesn't exist
var qd = new QueueDescription(queueName);
if (!namespaceManager.QueueExists(queueName))
namespaceManager.CreateQueue(qd);
#endif
var qd = RegisterQueueByName(queueName);

#if NETSTANDARD2_0
var sbClient = new QueueClient(address, queueName);
var sbClient = new QueueClient(address, qd.Path, receiveMode);
#else
var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path);
var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path, receiveMode);
#endif

sbClient = sbClients.GetOrAdd(queueName, sbClient);
return sbClient;
}

protected internal QueueDescription RegisterQueueByName(string queueName)
{
var mqDesc = new QueueDescription(queueName);
#if !NETSTANDARD2_0
if (!namespaceManager.QueueExists(queueName))
namespaceManager.CreateQueue(mqDesc);
#else
try
{
managementClient.QueueExistsAsync(queueName)
.ContinueWith(async asc =>
{
if (!asc.Result)
{
await managementClient.CreateQueueAsync(mqDesc)
.ConfigureAwait(continueOnCapturedContext: true);
}
});
}
catch (AggregateException aex)
{
throw aex.Flatten();
}
#endif
return mqDesc;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using ServiceStack.Messaging;
using ServiceStack.Text;
#if NETSTANDARD2_0
Expand Down Expand Up @@ -56,6 +55,7 @@ public virtual void Publish(string queueName, IMessage message)
message.ReplyTo = message.ReplyTo.SafeQueueName();

var sbClient = parentFactory.GetOrCreateClient(queueName);
parentFactory.RegisterQueueByName(queueName);
using (JsConfig.With(includeTypeInfo: true))
{
var msgBody = JsonSerializer.SerializeToString(message, typeof(IMessage));
Expand Down Expand Up @@ -86,7 +86,7 @@ protected MessageReceiver GetOrCreateMessageReceiver(string queueName)
parentFactory.address,
queueName,
ReceiveMode.ReceiveAndDelete); //should be ReceiveMode.PeekLock, but it does not delete messages from queue on CompleteAsync()

sbReceivers.Add(queueName, messageReceiver);
return messageReceiver;
}
Expand Down
6 changes: 2 additions & 4 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceStack.Azure.Messaging
{
Expand Down Expand Up @@ -51,7 +49,7 @@ public ServiceBusMqServer(string connectionString)
public List<Type> RegisteredTypes => handlerMap.Keys.ToList();

/// <summary>
/// Opt-in to only publish responses on this white list.
/// Opt-in to only publish responses on this white list.
/// Publishes all responses by default.
/// </summary>
public string[] PublishResponsesWhitelist { get; set; }
Expand All @@ -60,7 +58,7 @@ public bool DisablePublishingResponses
{
set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
}

/// <summary>
/// Disable publishing .outq Messages for Responses with no return type
/// </summary>
Expand Down
2 changes: 0 additions & 2 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
using ServiceStack.Text;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
#if NETSTANDARD2_0
Expand Down
3 changes: 1 addition & 2 deletions src/ServiceStack.Azure/ServiceStack.Azure.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="1.2.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.1.0-preview" />

<Reference Include="..\..\lib\netstandard2.0\ServiceStack.Interfaces.dll" />
<Reference Include="..\..\lib\netstandard2.0\ServiceStack.Text.dll" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net46' ">
<PackageReference Include="WindowsAzure.ServiceBus" Version="4.1.8" />
<PackageReference Include="Microsoft.WindowsAzure.ConfigurationManager" Version="3.2.3" />

<Reference Include="..\..\lib\net45\ServiceStack.Interfaces.dll" />
Expand Down Expand Up @@ -55,7 +54,7 @@
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.0' ">
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="2.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.1.0-preview" />
<PackageReference Include="Microsoft.Extensions.Primitives" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.0.1" />
Expand Down