Skip to content

Commit 03d3bfd

Browse files
committed
Move QUIESCENT_TIMEOUT constant to ClientManager
Update tests to set setDisconnectCompletionTimeout and setQuiescentTimeout to 1L to reduce test runtime Rebase
1 parent 3d6c718 commit 03d3bfd

File tree

5 files changed

+27
-69
lines changed

5 files changed

+27
-69
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
3939
*/
4040
long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
4141

42+
Long QUIESCENT_TIMEOUT = 30_000L;
43+
4244
/**
4345
* The default disconnect completion timeout in milliseconds.
4446
*/

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@
6060
public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends MessageProducerSupport
6161
implements ApplicationEventPublisherAware, ClientManager.ConnectCallback {
6262

63-
public static final Long QUIESCENT_TIMEOUT = 30_000L;
64-
6563
protected final Lock topicLock = new ReentrantLock(); // NOSONAR
6664

6765
private final String url;
@@ -76,7 +74,7 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess
7674

7775
private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
7876

79-
private long quiescentTimeout = QUIESCENT_TIMEOUT;
77+
private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT;
8078

8179
private boolean manualAcks;
8280

@@ -206,15 +204,15 @@ protected long getDisconnectCompletionTimeout() {
206204

207205
/**
208206
* Set the quiescentTimeout timeout when disconnecting.
209-
* Default is 30,000 milliseconds.
207+
* Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds.
210208
* @param quiescentTimeout The timeout.
211209
* @since 7.0.0
212210
*/
213211
public void setQuiescentTimeout(long quiescentTimeout) {
214212
this.quiescentTimeout = quiescentTimeout;
215213
}
216214

217-
public long getQuiescentTimeout() {
215+
protected long getQuiescentTimeout() {
218216
return this.quiescentTimeout;
219217
}
220218

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ protected void doStop() {
298298

299299
}
300300
if (getClientManager() == null) {
301-
this.mqttClient.disconnectForcibly(QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(),
301+
this.mqttClient.disconnectForcibly(ClientManager.QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(),
302302
MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties());
303303
if (getConnectionInfo().isAutomaticReconnect()) {
304304
MqttUtils.stopClientReconnectCycle(this.mqttClient);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -113,36 +113,7 @@ public void testSingleTopic() {
113113
MqttPahoMessageDrivenChannelAdapter inbound =
114114
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
115115
QueueChannel outputChannel = new QueueChannel();
116-
inbound.setOutputChannel(outputChannel);
117-
inbound.setTaskScheduler(taskScheduler);
118-
inbound.setBeanFactory(mock(BeanFactory.class));
119-
inbound.afterPropertiesSet();
120-
inbound.start();
121-
adapter.handleMessage(new GenericMessage<>("foo"));
122-
Message<?> out = outputChannel.receive(20000);
123-
assertThat(out).isNotNull();
124-
adapter.stop();
125-
inbound.stop();
126-
assertThat(out.getPayload()).isEqualTo("foo");
127-
assertThat(out.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)).isEqualTo("mqtt-foo");
128-
assertThat(adapter.getConnectionInfo().getServerURIs()[0]).isEqualTo(MosquittoContainerTest.mqttUrl());
129-
}
130-
131-
@Test
132-
void testSingleTopicWithQuiescentSet() {
133-
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler(MosquittoContainerTest.mqttUrl(), "si-test-out");
134-
adapter.setDefaultTopic("mqtt-foo");
135-
adapter.setBeanFactory(mock(BeanFactory.class));
136-
adapter.afterPropertiesSet();
137-
adapter.start();
138-
MqttPahoMessageDrivenChannelAdapter inbound =
139-
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
140-
QueueChannel outputChannel = new QueueChannel();
141-
inbound.setOutputChannel(outputChannel);
142-
inbound.setTaskScheduler(taskScheduler);
143-
inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT);
144-
inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
145-
inbound.setBeanFactory(mock(BeanFactory.class));
116+
initializeInboundAdapter(inbound, outputChannel);
146117
inbound.afterPropertiesSet();
147118
inbound.start();
148119
adapter.handleMessage(new GenericMessage<>("foo"));
@@ -179,9 +150,7 @@ private void testJsonCommon(String... trusted) {
179150
MqttPahoMessageDrivenChannelAdapter inbound =
180151
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
181152
QueueChannel outputChannel = new QueueChannel();
182-
inbound.setOutputChannel(outputChannel);
183-
inbound.setTaskScheduler(taskScheduler);
184-
inbound.setBeanFactory(mock(BeanFactory.class));
153+
initializeInboundAdapter(inbound, outputChannel);
185154
inbound.setConverter(converter);
186155
inbound.afterPropertiesSet();
187156
inbound.start();
@@ -210,9 +179,7 @@ public void testAddRemoveTopic() {
210179
MqttPahoMessageDrivenChannelAdapter inbound =
211180
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in");
212181
QueueChannel outputChannel = new QueueChannel();
213-
inbound.setOutputChannel(outputChannel);
214-
inbound.setTaskScheduler(taskScheduler);
215-
inbound.setBeanFactory(mock(BeanFactory.class));
182+
initializeInboundAdapter(inbound, outputChannel);
216183
inbound.afterPropertiesSet();
217184
inbound.start();
218185
inbound.addTopic("mqtt-foo");
@@ -258,9 +225,7 @@ public void testTwoTopics() {
258225
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
259226
"si-test-in", "mqtt-foo", "mqtt-bar");
260227
QueueChannel outputChannel = new QueueChannel();
261-
inbound.setOutputChannel(outputChannel);
262-
inbound.setTaskScheduler(taskScheduler);
263-
inbound.setBeanFactory(mock(BeanFactory.class));
228+
initializeInboundAdapter(inbound, outputChannel);
264229
inbound.afterPropertiesSet();
265230
inbound.start();
266231
adapter.handleMessage(new GenericMessage<>("foo"));
@@ -293,9 +258,7 @@ public void testAsync() throws Exception {
293258
MqttPahoMessageDrivenChannelAdapter inbound =
294259
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
295260
QueueChannel outputChannel = new QueueChannel();
296-
inbound.setOutputChannel(outputChannel);
297-
inbound.setTaskScheduler(taskScheduler);
298-
inbound.setBeanFactory(mock(BeanFactory.class));
261+
initializeInboundAdapter(inbound, outputChannel);
299262
inbound.afterPropertiesSet();
300263
inbound.start();
301264
GenericMessage<String> message = new GenericMessage<>("foo");
@@ -331,9 +294,7 @@ public void testAsyncPersisted() throws Exception {
331294
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
332295
"si-test-in", "mqtt-foo", "mqtt-bar");
333296
QueueChannel outputChannel = new QueueChannel();
334-
inbound.setOutputChannel(outputChannel);
335-
inbound.setTaskScheduler(taskScheduler);
336-
inbound.setBeanFactory(mock(BeanFactory.class));
297+
initializeInboundAdapter(inbound, outputChannel);
337298
inbound.afterPropertiesSet();
338299
inbound.start();
339300
Message<String> message1 = new GenericMessage<>("foo");
@@ -428,6 +389,14 @@ public void onApplicationEvent(MqttSubscribedEvent event) {
428389

429390
}
430391

392+
private void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) {
393+
inbound.setOutputChannel(outputChannel);
394+
inbound.setTaskScheduler(taskScheduler);
395+
inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT);
396+
inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
397+
inbound.setBeanFactory(mock(BeanFactory.class));
398+
}
399+
431400
private class EventPublisher implements ApplicationEventPublisher {
432401

433402
private volatile MqttMessageDeliveredEvent delivered;

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
@DirtiesContext
6767
public class Mqttv5BackToBackTests implements MosquittoContainerTest {
6868

69+
private static final long QUIESCENT_TIMEOUT = 1;
70+
71+
private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;
72+
6973
@Autowired
7074
@Qualifier("mqttOutFlow.input")
7175
private MessageChannel mqttOutFlowInput;
@@ -95,7 +99,7 @@ public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() {
9599

96100
@Test
97101
public void testSimpleMqttv5Interaction() {
98-
String testPayload = "foo";
102+
String testPayload = "datakey";
99103

100104
this.mqttOutFlowInput.send(
101105
MessageBuilder.withPayload(testPayload)
@@ -154,23 +158,6 @@ public void testSharedTopicMqttv5Interaction() {
154158
assertThat(receive.getPayload()).isEqualTo(testPayload);
155159
}
156160

157-
@Test
158-
void testSharedTopicMqttv5InteractionQuiescentTimeout() {
159-
this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopicq");
160-
this.mqttv5MessageDrivenChannelAdapter.setQuiescentTimeout(2000);
161-
this.mqttv5MessageDrivenChannelAdapter.setDisconnectCompletionTimeout(2000);
162-
String testPayload = "shared topic payload";
163-
this.mqttOutFlowInput.send(
164-
MessageBuilder.withPayload(testPayload)
165-
.setHeader(MqttHeaders.TOPIC, "testTopicq")
166-
.build());
167-
168-
Message<?> receive = this.fromMqttChannel.receive(10_000);
169-
170-
assertThat(receive).isNotNull();
171-
assertThat(receive.getPayload()).isEqualTo(testPayload);
172-
}
173-
174161
@Configuration
175162
@EnableIntegration
176163
public static class Config {
@@ -231,6 +218,8 @@ public IntegrationFlow mqttInFlow() {
231218
new Mqttv5PahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "mqttv5SIin",
232219
mqttSubscription);
233220
messageProducer.setPayloadType(String.class);
221+
messageProducer.setQuiescentTimeout(QUIESCENT_TIMEOUT);
222+
messageProducer.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
234223
messageProducer.setMessageConverter(mqttStringToBytesConverter());
235224
messageProducer.setManualAcks(true);
236225

0 commit comments

Comments
 (0)