Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<dependencies>
<group targetFramework=".netstandard2.0">
<dependency id="Microsoft.Azure.ServiceBus" version="[3.1.0, )" />
<dependency id="Microsoft.Azure.Management.ServiceBus" version="[1.4.0, )" />
<dependency id="WindowsAzure.Storage" version="[9.3.2, )" />
<dependency id="ServiceStack.Core" version="5.0.0" />
</group>
Expand Down
8 changes: 4 additions & 4 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void Ack(IMessage message)
try
{
#if NETSTANDARD2_0
sbClient.CompleteAsync(lockToken).Wait();
sbClient.CompleteAsync(lockToken).GetAwaiter().GetResult();
#else
sbClient.Complete(Guid.Parse(lockToken));
#endif
Expand All @@ -64,7 +64,7 @@ public IMessage<T> CreateMessage<T>(object mqResponse)
return null;
var msgBody = msg.Body.FromMessageBody();
#else
if (!(mqResponse is BrokeredMessage msg))
if (!(mqResponse is BrokeredMessage msg))
return null;
var msgBody = msg.GetBody<Stream>().FromMessageBody();
#endif
Expand Down Expand Up @@ -140,7 +140,7 @@ public override void Dispose()

if (!task.IsCompleted)
throw new TimeoutException("Reached timeout while getting message from client");
} else
} else
{
await task;
}
Expand Down Expand Up @@ -172,7 +172,7 @@ public void Notify(string queueName, IMessage message)
{
if (parentFactory.MqServer.DisableNotifyMessages)
return;

Publish(queueName, message);
}

Expand Down
65 changes: 45 additions & 20 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Collections.Concurrent;
#if NETSTANDARD2_0
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;

#else
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
Expand All @@ -18,13 +20,16 @@ public class ServiceBusMqMessageFactory : IMessageFactory
protected internal readonly string address;
#if !NETSTANDARD2_0
protected internal readonly NamespaceManager namespaceManager;
#else
protected internal readonly ManagementClient managementClient;
#endif

internal Dictionary<Type, IMessageHandlerFactory> handlerMap;
Dictionary<string, Type> queueMap;

// A list of all Service Bus QueueClients - one per type & queue (priorityq, inq, outq, and dlq)
private static readonly ConcurrentDictionary<string, QueueClient> sbClients = new ConcurrentDictionary<string, QueueClient>();
private static readonly ConcurrentDictionary<string, QueueClient> sbClients =
new ConcurrentDictionary<string, QueueClient>();

public ServiceBusMqServer MqServer { get; }

Expand All @@ -34,6 +39,8 @@ public ServiceBusMqMessageFactory(ServiceBusMqServer mqServer, string address)
this.address = address;
#if !NETSTANDARD2_0
this.namespaceManager = NamespaceManager.CreateFromConnectionString(address);
#else
this.managementClient = new ManagementClient(address);
#endif
}

Expand All @@ -49,7 +56,6 @@ public IMessageQueueClient CreateMessageQueueClient()

public void Dispose()
{

}

protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> handlerMap)
Expand All @@ -59,7 +65,7 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han

queueMap = new Dictionary<string, Type>();

var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" };
var mqSuffixes = new[] {".inq", ".outq", ".priorityq", ".dlq"};
foreach (var type in this.handlerMap.Keys)
{
foreach (var mqSuffix in mqSuffixes)
Expand All @@ -69,10 +75,17 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han

if (!queueMap.ContainsKey(queueName))
queueMap.Add(queueName, type);
#if !NETSTANDARD2_0

var mqDesc = new QueueDescription(queueName);
#if !NETSTANDARD2_0
if (!namespaceManager.QueueExists(queueName))
namespaceManager.CreateQueue(mqDesc);
#else
// Prefer GetAwaiter().GetResult() so that the StackTrace
// is easier to use, see:
// https://stackoverflow.com/a/36427080
if (!managementClient.QueueExistsAsync(mqDesc.Path).GetAwaiter().GetResult())
managementClient.CreateQueueAsync(mqDesc).GetAwaiter().GetResult();
#endif
}

Expand All @@ -91,23 +104,23 @@ private void AddQueueHandler(string queueName)
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.RegisterMessageHandler(sbWorker.HandleMessageAsync,
new MessageHandlerOptions(
(eventArgs) => Task.CompletedTask)
{
(eventArgs) => Task.CompletedTask)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
#else
var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock);
var options = new OnMessageOptions
{
// Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
// Cannot use AutoComplete because our HandleMessage throws errors into SS's handlers; this would
// normally release the BrokeredMessage back to the Azure Service Bus queue, which we don't actually want

AutoComplete = false,
AutoComplete = false,
//AutoRenewTimeout = new TimeSpan()
MaxConcurrentCalls = 1
};

var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock);
var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.OnMessage(sbWorker.HandleMessage, options);
#endif
Expand All @@ -127,25 +140,37 @@ protected internal void StopQueues()
protected internal QueueClient GetOrCreateClient(string queueName)
{
queueName = queueName.SafeQueueName();

if (sbClients.ContainsKey(queueName))
return sbClients[queueName];

#if !NETSTANDARD2_0
// Create queue on ServiceBus namespace if it doesn't exist
var qd = new QueueDescription(queueName);
if (!namespaceManager.QueueExists(queueName))
namespaceManager.CreateQueue(qd);
#endif

#if NETSTANDARD2_0
var sbClient = new QueueClient(address, queueName);
#else
#if !NETSTANDARD2_0
// Create queue on ServiceBus namespace if it doesn't exist
if (!namespaceManager.QueueExists(queueName))
{
try
{
namespaceManager.CreateQueue(qd);
}
catch (MessagingEntityAlreadyExistsException) { /* ignore */ }
}
var sbClient = QueueClient.CreateFromConnectionString(address, qd.Path);
#endif
#else
if (!managementClient.QueueExistsAsync(queueName).GetAwaiter().GetResult())
{
try
{
managementClient.CreateQueueAsync(qd).GetAwaiter().GetResult();
}
catch (MessagingEntityAlreadyExistsException) { /* ignore */ }
}
var sbClient = new QueueClient(address, queueName);

#endif
sbClient = sbClients.GetOrAdd(queueName, sbClient);
return sbClient;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public virtual void Publish(string queueName, IMessage message)
};
sbClient.SendAsync(msg).Wait();
#else
var msg = new BrokeredMessage(msgBody) {MessageId = message.Id.ToString()};
var msg = new BrokeredMessage(msgBody) { MessageId = message.Id.ToString() };

sbClient.Send(msg);
#endif
Expand All @@ -86,7 +86,7 @@ protected MessageReceiver GetOrCreateMessageReceiver(string queueName)
parentFactory.address,
queueName,
ReceiveMode.ReceiveAndDelete); //should be ReceiveMode.PeekLock, but it does not delete messages from queue on CompleteAsync()

sbReceivers.Add(queueName, messageReceiver);
return messageReceiver;
}
Expand Down
1 change: 0 additions & 1 deletion src/ServiceStack.Azure/ServiceStack.Azure.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.1.0" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="1.4.0" />

<Reference Include="..\..\lib\netstandard2.0\ServiceStack.Interfaces.dll" />
<Reference Include="..\..\lib\netstandard2.0\ServiceStack.Text.dll" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.1.0" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="1.4.0" />

<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.App" Version="2.1.3" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Collections.Specialized" Version="4.3.0" />

Expand Down