diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 4f426d6c40..82935c97dd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -112,9 +112,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -210,6 +212,7 @@ public long applyAsLong(Object value) { private final Map connectionProperties; private final Duration rpcTimeout; private volatile ShutdownReason shutdownReason = null; + private final Runnable exchangeCommandVersionsCheck; public Client() { this(new ClientParameters()); @@ -373,6 +376,19 @@ public void write( this.maxFrameSize(), tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); + Set supportedCommands = maybeExchangeCommandVersions(); + if (supportedCommands.stream() + .filter(i -> i.getKey() == COMMAND_STREAM_STATS) + .findAny() + .isPresent()) { + this.exchangeCommandVersionsCheck = () -> {}; + } else { + this.exchangeCommandVersionsCheck = + () -> { + throw new UnsupportedOperationException( + "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); + }; + } started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -1455,6 +1471,7 @@ List exchangeCommandVersions() { } StreamStatsResponse streamStats(String stream) { + this.exchangeCommandVersionsCheck.run(); if (stream == null) { throw new IllegalArgumentException("stream must not be null"); } @@ -1548,6 +1565,22 @@ private EncodedMessageBatch createEncodedMessageBatch(Compression compression, i channel.alloc(), compression.code(), compressionCodecFactory.get(compression), batchSize); } + private Set maybeExchangeCommandVersions() { + Set supported = new HashSet<>(); + try { + if (Utils.is3_11_OrMore(brokerVersion())) { + for (FrameHandlerInfo info : exchangeCommandVersions()) { + if (info.getKey() == COMMAND_STREAM_STATS) { + supported.add(info); + } + } + } + } catch (Exception e) { + LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); + } + return supported; + } + public interface OutboundEntityMappingCallback { void handle(long publishingId, Object originalMessageOrBatch); diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 6025b13fcb..4eef477922 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -729,7 +729,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa .key(name); this.client = clientFactory.client(clientFactoryContext); LOGGER.debug("Created consumer connection '{}'", connectionName); - maybeExchangeCommandVersions(client); clientInitializedInManager.set(true); } @@ -1149,16 +1148,6 @@ public String toString() { } } - private static void maybeExchangeCommandVersions(Client client) { - try { - if (Utils.is3_11_OrMore(client.brokerVersion())) { - client.exchangeCommandVersions(); - } - } catch (Exception e) { - LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); - } - } - private static final Predicate RETRY_ON_TIMEOUT = e -> e instanceof TimeoutStreamException; diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index 0255c84c30..6f6138b622 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -83,6 +83,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; @@ -184,6 +185,23 @@ public String toString() { + maxVersion + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FrameHandlerInfo that = (FrameHandlerInfo) o; + return key == that.key && minVersion == that.minVersion && maxVersion == that.maxVersion; + } + + @Override + public int hashCode() { + return Objects.hash(key, minVersion, maxVersion); + } } static List commandVersions() { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 7f6591791a..19978fde2e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -481,16 +481,7 @@ public StreamStats queryStreamStats(String stream) { StreamStatsResponse response = locatorOperation( Utils.namedFunction( - client -> { - if (Utils.is3_11_OrMore(client.brokerVersion())) { - return client.streamStats(stream); - } else { - throw new UnsupportedOperationException( - "QueryStringInfo is available only for RabbitMQ 3.11 or more."); - } - }, - "Query stream stats on stream '%s'", - stream)); + client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream)); if (response.isOk()) { Map info = response.getInfo(); BiFunction offsetSupplierLogic =