Skip to content

Commit d232ed7

Browse files
committed
Added acceptance tests and fixed a bug
1 parent d5ac77e commit d232ed7

File tree

7 files changed

+654
-13
lines changed

7 files changed

+654
-13
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
namespace ServiceControl.AcceptanceTests.Recoverability.ExternalIntegration
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using Infrastructure;
7+
using NServiceBus;
8+
using NServiceBus.AcceptanceTesting;
9+
using NServiceBus.Settings;
10+
using ServiceBus.Management.Infrastructure.Settings;
11+
using NUnit.Framework;
12+
using ServiceControl.Contracts;
13+
using ServiceControl.MessageFailures;
14+
using TestSupport;
15+
using TestSupport.EndpointTemplates;
16+
using Newtonsoft.Json;
17+
18+
class When_a_failed_message_is_archived : AcceptanceTest
19+
{
20+
[Test]
21+
public async Task Should_publish_notification()
22+
{
23+
CustomConfiguration = config => config.OnEndpointSubscribed<MyContext>((s, ctx) =>
24+
{
25+
if (s.SubscriberReturnAddress.IndexOf("ExternalProcessor", StringComparison.OrdinalIgnoreCase) >= 0)
26+
{
27+
ctx.ExternalProcessorSubscribed = true;
28+
}
29+
});
30+
31+
var context = await Define<MyContext>()
32+
.WithEndpoint<Receiver>(b => b.When(c => c.ExternalProcessorSubscribed, async bus =>
33+
{
34+
await bus.SendLocal<MyMessage>(m => m.MessageNumber = 1)
35+
.ConfigureAwait(false);
36+
}).DoNotFailOnErrorMessages())
37+
.WithEndpoint<ExternalProcessor>(b => b.When(async (bus, c) =>
38+
{
39+
await bus.Subscribe<FailedMessageArchived>();
40+
41+
if (c.HasNativePubSubSupport)
42+
{
43+
c.ExternalProcessorSubscribed = true;
44+
}
45+
}))
46+
.Do("WaitUntilErrorsContainsFaildMessage", async ctx =>
47+
{
48+
if (ctx.FailedMessageId == null)
49+
{
50+
return false;
51+
}
52+
53+
return await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}",
54+
e => e.Status == FailedMessageStatus.Unresolved);
55+
})
56+
.Do("Archive", async ctx =>
57+
{
58+
await this.Post<object>($"/api/errors/{ctx.FailedMessageId}/archive");
59+
})
60+
.Do("EnsureMessageIsArchived", async ctx =>
61+
{
62+
return await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}",
63+
e => e.Status == FailedMessageStatus.Archived);
64+
})
65+
.Done(ctx => ctx.EventDelivered) //Done when sequence is finished
66+
.Run();
67+
68+
var deserializedEvent = JsonConvert.DeserializeObject<FailedMessageArchived>(context.Event);
69+
Assert.IsTrue(deserializedEvent.FailedMessageId == context.FailedMessageId);
70+
}
71+
72+
public class Receiver : EndpointConfigurationBuilder
73+
{
74+
public Receiver()
75+
{
76+
EndpointSetup<DefaultServer>(c =>
77+
{
78+
c.NoDelayedRetries();
79+
c.ReportSuccessfulRetriesToServiceControl();
80+
});
81+
}
82+
83+
public class MyMessageHandler : IHandleMessages<MyMessage>
84+
{
85+
public MyContext Context { get; set; }
86+
public ReadOnlySettings Settings { get; set; }
87+
88+
public Task Handle(MyMessage message, IMessageHandlerContext context)
89+
{
90+
var messageId = context.MessageId.Replace(@"\", "-");
91+
92+
var uniqueMessageId = DeterministicGuid.MakeId(messageId, Settings.EndpointName()).ToString();
93+
94+
if (message.MessageNumber == 1)
95+
{
96+
Context.FailedMessageId = uniqueMessageId;
97+
}
98+
99+
if (Context.FailProcessing)
100+
{
101+
throw new Exception("Simulated exception");
102+
}
103+
104+
return Task.FromResult(0);
105+
}
106+
}
107+
}
108+
109+
public class ExternalProcessor : EndpointConfigurationBuilder
110+
{
111+
public ExternalProcessor()
112+
{
113+
EndpointSetup<DefaultServer>(c =>
114+
{
115+
var routing = c.ConfigureTransport().Routing();
116+
routing.RouteToEndpoint(typeof(FailedMessageArchived).Assembly, Settings.DEFAULT_SERVICE_NAME);
117+
}, publisherMetadata => { publisherMetadata.RegisterPublisherFor<FailedMessageArchived>(Settings.DEFAULT_SERVICE_NAME); });
118+
}
119+
120+
public class FailureHandler : IHandleMessages<FailedMessageArchived>
121+
{
122+
public MyContext Context { get; set; }
123+
124+
public Task Handle(FailedMessageArchived message, IMessageHandlerContext context)
125+
{
126+
var serializedMessage = JsonConvert.SerializeObject(message);
127+
Context.Event = serializedMessage;
128+
Context.EventDelivered = true;
129+
return Task.FromResult(0);
130+
}
131+
}
132+
}
133+
134+
public class MyMessage : ICommand
135+
{
136+
public int MessageNumber { get; set; }
137+
}
138+
139+
public class MyContext : ScenarioContext, ISequenceContext
140+
{
141+
public string FailedMessageId { get; set; }
142+
public string GroupId { get; set; }
143+
public bool FailProcessing { get; set; } = true;
144+
public int Step { get; set; }
145+
public bool ExternalProcessorSubscribed { get; set; }
146+
public string Event { get; set; }
147+
public bool EventDelivered { get; set; }
148+
}
149+
}
150+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
namespace ServiceControl.AcceptanceTests.Recoverability.ExternalIntegration
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using Infrastructure;
7+
using NServiceBus;
8+
using NServiceBus.AcceptanceTesting;
9+
using NServiceBus.Settings;
10+
using ServiceBus.Management.Infrastructure.Settings;
11+
using NUnit.Framework;
12+
using ServiceControl.Contracts;
13+
using ServiceControl.MessageFailures;
14+
using TestSupport;
15+
using TestSupport.EndpointTemplates;
16+
using Newtonsoft.Json;
17+
18+
class When_a_failed_message_is_resolved_by_retry : AcceptanceTest
19+
{
20+
[Test]
21+
public async Task Should_publish_notification()
22+
{
23+
CustomConfiguration = config => config.OnEndpointSubscribed<MyContext>((s, ctx) =>
24+
{
25+
if (s.SubscriberReturnAddress.IndexOf("ExternalProcessor", StringComparison.OrdinalIgnoreCase) >= 0)
26+
{
27+
ctx.ExternalProcessorSubscribed = true;
28+
}
29+
});
30+
31+
var context = await Define<MyContext>()
32+
.WithEndpoint<Receiver>(b => b.When(c => c.ExternalProcessorSubscribed, async bus =>
33+
{
34+
await bus.SendLocal<MyMessage>(m => m.MessageNumber = 1)
35+
.ConfigureAwait(false);
36+
}).DoNotFailOnErrorMessages())
37+
.WithEndpoint<ExternalProcessor>(b => b.When(async (bus, c) =>
38+
{
39+
await bus.Subscribe<MessageFailureResolvedByRetry>();
40+
41+
if (c.HasNativePubSubSupport)
42+
{
43+
c.ExternalProcessorSubscribed = true;
44+
}
45+
}))
46+
.Do("WaitUntilErrorsContainsFaildMessage", async ctx =>
47+
{
48+
if (ctx.FailedMessageId == null)
49+
{
50+
return false;
51+
}
52+
53+
FailedMessage failedMessage = await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}");
54+
if (failedMessage == null)
55+
{
56+
return false;
57+
}
58+
59+
return true;
60+
})
61+
.Do("Retry", async ctx =>
62+
{
63+
ctx.AboutToSendRetry = true;
64+
ctx.FailProcessing = false;
65+
await this.Post<object>($"/api/errors/{ctx.FailedMessageId}/retry");
66+
})
67+
.Do("EnsureRetried", async ctx =>
68+
{
69+
return await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}",
70+
e => e.Status == FailedMessageStatus.Resolved);
71+
})
72+
.Done(ctx => ctx.EventDelivered) //Done when sequence is finished
73+
.Run();
74+
75+
var deserializedEvent = JsonConvert.DeserializeObject<MessageFailureResolvedByRetry>(context.Event);
76+
Assert.IsTrue(deserializedEvent.FailedMessageId == context.FailedMessageId);
77+
}
78+
79+
public class Receiver : EndpointConfigurationBuilder
80+
{
81+
public Receiver()
82+
{
83+
EndpointSetup<DefaultServer>(c =>
84+
{
85+
c.NoDelayedRetries();
86+
c.ReportSuccessfulRetriesToServiceControl();
87+
});
88+
}
89+
90+
public class MyMessageHandler : IHandleMessages<MyMessage>
91+
{
92+
public MyContext Context { get; set; }
93+
public ReadOnlySettings Settings { get; set; }
94+
95+
public Task Handle(MyMessage message, IMessageHandlerContext context)
96+
{
97+
var messageId = context.MessageId.Replace(@"\", "-");
98+
99+
var uniqueMessageId = DeterministicGuid.MakeId(messageId, Settings.EndpointName()).ToString();
100+
101+
if (message.MessageNumber == 1)
102+
{
103+
Context.FailedMessageId = uniqueMessageId;
104+
}
105+
106+
if (Context.FailProcessing)
107+
{
108+
throw new Exception("Simulated exception");
109+
}
110+
111+
return Task.FromResult(0);
112+
}
113+
}
114+
}
115+
116+
public class ExternalProcessor : EndpointConfigurationBuilder
117+
{
118+
public ExternalProcessor()
119+
{
120+
EndpointSetup<DefaultServer>(c =>
121+
{
122+
var routing = c.ConfigureTransport().Routing();
123+
routing.RouteToEndpoint(typeof(MessageFailureResolvedByRetry).Assembly, Settings.DEFAULT_SERVICE_NAME);
124+
}, publisherMetadata => { publisherMetadata.RegisterPublisherFor<MessageFailureResolvedByRetry>(Settings.DEFAULT_SERVICE_NAME); });
125+
}
126+
127+
public class FailureHandler : IHandleMessages<MessageFailureResolvedByRetry>
128+
{
129+
public MyContext Context { get; set; }
130+
131+
public Task Handle(MessageFailureResolvedByRetry message, IMessageHandlerContext context)
132+
{
133+
var serializedMessage = JsonConvert.SerializeObject(message);
134+
Context.Event = serializedMessage;
135+
Context.EventDelivered = true;
136+
return Task.FromResult(0);
137+
}
138+
}
139+
}
140+
141+
public class MyMessage : ICommand
142+
{
143+
public int MessageNumber { get; set; }
144+
}
145+
146+
public class MyContext : ScenarioContext, ISequenceContext
147+
{
148+
public string FailedMessageId { get; set; }
149+
public string GroupId { get; set; }
150+
public bool FailProcessing { get; set; } = true;
151+
public int Step { get; set; }
152+
public bool ExternalProcessorSubscribed { get; set; }
153+
public string Event { get; set; }
154+
public bool EventDelivered { get; set; }
155+
public bool AboutToSendRetry { get; internal set; }
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)