diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java index 1f3bb0bffdb4..590fb39637fa 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java @@ -77,7 +77,7 @@ Mono updateDisposition(String lockToken, DispositionStatus dispositionStat @Override public void close() { if (!isDisposed.getAndSet(true)) { - linkProcessor.cancel(); + linkProcessor.dispose(); } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java index 97bd84a35420..e0925bdb2d30 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java @@ -11,7 +11,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor; -import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterAll; @@ -24,11 +23,10 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.Disposable; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; import java.time.Duration; import java.util.UUID; @@ -45,10 +43,13 @@ */ class ServiceBusAsyncConsumerTest { private static final String LINK_NAME = "some-link"; - private final EmitterProcessor messageProcessor = EmitterProcessor.create(); - private final FluxSink messageProcessorSink = messageProcessor.sink(); - private final EmitterProcessor endpointProcessor = EmitterProcessor.create(); - private final FluxSink endpointProcessorSink = endpointProcessor.sink(); + private final TestPublisher linkPublisher = TestPublisher.create(); + private final Flux linkFlux = linkPublisher.flux(); + private final TestPublisher messagePublisher = TestPublisher.create(); + private final Flux messageFlux = messagePublisher.flux(); + private final TestPublisher endpointPublisher = TestPublisher.create(); + private final Flux endpointStateFlux = endpointPublisher.flux(); + private final ClientLogger logger = new ClientLogger(ServiceBusAsyncConsumer.class); private ServiceBusReceiveLinkProcessor linkProcessor; @@ -80,13 +81,10 @@ void setup(TestInfo testInfo) { MockitoAnnotations.initMocks(this); - when(link.getEndpointStates()).thenReturn(endpointProcessor); - when(link.receive()).thenReturn(messageProcessor); - linkProcessor = Flux.create(sink -> sink.onRequest(requested -> { - logger.info("Requested link: {}", requested); - sink.next(link); - })).subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, parentConnection, - new AmqpErrorContext("a-namespace"))); + when(link.getEndpointStates()).thenReturn(endpointStateFlux); + when(link.receive()).thenReturn(messageFlux); + linkProcessor = linkFlux.subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, + parentConnection, new AmqpErrorContext("a-namespace"))); when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); when(link.updateDisposition(anyString(), any(DeliveryState.class))).thenReturn(Mono.empty()); @@ -97,6 +95,11 @@ void teardown(TestInfo testInfo) { logger.info("[{}]: Tearing down.", testInfo.getDisplayName()); Mockito.framework().clearInlineMocks(); + + linkProcessor.dispose(); + linkPublisher.complete(); + endpointPublisher.complete(); + messagePublisher.complete(); } /** @@ -122,9 +125,13 @@ void receiveNoAutoComplete() { // Act and Assert StepVerifier.create(consumer.receive()) - .then(() -> messageProcessorSink.next(message1)) + .then(() -> { + linkPublisher.next(link); + endpointPublisher.next(AmqpEndpointState.ACTIVE); + messagePublisher.next(message1); + }) .expectNext(receivedMessage1) - .then(() -> messageProcessorSink.next(message2)) + .then(() -> messagePublisher.next(message2)) .expectNext(receivedMessage2) .thenCancel() .verify(); @@ -139,9 +146,6 @@ void receiveNoAutoComplete() { void canDispose() { // Arrange final String lockToken = UUID.randomUUID().toString(); - when(linkProcessor.updateDisposition(lockToken, Accepted.getInstance())) - .thenReturn(Mono.error(new IllegalArgumentException("Should not have called complete."))); - final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer); final Message message1 = mock(Message.class); @@ -153,11 +157,15 @@ void canDispose() { // Act and Assert StepVerifier.create(consumer.receive()) .then(() -> { - endpointProcessorSink.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); + linkPublisher.next(link); + endpointPublisher.next(AmqpEndpointState.ACTIVE); + messagePublisher.next(message1); }) .expectNext(receivedMessage1) - .then(() -> consumer.close()) + .then(() -> { + linkPublisher.complete(); + endpointPublisher.complete(); + }) .verifyComplete(); verify(link, never()).updateDisposition(anyString(), any(DeliveryState.class));