diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java index c654f52b39e9..159a163b70a4 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java @@ -43,7 +43,7 @@ static void addAttribute(io.opentelemetry.api.common.AttributesBuilder attribute } else if (value instanceof Byte) { attributesBuilder.put(AttributeKey.longKey(key), (Byte) value); } else { - LOGGER.warning("Could not populate attribute with key '{}', type {} is not supported.", key, value.getClass().getName()); + LOGGER.warning("Could not populate attribute with key '{}', type '{}' is not supported.", key, value.getClass().getName()); } } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index f243c8407004..ff411065fbf1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -4,6 +4,9 @@ ### Features Added +- Added `sessionIdleTimeout` method to configure session idle timeout on `ServiceBusSessionProcessorClientBuilder`. After this time has elapsed, + the processor will close the session and attempt to process another session. ([#34700](https://github.com/Azure/azure-sdk-for-java/issues/34700)) + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 18dfa30af23e..d3817031be31 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -45,6 +45,7 @@ --add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED --add-reads com.azure.messaging.servicebus=com.azure.http.netty + --add-reads com.azure.messaging.servicebus=com.azure.core.tracing.opentelemetry diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java index fd5d64d51ca3..eb1d95df02cb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java @@ -12,27 +12,39 @@ * * @see ServiceBusReceiverAsyncClient */ -class ReceiverOptions { +final class ReceiverOptions { private final ServiceBusReceiveMode receiveMode; private final int prefetchCount; private final boolean enableAutoComplete; private final String sessionId; private final Integer maxConcurrentSessions; private final Duration maxLockRenewDuration; + private final Duration sessionIdleTimeout; - ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + static ReceiverOptions createNonSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, boolean enableAutoComplete) { - this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null); + return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null, null); } - ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, - boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) { + static ReceiverOptions createNamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + boolean enableAutoComplete, String sessionId) { + return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, sessionId, null, null); + } + + static ReceiverOptions createUnnamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + boolean enableAutoComplete, Integer maxConcurrentSessions, Duration sessionIdleTimeout) { + return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions, sessionIdleTimeout); + } + + private ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions, Duration sessionIdleTimeout) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; this.enableAutoComplete = enableAutoComplete; this.sessionId = sessionId; this.maxConcurrentSessions = maxConcurrentSessions; this.maxLockRenewDuration = maxLockRenewDuration; + this.sessionIdleTimeout = sessionIdleTimeout; } /** @@ -43,6 +55,7 @@ class ReceiverOptions { Duration getMaxLockRenewDuration() { return maxLockRenewDuration; } + /** * Gets the receive mode for the message. * @@ -98,6 +111,15 @@ public Integer getMaxConcurrentSessions() { return maxConcurrentSessions; } + /** + * Gets the {@code sessionIdleTimeout} to roll to another session if no messages wew be received. + * + * @return the session idle timeout. + */ + Duration getSessionIdleTimeout() { + return sessionIdleTimeout; + } + public boolean isEnableAutoComplete() { return enableAutoComplete; } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 076799ac14cf..b74f8cbee451 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -62,9 +62,10 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.regex.Pattern; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; +import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions; +import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions; import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; /** @@ -208,7 +209,6 @@ public final class ServiceBusClientBuilder implements private static final String UNKNOWN = "UNKNOWN"; private static final String LIBRARY_NAME; private static final String LIBRARY_VERSION; - private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+"); private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5); private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class); private final Object connectionLock = new Object(); @@ -1041,6 +1041,21 @@ public ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration return this; } + /** + * Sets the maximum amount of time to wait for a message to be received for the currently active session. + * After this time has elapsed, the processor will close the session and attempt to process another session. + * If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used. + * + * @param sessionIdleTimeout Session idle timeout. + * @return The updated {@link ServiceBusSessionProcessorClientBuilder} object. + * @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative. + */ + public ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) { + validateAndThrow(sessionIdleTimeout); + sessionReceiverClientBuilder.sessionIdleTimeout(sessionIdleTimeout); + return this; + } + /** * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}. * @@ -1235,6 +1250,7 @@ public final class ServiceBusSessionReceiverClientBuilder { private String subscriptionName; private String topicName; private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; + private Duration sessionIdleTimeout = null; private SubQueue subQueue = SubQueue.NONE; private ServiceBusSessionReceiverClientBuilder() { @@ -1270,6 +1286,21 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration return this; } + /** + * Sets the maximum amount of time to wait for a message to be received for the currently active session. + * After this time has elapsed, the processor will close the session and attempt to process another session. + * If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used. + * + * @param sessionIdleTimeout Session idle timeout. + * @return The updated {@link ServiceBusSessionReceiverClientBuilder} object. + * @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative. + */ + ServiceBusSessionReceiverClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) { + validateAndThrow(sessionIdleTimeout); + this.sessionIdleTimeout = sessionIdleTimeout; + return this; + } + /** * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}. * @@ -1403,9 +1434,9 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() { } final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); - final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, - maxAutoLockRenewDuration, enableAutoComplete, null, - maxConcurrentSessions); + + final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount, + maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout); final String clientIdentifier; if (clientOptions instanceof AmqpClientOptions) { @@ -1484,8 +1515,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp } final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); - final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, - maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions); + final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount, + maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout); final String clientIdentifier; if (clientOptions instanceof AmqpClientOptions) { @@ -1792,7 +1823,6 @@ public final class ServiceBusReceiverClientBuilder { private String subscriptionName; private String topicName; private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; - private ServiceBusReceiverClientBuilder() { } @@ -1967,7 +1997,7 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo } final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); - final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, + final ReceiverOptions receiverOptions = createNonSessionOptions(receiveMode, prefetchCount, maxAutoLockRenewDuration, enableAutoComplete); final String clientIdentifier; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 5ef4a55769e0..3457e7dcdb53 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -68,6 +68,7 @@ class ServiceBusSessionManager implements AutoCloseable { private final List schedulers; private final Deque availableSchedulers = new ConcurrentLinkedDeque<>(); private final Duration maxSessionLockRenewDuration; + private final Duration sessionIdleTimeout; /** * SessionId to receiver mapping. @@ -107,6 +108,9 @@ class ServiceBusSessionManager implements AutoCloseable { this.processor = EmitterProcessor.create(numberOfSchedulers, false); this.sessionReceiveSink = processor.sink(); this.receiveLink = receiveLink; + this.sessionIdleTimeout = receiverOptions.getSessionIdleTimeout() != null + ? receiverOptions.getSessionIdleTimeout() + : connectionProcessor.getRetryOptions().getTryTimeout(); } ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, @@ -348,8 +352,8 @@ private Flux getSession(Scheduler scheduler, boolean d } return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(), - receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, this::renewSessionLock, - maxSessionLockRenewDuration); + receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock, + maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null); }))) .flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> { LOGGER.atVerbose() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index 2ba326ad0352..e0076403cd2a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -64,16 +64,16 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { * @param messageSerializer Serializes and deserializes messages from Service Bus. * @param retryOptions Retry options for the receiver. * @param prefetch Number of messages to prefetch from session. - * @param disposeOnIdle true to dispose the session receiver if there are no more messages and the receiver is - * idle. * @param scheduler The scheduler to publish messages on. * @param renewSessionLock Function to renew the session lock. * @param maxSessionLockRenewDuration Maximum time to renew the session lock for. {@code null} or {@link * Duration#ZERO} to disable session lock renewal. + * @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages + * and the receiver is idle. Set it to {@code null} to not dispose receiver. */ ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, - AmqpRetryOptions retryOptions, int prefetch, boolean disposeOnIdle, Scheduler scheduler, - Function> renewSessionLock, Duration maxSessionLockRenewDuration) { + AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler, + Function> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) { this.receiveLink = receiveLink; this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT); @@ -146,12 +146,12 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { // Creates a subscription that disposes/closes the receiver when there are no more messages in the session and // receiver is idle. - if (disposeOnIdle) { + if (sessionIdleTimeout != null) { this.subscriptions.add(Flux.switchOnNext(messageReceivedEmitter - .map((String lockToken) -> Mono.delay(this.retryOptions.getTryTimeout()))) + .map((String lockToken) -> Mono.delay(sessionIdleTimeout))) .subscribe(item -> { withReceiveLinkInformation(LOGGER.atInfo()) - .addKeyValue("timeout", retryOptions.getTryTimeout()) + .addKeyValue("timeout", sessionIdleTimeout) .log("Did not a receive message within timeout."); cancelReceiveProcessor.onComplete(); })); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java index f3741eff69b1..ca433c92e72e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java @@ -21,6 +21,7 @@ import java.util.Objects; import static com.azure.core.util.FluxUtil.monoError; +import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions; /** * This asynchronous session receiver client is used to acquire session locks from a queue or topic and create @@ -136,9 +137,9 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable public Mono acceptNextSession() { return tracer.traceMono("ServiceBus.acceptNextSession", unNamedSessionManager.getActiveLink().flatMap(receiveLink -> receiveLink.getSessionId() .map(sessionId -> { - final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(), + final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(), receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(), - receiverOptions.isEnableAutoComplete(), sessionId, null); + receiverOptions.isEnableAutoComplete(), sessionId); final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor, messageSerializer, newReceiverOptions, receiveLink, identifier); @@ -172,9 +173,9 @@ public Mono acceptSession(String sessionId) { return monoError(LOGGER, new IllegalArgumentException("'sessionId' cannot be empty")); } - final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(), + final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(), receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(), - receiverOptions.isEnableAutoComplete(), sessionId, null); + receiverOptions.isEnableAutoComplete(), sessionId); final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor, messageSerializer, newReceiverOptions, identifier); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java index 30a289bea388..1524fa5de82a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java @@ -264,7 +264,7 @@ private Throwable mapError(Throwable throwable) { */ @Override public Mono renewMessageLock(String lockToken, String associatedLinkName) { - return isAuthorized(OPERATION_PEEK).then(createChannel.flatMap(channel -> { + return isAuthorized(ManagementConstants.OPERATION_RENEW_LOCK).then(createChannel.flatMap(channel -> { final Message requestMessage = createManagementMessage(ManagementConstants.OPERATION_RENEW_LOCK, associatedLinkName); final Map requestBody = new HashMap<>(); @@ -428,7 +428,7 @@ public Mono updateDisposition(String lockToken, DispositionStatus disposit final UUID[] lockTokens = new UUID[]{UUID.fromString(lockToken)}; return isAuthorized(OPERATION_UPDATE_DISPOSITION).then(createChannel.flatMap(channel -> { logger.atVerbose() - .addKeyValue("lockTokens", Arrays.toString(lockTokens)) + .addKeyValue("lockTokens", () -> Arrays.toString(lockTokens)) .addKeyValue(DISPOSITION_STATUS_KEY, dispositionStatus) .addKeyValue(SESSION_ID_KEY, sessionId) .log("Update disposition of deliveries."); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index ef7a7dba81d2..de72fa9bc8d3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -64,8 +64,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic private final Mono sessionLockedUntil; public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, Receiver receiver, - ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, Duration timeout, - AmqpRetryPolicy retryPolicy) { + ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, AmqpRetryPolicy retryPolicy) { super(connection, entityPath, receiver, handler, tokenManager, provider.getReactorDispatcher(), retryPolicy.getRetryOptions()); this.receiver = receiver; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java index dc7833f1b4c9..7d86f07c7312 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java @@ -167,7 +167,7 @@ protected Mono createProducer(String linkName, String entityPath, Dura protected ReactorReceiver createConsumer(String entityPath, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) { return new ServiceBusReactorReceiver(amqpConnection, entityPath, receiver, receiveLinkHandler, tokenManager, - reactorProvider, retryOptions.getTryTimeout(), retryPolicy); + reactorProvider, retryPolicy); } private Mono createConsumer(String linkName, String entityPath, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/FluxAutoLockRenewTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/FluxAutoLockRenewTest.java index ac4d6d7373da..a241ff989565 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/FluxAutoLockRenewTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/FluxAutoLockRenewTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -59,7 +60,6 @@ public class FluxAutoLockRenewTest { private Function> renewalFunction; private OffsetDateTime lockedUntil; - private AutoCloseable mocksCloseable; private ReceiverOptions defaultReceiverOptions; @BeforeAll @@ -78,7 +78,7 @@ public void setup() { receivedMessage.setLockToken(LOCK_TOKEN_UUID); receivedMessage.setLockedUntil(lockedUntil); renewalFunction = (lockToken) -> Mono.just(OffsetDateTime.now().plusSeconds(10)); - defaultReceiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, + defaultReceiverOptions = createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, MAX_AUTO_LOCK_RENEW_DURATION, true); } @@ -144,7 +144,7 @@ public void illegalValueConstructor() { assertThrows(NullPointerException.class, () -> new FluxAutoLockRenew(messageSource, defaultReceiverOptions, messageLockContainer, null)); - ReceiverOptions zeroLockDurationOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, + ReceiverOptions zeroLockDurationOptions = createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, DISABLE_AUTO_LOCK_RENEW_DURATION, true); assertThrows(IllegalArgumentException.class, () -> new FluxAutoLockRenew(messageSource, zeroLockDurationOptions, messageLockContainer, renewalFunction)); @@ -466,6 +466,7 @@ public void contextPropagationTest() { * When auto complete is disabled by user, we do not perform message lock clean up. */ @Test + //@RepeatedTest(1000) void autoCompleteDisabledLockRenewNotClosed() { // Arrange final TestContainer messageLockContainer = new TestContainer(); @@ -478,7 +479,7 @@ void autoCompleteDisabledLockRenewNotClosed() { actualTokenRenewCalledTimes.getAndIncrement(); return Mono.just(OffsetDateTime.now().plusSeconds(1)); }; - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, + ReceiverOptions receiverOptions = createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, MAX_AUTO_LOCK_RENEW_DURATION, enableAutoComplete); final FluxAutoLockRenew renewOperator = new FluxAutoLockRenew(messageSource, receiverOptions, messageLockContainer, lockTokenRenewFunction); @@ -502,7 +503,8 @@ void autoCompleteDisabledLockRenewNotClosed() { .verifyComplete(); assertEquals(1, messageLockContainer.addOrUpdateInvocations.get(LOCK_TOKEN_STRING)); - assertTrue(actualTokenRenewCalledTimes.get() >= renewedForAtLeast); + assertTrue(actualTokenRenewCalledTimes.get() >= renewedForAtLeast, + String.format("expected at least %s, but got %s", renewedForAtLeast, actualTokenRenewCalledTimes.get())); // ensure that we do not remove lockToken from 'messageLockContainer' because user can do it at their will since // enableAutoComplete = false diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java index e00ced155ef9..d6c5f959b623 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java @@ -10,7 +10,9 @@ import com.azure.core.amqp.models.AmqpMessageBody; import com.azure.core.test.TestBase; import com.azure.core.test.TestMode; +import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions; import com.azure.core.util.AsyncCloseable; +import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.CoreUtils; import com.azure.core.util.IterableStream; @@ -24,6 +26,7 @@ import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.trace.SdkTracerProvider; import org.junit.jupiter.api.AfterAll; @@ -66,7 +69,7 @@ public abstract class IntegrationTestBase extends TestBase { protected static final Duration TIMEOUT = Duration.ofSeconds(60); protected static final AmqpRetryOptions RETRY_OPTIONS = new AmqpRetryOptions().setTryTimeout(TIMEOUT); protected final ClientLogger logger; - + protected ClientOptions optionsWithTracing; private static final String PROXY_AUTHENTICATION_TYPE = "PROXY_AUTHENTICATION_TYPE"; private static final Configuration GLOBAL_CONFIGURATION = TestUtils.getGlobalConfiguration(); private List toClose = new ArrayList<>(); @@ -84,7 +87,6 @@ protected IntegrationTestBase(ClientLogger logger) { @BeforeEach public void setupTest(TestInfo testInfo) { - GlobalOpenTelemetry.resetForTest(); Method testMethod = testInfo.getTestMethod().orElseGet(null); testName = String.format("%s-%s", testMethod == null ? "unknown" : testMethod.getName(), @@ -96,10 +98,12 @@ public void setupTest(TestInfo testInfo) { StepVerifier.setDefaultTimeout(TIMEOUT); toClose = new ArrayList<>(); - OpenTelemetrySdk.builder() + GlobalOpenTelemetry.resetForTest(); + OpenTelemetry otel = OpenTelemetrySdk.builder() .setTracerProvider( SdkTracerProvider.builder().addSpanProcessor(new LoggingSpanProcessor(logger)).build()) - .buildAndRegisterGlobal(); + .build(); + optionsWithTracing = new ClientOptions().setTracingOptions(new OpenTelemetryTracingOptions().setProvider(otel.getTracerProvider())); beforeTest(); } @@ -238,6 +242,7 @@ protected ServiceBusClientBuilder getBuilder(boolean useCredentials) { final ServiceBusClientBuilder builder = new ServiceBusClientBuilder() .proxyOptions(ProxyOptions.SYSTEM_DEFAULTS) .retryOptions(RETRY_OPTIONS) + .clientOptions(optionsWithTracing) .transportType(AmqpTransportType.AMQP) .scheduler(scheduler); @@ -304,8 +309,7 @@ protected ServiceBusReceiverClientBuilder getReceiverBuilder(boolean useCredenti } protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder(boolean useCredentials, - MessagingEntityType entityType, int entityIndex, - boolean sharedConnection) { + MessagingEntityType entityType, int entityIndex, boolean sharedConnection, AmqpRetryOptions retryOptions) { ServiceBusClientBuilder builder = getBuilder(useCredentials, sharedConnection); @@ -314,6 +318,7 @@ protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder(boole final String queueName = getSessionQueueName(entityIndex); assertNotNull(queueName, "'queueName' cannot be null."); return builder + .retryOptions(retryOptions) .sessionReceiver() .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .queueName(queueName); @@ -323,7 +328,9 @@ protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder(boole final String subscriptionName = getSessionSubscriptionBaseName(); assertNotNull(topicName, "'topicName' cannot be null."); assertNotNull(subscriptionName, "'subscriptionName' cannot be null."); - return builder.sessionReceiver() + return builder + .retryOptions(retryOptions) + .sessionReceiver() .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .topicName(topicName).subscriptionName(subscriptionName); default: @@ -471,7 +478,7 @@ protected void assertMessageEquals(ServiceBusReceivedMessage message, String mes } } - private ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean sharedConnection) { + protected ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean sharedConnection) { ServiceBusClientBuilder builder; if (sharedConnection && sharedBuilder == null) { sharedBuilder = getBuilder(useCredentials); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java index a4d94e1f140f..9bd6eab57ee9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.function.Function; +import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -117,8 +118,8 @@ void receiveNoAutoComplete() { final int prefetch = 10; final Duration maxAutoLockRenewDuration = Duration.ofSeconds(0); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", null); + final ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, + maxAutoLockRenewDuration, false, "sessionId"); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); @@ -164,8 +165,8 @@ void canDispose() { final Duration maxAutoLockRenewDuration = Duration.ofSeconds(40); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", null); + final ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, + maxAutoLockRenewDuration, false, "sessionId"); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); @@ -204,8 +205,8 @@ void onError() { final Duration maxAutoLockRenewDuration = Duration.ofSeconds(40); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", null); + final ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, + maxAutoLockRenewDuration, false, "sessionId"); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java index a6fe5339426f..38b98d84db9b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java @@ -7,6 +7,8 @@ import com.azure.core.credential.TokenCredential; import org.junit.jupiter.api.Test; +import java.time.Duration; + import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -51,39 +53,103 @@ public void testThrowsWithTokenCredentialIfFullyQualifiedNameIsMissing() { .processMessage(x -> { }) .processError(x -> { }) .buildProcessorClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionReceiver() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildAsyncClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .processor() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionReceiver() + .sessionIdleTimeout(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildAsyncClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .processor() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient()); + } + + @Test + public void testThrowsIfNegativeMaxLockDuration() { + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionReceiver() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildAsyncClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .receiver() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .processor() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionProcessor() + .maxAutoLockRenewDuration(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildProcessorClient()); + } + + @Test + public void testThrowsIfNegativeSessionIdle() { + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionReceiver() + .sessionIdleTimeout(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildAsyncClient()); + assertThrows(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder() + .sessionProcessor() + .sessionIdleTimeout(Duration.ofSeconds(-1)) + .queueName("fakequeue") + .buildProcessorClient()); } @Test public void testBuildsWithTokenCredentialIfFullyQualifiedNameIsProvided() { - new ServiceBusClientBuilder() - .credential(FAKE_TOKEN_CREDENTIAL) - .fullyQualifiedNamespace(NAMESPACE_NAME) + createMinimalValidClientBuilder() .sender() .queueName("fakequeue") .buildClient(); - new ServiceBusClientBuilder() - .credential(FAKE_TOKEN_CREDENTIAL) - .fullyQualifiedNamespace(NAMESPACE_NAME) + createMinimalValidClientBuilder() .receiver() .queueName("fakequeue") .buildClient(); - new ServiceBusClientBuilder() - .credential(FAKE_TOKEN_CREDENTIAL) - .fullyQualifiedNamespace(NAMESPACE_NAME) + createMinimalValidClientBuilder() .sessionProcessor() .queueName("fakequeue") .processMessage(x -> { }) .processError(x -> { }) .buildProcessorClient(); - new ServiceBusClientBuilder() - .credential(FAKE_TOKEN_CREDENTIAL) - .fullyQualifiedNamespace(NAMESPACE_NAME) + createMinimalValidClientBuilder() .sessionReceiver() .queueName("fakequeue") .buildClient(); - new ServiceBusClientBuilder() - .credential(FAKE_TOKEN_CREDENTIAL) - .fullyQualifiedNamespace(NAMESPACE_NAME) + createMinimalValidClientBuilder() .processor() .queueName("fakequeue") .processMessage(x -> { }) @@ -91,6 +157,46 @@ public void testBuildsWithTokenCredentialIfFullyQualifiedNameIsProvided() { .buildProcessorClient(); } + @Test + public void testBuildsWithSessionIdle() { + createMinimalValidClientBuilder() + .sessionReceiver() + .sessionIdleTimeout(null) + .queueName("fakequeue") + .buildAsyncClient(); + createMinimalValidClientBuilder() + .sessionProcessor() + .sessionIdleTimeout(null) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient(); + createMinimalValidClientBuilder() + .sessionReceiver() + .sessionIdleTimeout(Duration.ZERO) + .queueName("fakequeue") + .buildAsyncClient(); + createMinimalValidClientBuilder() + .sessionProcessor() + .sessionIdleTimeout(Duration.ZERO) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient(); + createMinimalValidClientBuilder() + .sessionReceiver() + .sessionIdleTimeout(Duration.ofSeconds(1)) + .queueName("fakequeue") + .buildAsyncClient(); + createMinimalValidClientBuilder() + .sessionProcessor() + .sessionIdleTimeout(Duration.ofSeconds(1)) + .queueName("fakequeue") + .processMessage(x -> { }) + .processError(x -> { }) + .buildProcessorClient(); + } + @Test public void testEntityNameInConnectionString() { // Arrange @@ -105,4 +211,10 @@ public void testEntityNameInConnectionString() { // Assert assertNotNull(builder.buildAsyncClient()); } + + private static ServiceBusClientBuilder createMinimalValidClientBuilder() { + return new ServiceBusClientBuilder() + .credential(FAKE_TOKEN_CREDENTIAL) + .fullyQualifiedNamespace(NAMESPACE_NAME); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorClientIntegrationTest.java index e3aed1557908..4248933ce027 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorClientIntegrationTest.java @@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.azure.messaging.servicebus.TestUtils.getServiceBusMessage; import static com.azure.messaging.servicebus.TestUtils.getSessionSubscriptionBaseName; import static com.azure.messaging.servicebus.TestUtils.getSubscriptionBaseName; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests for {@link ServiceBusProcessorClient}. @@ -99,13 +101,73 @@ void receiveMessage(MessagingEntityType entityType, boolean isSessionEnabled) th // Assert & Act processor.start(); - toClose(processor); toClose((AutoCloseable) () -> processor.stop()); assertTrue(countDownLatch.await(lockTimeoutDurationSeconds * 6, TimeUnit.SECONDS), "Message not arrived, closing processor."); LOGGER.info("Message lock has been renewed. Now closing processor"); } + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void rollingSessionOnIdleTimeout(MessagingEntityType entityType) throws InterruptedException { + final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSIONS1; + final Duration sessionIdleTimeout = Duration.ofSeconds(3); + + ServiceBusSenderAsyncClient sender = createSender(entityType, entityIndex, true); + + ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processorBuilder = + getSessionProcessorBuilder(false, entityType, entityIndex, false, RETRY_OPTIONS) + .sessionIdleTimeout(sessionIdleTimeout) + .disableAutoComplete(); + + rollingSessionTest(sender, processorBuilder); + } + + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void rollingSessionOnTryTimeout(MessagingEntityType entityType) throws InterruptedException { + final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSIONS1; + final Duration tryTimeout = Duration.ofSeconds(3); + + ServiceBusSenderAsyncClient sender = createSender(entityType, entityIndex, true); + ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processorBuilder = + getSessionProcessorBuilder(false, entityType, entityIndex, false, + new AmqpRetryOptions().setTryTimeout(tryTimeout)) + .disableAutoComplete(); + + rollingSessionTest(sender, processorBuilder); + } + + void rollingSessionTest(ServiceBusSenderAsyncClient sender, ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processorBuilder) throws InterruptedException { + final String contents = "Some-contents"; + final String randomPrefix = UUID.randomUUID().toString(); + ServiceBusMessage message0 = getServiceBusMessage(contents, randomPrefix + "0").setSessionId(randomPrefix + "0"); + ServiceBusMessage message1 = getServiceBusMessage(contents, randomPrefix + "1").setSessionId(randomPrefix + "1"); + + CountDownLatch latch = new CountDownLatch(2); + ServiceBusProcessorClient processor = toClose(processorBuilder + .processMessage(context -> { + ServiceBusReceivedMessage received = context.getMessage(); + context.complete(); + + if (received.getMessageId().startsWith(randomPrefix)) { + latch.countDown(); + if (message0.getMessageId().equals(received.getMessageId())) { + sendMessage(sender, message1).block(); + } + } + }) + .processError(context -> fail(context.getException())) + .buildProcessorClient()); + + processor.start(); + sendMessage(sender, message0).block(); + + toClose((AutoCloseable) () -> processor.stop()); + + assertTrue(latch.await(20, TimeUnit.SECONDS), "Messages did not arrived, closing processor."); + } + private void processMessage(ServiceBusReceivedMessageContext context, CountDownLatch countDownLatch, String expectedMessageId, AtomicReference lastMessageReceivedTime, int lockTimeoutDurationSeconds) { ServiceBusReceivedMessage message = context.getMessage(); @@ -135,7 +197,7 @@ private void processError(ServiceBusErrorContext context, CountDownLatch countdo context.getFullyQualifiedNamespace(), context.getEntityPath()); } - protected ServiceBusClientBuilder.ServiceBusProcessorClientBuilder getProcessorBuilder(boolean useCredentials, + private ServiceBusClientBuilder.ServiceBusProcessorClientBuilder getProcessorBuilder(boolean useCredentials, MessagingEntityType entityType, int entityIndex, boolean sharedConnection) { ServiceBusClientBuilder builder = getBuilder(useCredentials, sharedConnection); @@ -183,14 +245,7 @@ protected ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder getSes } } - private ServiceBusSenderAsyncClient createSender(MessagingEntityType entityType, int entityIndex, boolean isSessionEnabled) { - final boolean shareConnection = false; - final boolean useCredentials = false; - return toClose(getSenderBuilder(useCredentials, entityType, entityIndex, isSessionEnabled, shareConnection) - .buildAsyncClient()); - } - - private ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean sharedConnection) { + protected ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean sharedConnection) { return new ServiceBusClientBuilder() .connectionString(getConnectionString()) .proxyOptions(ProxyOptions.SYSTEM_DEFAULTS) @@ -199,6 +254,13 @@ private ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean share .scheduler(scheduler); } + private ServiceBusSenderAsyncClient createSender(MessagingEntityType entityType, int entityIndex, boolean isSessionEnabled) { + final boolean shareConnection = false; + final boolean useCredentials = false; + return toClose(getSenderBuilder(useCredentials, entityType, entityIndex, isSessionEnabled, shareConnection) + .buildAsyncClient()); + } + private Mono sendMessage(ServiceBusSenderAsyncClient sender, ServiceBusMessage message) { return sender.sendMessage(message).doOnSuccess(aVoid -> { logMessage(message, sender.getEntityPath(), "sent"); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index 93be61c3ce37..16853c38ed0a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.models.AmqpAddress; import com.azure.core.amqp.models.AmqpAnnotatedMessage; import com.azure.core.amqp.models.AmqpMessageBody; @@ -75,6 +76,7 @@ @Execution(ExecutionMode.SAME_THREAD) class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase { private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class); + private static final AmqpRetryOptions DEFAULT_RETRY_OPTIONS = null; private final boolean isSessionEnabled = false; private final ClientCreationOptions defaultClientCreationOptions = new ClientCreationOptions() .setMaxAutoLockRenewDuration(Duration.ofMinutes(5)); @@ -289,7 +291,7 @@ void receiveTwoMessagesAutoComplete(MessagingEntityType entityType, boolean isSe // Now create receiver if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) + this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection, DEFAULT_RETRY_OPTIONS) .buildAsyncClient()); this.receiver = toClose(sessionReceiver.acceptSession(sessionId).block()); } else { @@ -329,7 +331,7 @@ void receiveMessageAutoComplete(MessagingEntityType entityType, boolean isSessio // Now create receiver if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) + this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection, DEFAULT_RETRY_OPTIONS) .buildAsyncClient()); this.receiver = toClose(this.sessionReceiver.acceptSession(sessionId).block()); } else { @@ -709,7 +711,7 @@ void receiveMessageAmqpTypes(MessagingEntityType entityType, boolean isSessionEn // Now create receiver if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) + this.sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection, DEFAULT_RETRY_OPTIONS) .buildAsyncClient()); this.receiver = toClose(this.sessionReceiver.acceptSession(sessionId).block()); } else { @@ -1482,8 +1484,9 @@ private ServiceBusReceiverAsyncClient createReceiver(MessagingEntityType entityT final boolean useCredentials = false; if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) + sessionReceiver = toClose(getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection, DEFAULT_RETRY_OPTIONS) .maxAutoLockRenewDuration(options.getMaxAutoLockRenewDuration()) + .sessionIdleTimeout(options.getSessionIdleTimeout()) .disableAutoComplete() .buildAsyncClient()); @@ -1514,13 +1517,22 @@ private Mono sendMessage(ServiceBusMessage message) { public static class ClientCreationOptions { Duration maxAutoLockRenewDuration; + Duration sessionIdleTimeout; ClientCreationOptions setMaxAutoLockRenewDuration(Duration maxAutoLockRenewDuration) { this.maxAutoLockRenewDuration = maxAutoLockRenewDuration; return this; } + ClientCreationOptions setSessionIdleTimeout(Duration sessionIdleTimeout) { + this.sessionIdleTimeout = sessionIdleTimeout; + return this; + } + Duration getMaxAutoLockRenewDuration() { return this.maxAutoLockRenewDuration; } + Duration getSessionIdleTimeout() { + return this.sessionIdleTimeout; + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index df4085910731..c87ae53200a8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -85,6 +85,8 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions; +import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions; import static com.azure.messaging.servicebus.TestUtils.getMessage; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -122,7 +124,6 @@ class ServiceBusReceiverAsyncClientTest { private static final Duration CLEANUP_INTERVAL = Duration.ofSeconds(10); private static final String SESSION_ID = "my-session-id"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; - private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientTest.class); private final String messageTrackingUUID = UUID.randomUUID().toString(); private final ReplayProcessor endpointProcessor = ReplayProcessor.cacheLast(); @@ -204,12 +205,11 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, Sc connectionOptions.getRetry())); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false, SESSION_ID, - null), + createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false, SESSION_ID), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, mock(ServiceBusSessionManager.class)); } @@ -339,15 +339,15 @@ void receivesMessageLockRenewSessionOnly() { final Duration maxLockRenewDuration = Duration.ofMinutes(1); final ServiceBusSessionManager sessionManager = mock(ServiceBusSessionManager.class); ServiceBusReceiverAsyncClient mySessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, maxLockRenewDuration, - false, SESSION_ID, null), connectionProcessor, + createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, maxLockRenewDuration, + false, SESSION_ID), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, sessionManager); // This needs to be used with "try with resource" : https://javadoc.io/static/org.mockito/mockito-core/3.9.0/org/mockito/Mockito.html#static_mocks try ( MockedConstruction mockedAutoLockRenew = Mockito.mockConstructionWithAnswer(FluxAutoLockRenew.class, invocationOnMock -> new FluxAutoLockRenew(Flux.empty(), - new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, Duration.ofSeconds(30), + createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, Duration.ofSeconds(30), true), new LockContainer<>(Duration.ofSeconds(30)), (lock) -> Mono.empty()))) { @@ -432,7 +432,7 @@ void completeNullMessage() { */ @Test void completeInReceiveAndDeleteMode() { - final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false); + final ReceiverOptions options = createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -452,7 +452,7 @@ void completeInReceiveAndDeleteMode() { @Test void throwsExceptionAboutSettlingPeekedMessagesWithNullLockToken() { - final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false); + final ReceiverOptions options = createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -570,7 +570,7 @@ void errorSourceOnRenewMessageLock() { when(managementNode.renewMessageLock(lockToken, null)) .thenReturn(Mono.error(new AzureException("some error occurred."))); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -594,8 +594,7 @@ void errorSourceOnSessionLock() { // Arrange when(managementNode.renewSessionLock(SESSION_ID, null)).thenReturn(Mono.error(new AzureException("some error occurred."))); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, SESSION_ID, - null); + final ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, SESSION_ID); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, mock(ServiceBusSessionManager.class)); @@ -667,7 +666,7 @@ void errorSourceAutoCompleteMessage() { final int messagesToReceive = 1; final List messages = getMessages(); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -715,7 +714,7 @@ void errorSourceOnReceiveMessage() { when(receivedMessage.getLockToken()).thenReturn(lockToken); when(receivedMessage.getLockedUntil()).thenReturn(expiration); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -1036,8 +1035,8 @@ void getSessionState() { final byte[] bytes = new byte[]{95, 11, 54, 10}; final ServiceBusSessionManager sessionManager = mock(ServiceBusSessionManager.class); ServiceBusReceiverAsyncClient mySessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, CLEANUP_INTERVAL, - false, SESSION_ID, null), connectionProcessor, + createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, CLEANUP_INTERVAL, + false, SESSION_ID), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, sessionManager); when(sessionManager.getSessionState(SESSION_ID)) @@ -1256,7 +1255,7 @@ void autoCompleteMessage() { final int numberOfEvents = 3; final List messages = getMessages(); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); @@ -1290,8 +1289,8 @@ void autoCompleteMessageSessionReceiver() { final String lockToken3 = "token3"; final ServiceBusSessionManager sessionManager = mock(ServiceBusSessionManager.class); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, - true, SESSION_ID, null); + final ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, + true, SESSION_ID); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, sessionManager); @@ -1334,7 +1333,7 @@ void receiveMessagesReportsMetricsAsyncInstr() { ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(null, meter, NAMESPACE, ENTITY_PATH, SUBSCRIPTION_NAME, false); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Arrange @@ -1375,7 +1374,7 @@ void settlementMessagesReportsMetrics(DispositionStatus status) { ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(null, meter, NAMESPACE, ENTITY_PATH, SUBSCRIPTION_NAME, false); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); when(receivedMessage.getLockToken()).thenReturn("mylockToken"); when(receivedMessage.getSequenceNumber()).thenReturn(42L); @@ -1460,7 +1459,7 @@ void receiveWithTracesAndMetrics() { ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, meter, NAMESPACE, ENTITY_PATH, SUBSCRIPTION_NAME, false); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Arrange @@ -1513,7 +1512,7 @@ void receiveMessageNegativeLagReportsMetricsAsyncInstr() { ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(null, meter, NAMESPACE, ENTITY_PATH, null, false); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Arrange @@ -1545,7 +1544,7 @@ void receiveMessageNegativeLagReportsMetricsSyncInstr() { ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(null, meter, NAMESPACE, ENTITY_PATH, null, true); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), + createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, instrumentation, messageSerializer, onClientClose, CLIENT_IDENTIFIER); // Arrange diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index 2fb18c5a4a5a..75286f87f34a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -862,7 +862,7 @@ private void setReceiver(MessagingEntityType entityType, int entityIndex, boolea if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - this.sessionReceiver = toClose(getSessionReceiverBuilder(false, entityType, entityIndex, sharedConnection) + this.sessionReceiver = toClose(getSessionReceiverBuilder(false, entityType, entityIndex, sharedConnection, null) .buildClient()); this.receiver = toClose(this.sessionReceiver.acceptSession(sessionId)); } else { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 8d2c20b7fd8d..3df3c4a3597b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -37,6 +37,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -86,7 +87,7 @@ void setup() { MockitoAnnotations.initMocks(this); when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); - when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 0, null, false)); + when(asyncClient.getReceiverOptions()).thenReturn(createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 0, null, false)); when(asyncClient.getIdentifier()).thenReturn(CLIENT_IDENTIFIER); when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); when(asyncClient.getInstrumentation()).thenReturn(new ServiceBusReceiverInstrumentation(null, null, NAMESPACE, ENTITY_PATH, null, false)); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java index d671e01f68df..c2edf791e44d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder; @@ -12,24 +13,28 @@ import org.junit.jupiter.params.provider.MethodSource; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.time.Duration; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static com.azure.messaging.servicebus.TestUtils.getServiceBusMessage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Integration tests for {@link ServiceBusSessionManager}. */ @Tag("integration") class ServiceBusSessionManagerIntegrationTest extends IntegrationTestBase { - private final AtomicInteger messagesPending = new AtomicInteger(); - + private static final AmqpRetryOptions DEFAULT_RETRY_OPTIONS = null; private ServiceBusReceiverAsyncClient receiver; private ServiceBusSenderAsyncClient sender; private ServiceBusSessionReceiverAsyncClient sessionReceiver; @@ -45,8 +50,6 @@ protected void beforeTest() { @Override protected void afterTest() { - final int pending = messagesPending.get(); - logger.info("Pending messages: {}", pending); try { dispose(receiver, sender, sessionReceiver); } catch (Exception e) { @@ -69,7 +72,6 @@ void singleUnnamedSession(MessagingEntityType entityType) { .flatMap(index -> { final ServiceBusMessage message = getServiceBusMessage(contents, messageId) .setSessionId(sessionId); - messagesPending.incrementAndGet(); return sender.sendMessage(message).thenReturn(index); }) .subscribe( @@ -97,25 +99,112 @@ void singleUnnamedSession(MessagingEntityType entityType) { .verify(Duration.ofMinutes(2)); } + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void rollingSessionOnIdleTimeout(MessagingEntityType entityType) throws InterruptedException { + final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSIONS1; + final Duration sessionIdleTimeout = Duration.ofSeconds(3); + setSender(entityType, entityIndex); + + this.receiver = toClose(getSessionReceiverBuilder(false, + entityType, entityIndex, false, DEFAULT_RETRY_OPTIONS) + .disableAutoComplete() + .maxConcurrentSessions(1) + .sessionIdleTimeout(sessionIdleTimeout) + .buildAsyncClientForProcessor()); + + rollingSessionTest(); + } + + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void rollingSessionOnTryTimeout(MessagingEntityType entityType) throws InterruptedException { + final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSIONS3; + final Duration tryTimeout = Duration.ofSeconds(3); + final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(tryTimeout); + setSender(entityType, entityIndex); + + this.receiver = toClose(getSessionReceiverBuilder(false, + entityType, entityIndex, false, retryOptions) + .disableAutoComplete() + .maxConcurrentSessions(1) + .buildAsyncClientForProcessor()); + + rollingSessionTest(); + } + + private void rollingSessionTest() throws InterruptedException { + final String contents = "Some-contents"; + final String randomPrefix = UUID.randomUUID().toString(); + ServiceBusMessage message0 = getServiceBusMessage(contents, randomPrefix + "0").setSessionId(randomPrefix + "0"); + ServiceBusMessage message1 = getServiceBusMessage(contents, randomPrefix + "1").setSessionId(randomPrefix + "1"); + + CountDownLatch latch = new CountDownLatch(2); + toClose(sender.sendMessage(message0) + .thenMany(receiver.receiveMessages()) + .flatMap(m -> receiver.complete(m).thenReturn(m)) + .filter(m -> m.getMessageId().startsWith(randomPrefix)) + .flatMap(m -> + (message0.getMessageId().equals(m.getMessageId())) + ? sender.sendMessage(message1).thenReturn(m) : Mono.just(m) + ) + .subscribe(m -> latch.countDown(), ex -> fail(ex))); + + assertTrue(latch.await(20, TimeUnit.SECONDS)); + } + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void noRollingSessionWhenNoConcurrentSessions(MessagingEntityType entityType) { + final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSIONS2; + final String contents = "Some-contents"; + final Duration sessionIdleTimeout = Duration.ofSeconds(3); + final Duration tryTimeout = Duration.ofSeconds(10); + setSender(entityType, entityIndex); + final String randomPrefix = UUID.randomUUID().toString(); + ServiceBusMessage message0 = getServiceBusMessage(contents, randomPrefix + "0").setSessionId(randomPrefix + "0"); + ServiceBusMessage message1 = getServiceBusMessage(contents, randomPrefix + "1").setSessionId(randomPrefix + "1"); + + AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(tryTimeout); + this.receiver = toClose(getSessionReceiverBuilder(false, + entityType, entityIndex, false, retryOptions) + .disableAutoComplete() + .sessionIdleTimeout(sessionIdleTimeout) + .buildAsyncClientForProcessor()); + + AtomicReference sessionId = new AtomicReference<>(); + StepVerifier.create( + sender.sendMessage(message0) + .thenMany(receiver.receiveMessages()) + .flatMap(m -> receiver.complete(m).thenReturn(m)) + .flatMap(m -> { + if (!sessionId.compareAndSet(null, m.getSessionId())) { + assertEquals(sessionId.get(), m.getSessionId(), "session rolling should not happen"); + } + return sender.sendMessage(message1).thenReturn(m); + }) + ) + .expectNextCount(1) + .verifyTimeout(tryTimeout.plusSeconds(20)); + } + /** * Sets the sender and receiver. If session is enabled, then a single-named session receiver is created. */ private void setSender(MessagingEntityType entityType, int entityIndex) { - this.sender = toClose(getSenderBuilder(false, entityType, entityIndex, true, false) .buildAsyncClient()); } + private void setReceiver(MessagingEntityType entityType, int entityIndex, Function onBuild) { ServiceBusSessionReceiverClientBuilder sessionBuilder = getSessionReceiverBuilder(false, - entityType, entityIndex, false).disableAutoComplete(); + entityType, entityIndex, false, DEFAULT_RETRY_OPTIONS).disableAutoComplete(); this.sessionReceiver = toClose(onBuild.apply(sessionBuilder).buildAsyncClient()); this.receiver = toClose(this.sessionReceiver.acceptSession(sessionId).block()); } private static void assertMessageEquals(String sessionId, String messageId, String contents, ServiceBusReceivedMessage message) { - assertNotNull(message, "'message' should not be null."); if (!CoreUtils.isNullOrEmpty(sessionId)) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index 8312ecfba366..f2984ce5aef7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -55,6 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -78,7 +79,7 @@ class ServiceBusSessionManagerTest { private static final ClientOptions CLIENT_OPTIONS = new ClientOptions(); private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final Duration MAX_LOCK_RENEWAL = Duration.ofSeconds(5); - + private static final Duration SESSION_IDLE_TIMEOUT = Duration.ofSeconds(20); private static final String NAMESPACE = "my-namespace-foo.net"; private static final String ENTITY_PATH = "queue-name"; private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE; @@ -176,7 +177,7 @@ void afterEach(TestInfo testInfo) throws Exception { @Test void properties() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, 5, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -187,7 +188,7 @@ void properties() { @Test void receiveNull() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, 5, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -203,8 +204,7 @@ void receiveNull() { @Test void singleUnnamedSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, - 5); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, 5, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -258,8 +258,8 @@ void singleUnnamedSession() { @Test void singleUnnamedSessionLockRenew() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, - 1); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, + 1, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -316,8 +316,8 @@ void singleUnnamedSessionLockRenew() { @Test void multipleSessions() { // Arrange - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, - null, 5); + final ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, + 5, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -444,8 +444,8 @@ void multipleReceiveUnnamedSession() { // Arrange final int expectedLinksCreated = 2; final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, - null, 1); + final ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, + 1, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -517,8 +517,8 @@ void multipleReceiveUnnamedSession() { @Test void singleUnnamedSessionCleanupAfterTimeout() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, - 2); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, + 2, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java index ca6364efcc0a..a979af9605e7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java @@ -44,6 +44,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions; +import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; @@ -57,8 +59,7 @@ class ServiceBusSessionReceiverAsyncClientTest { private static final ClientOptions CLIENT_OPTIONS = new ClientOptions(); private static final Duration TIMEOUT = Duration.ofSeconds(10); - private static final Duration MAX_LOCK_RENEWAL = Duration.ofSeconds(5); - + private static final Duration SESSION_IDLE_TIMEOUT = Duration.ofSeconds(20); private static final String NAMESPACE = "my-namespace-foo.net"; private static final String ENTITY_PATH = "queue-name"; private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE; @@ -152,11 +153,11 @@ void afterEach(TestInfo testInfo) throws Exception { @Test void acceptSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); final String lockToken = "a-lock-token"; final String linkName = "my-link-name"; final String sessionId = linkName; final OffsetDateTime sessionLockedUntil = OffsetDateTime.now().plus(Duration.ofSeconds(30)); + ReceiverOptions receiverOptions = createNamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, sessionId); final Message message = mock(Message.class); final ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); @@ -203,7 +204,8 @@ void acceptSession() { @Test void acceptNextSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); + ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, + false, null, SESSION_IDLE_TIMEOUT); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, messageSerializer, receiverOptions, CLIENT_IDENTIFIER); @@ -327,12 +329,13 @@ void acceptNextSession() { @Test void specificSessionReceive() { // Arrange - final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, - Duration.ZERO, false, null, null); + final ReceiverOptions receiverOptions = createUnnamedSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, + Duration.ZERO, false, 1, SESSION_IDLE_TIMEOUT); final String lockToken = "a-lock-token"; final String linkName = "my-link-name"; final String sessionId = "my-session-id"; + final OffsetDateTime sessionLockedUntil = OffsetDateTime.now().plus(Duration.ofSeconds(30)); final Message message = mock(Message.class); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java index 64974581a866..aa7b9a565d53 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java @@ -41,6 +41,7 @@ public class ServiceBusSessionReceiverTest { private static final String NAMESPACE = "my-namespace-foo.net"; private static final String ENTITY_PATH = "queue-name"; private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiverTest.class); + private static final Duration NO_SESSION_IDLE_TIMEOUT = null; private final TestPublisher endpointProcessor = TestPublisher.createCold(); private final TestPublisher messagePublisher = TestPublisher.createCold(); @@ -99,14 +100,13 @@ public void getsProperties() { when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); - final boolean disposeOnIdle = false; final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); // Act final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, - messageSerializer, retryOptions, 1, disposeOnIdle, scheduler, - unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration); + messageSerializer, retryOptions, 1, scheduler, + unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT); // Assert assertEquals(sessionId, sessionReceiver.getSessionId()); @@ -150,12 +150,11 @@ public void receivesMessages() { when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); - final boolean disposeOnIdle = false; final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, - messageSerializer, retryOptions, 1, disposeOnIdle, scheduler, - unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration); + messageSerializer, retryOptions, 1, scheduler, + unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT); // Act & Assert try { @@ -220,13 +219,12 @@ public void disposesOnIdle() { final Duration waitTime = Duration.ofSeconds(3); final Duration halfWaitTime = Duration.ofSeconds(waitTime.getSeconds() / 2); final Duration timeout = Duration.ofSeconds(waitTime.getSeconds() * 3); - final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(waitTime); - final boolean disposeOnIdle = true; + final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(10)); final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, - messageSerializer, retryOptions, 1, disposeOnIdle, scheduler, - unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration); + messageSerializer, retryOptions, 1, scheduler, + unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, waitTime); // Act & Assert try { @@ -281,12 +279,11 @@ public void removesLockOnUpdateDisposition() { when(amqpReceiveLink.closeAsync()).thenReturn(Mono.empty()); final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); - final boolean disposeOnIdle = false; final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, - messageSerializer, retryOptions, 1, disposeOnIdle, scheduler, - unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration); + messageSerializer, retryOptions, 1, scheduler, + unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT); // Act & Assert try { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java index 5659e71f76a4..694af16dcb33 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java @@ -82,7 +82,9 @@ public class TestUtils { static final int USE_CASE_SEND_SCHEDULED = 26; static final int USE_CASE_RECEIVE_AND_COMPLETE = 27; static final int USE_CASE_PEEK_MESSAGE = 28; - + static final int USE_CASE_MULTIPLE_SESSIONS1 = 29; + static final int USE_CASE_MULTIPLE_SESSIONS2 = 30; + static final int USE_CASE_MULTIPLE_SESSIONS3 = 31; static final Configuration GLOBAL_CONFIGURATION = Configuration.getGlobalConfiguration(); // An application property key to identify where in the stream this message was created. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java index 6e4b8693ae9a..a0dc8642726e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java @@ -117,7 +117,7 @@ void setup(TestInfo testInfo) throws IOException { when(connection.getShutdownSignals()).thenReturn(Flux.never()); reactorReceiver = new ServiceBusReactorReceiver(connection, ENTITY_PATH, receiver, receiveLinkHandler, - tokenManager, reactorProvider, Duration.ofSeconds(20), retryPolicy); + tokenManager, reactorProvider, retryPolicy); } @AfterEach diff --git a/sdk/servicebus/test-resources.json b/sdk/servicebus/test-resources.json index 24ced56ca609..d586785cb593 100644 --- a/sdk/servicebus/test-resources.json +++ b/sdk/servicebus/test-resources.json @@ -59,7 +59,7 @@ "namespaceName": "[concat('sb-java-', parameters('baseName'))]", "queueName": "queue", "queueSessionName": "queue-session", - "numberOfInstances": 29, + "numberOfInstances": 32, "subscriptionName": "subscription", "subscriptionSessionName": "subscription-session", "serviceBusDataOwnerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419')]",