From 13f4f46eab8fa9086225d7635a7aa98127748e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 31 Jan 2023 16:17:16 +0100 Subject: [PATCH] Exchange command versions in connection Command availability (only StreamStats for now) was based on broker version. This commit makes the exchange for each open connection and make sure the users of Client won't call a command not supported by the broker. --- .../java/com/rabbitmq/stream/impl/Client.java | 33 +++++++++++++++++++ .../stream/impl/ConsumersCoordinator.java | 11 ------- .../stream/impl/ServerFrameHandler.java | 18 ++++++++++ .../stream/impl/StreamEnvironment.java | 11 +------ 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 4f426d6c40..82935c97dd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -112,9 +112,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -210,6 +212,7 @@ public long applyAsLong(Object value) { private final Map connectionProperties; private final Duration rpcTimeout; private volatile ShutdownReason shutdownReason = null; + private final Runnable exchangeCommandVersionsCheck; public Client() { this(new ClientParameters()); @@ -373,6 +376,19 @@ public void write( this.maxFrameSize(), tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); + Set supportedCommands = maybeExchangeCommandVersions(); + if (supportedCommands.stream() + .filter(i -> i.getKey() == COMMAND_STREAM_STATS) + .findAny() + .isPresent()) { + this.exchangeCommandVersionsCheck = () -> {}; + } else { + this.exchangeCommandVersionsCheck = + () -> { + throw new UnsupportedOperationException( + "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); + }; + } started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -1455,6 +1471,7 @@ List exchangeCommandVersions() { } StreamStatsResponse streamStats(String stream) { + this.exchangeCommandVersionsCheck.run(); if (stream == null) { throw new IllegalArgumentException("stream must not be null"); } @@ -1548,6 +1565,22 @@ private EncodedMessageBatch createEncodedMessageBatch(Compression compression, i channel.alloc(), compression.code(), compressionCodecFactory.get(compression), batchSize); } + private Set maybeExchangeCommandVersions() { + Set supported = new HashSet<>(); + try { + if (Utils.is3_11_OrMore(brokerVersion())) { + for (FrameHandlerInfo info : exchangeCommandVersions()) { + if (info.getKey() == COMMAND_STREAM_STATS) { + supported.add(info); + } + } + } + } catch (Exception e) { + LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); + } + return supported; + } + public interface OutboundEntityMappingCallback { void handle(long publishingId, Object originalMessageOrBatch); diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 6025b13fcb..4eef477922 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -729,7 +729,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa .key(name); this.client = clientFactory.client(clientFactoryContext); LOGGER.debug("Created consumer connection '{}'", connectionName); - maybeExchangeCommandVersions(client); clientInitializedInManager.set(true); } @@ -1149,16 +1148,6 @@ public String toString() { } } - private static void maybeExchangeCommandVersions(Client client) { - try { - if (Utils.is3_11_OrMore(client.brokerVersion())) { - client.exchangeCommandVersions(); - } - } catch (Exception e) { - LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); - } - } - private static final Predicate RETRY_ON_TIMEOUT = e -> e instanceof TimeoutStreamException; diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index 0255c84c30..6f6138b622 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -83,6 +83,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; @@ -184,6 +185,23 @@ public String toString() { + maxVersion + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FrameHandlerInfo that = (FrameHandlerInfo) o; + return key == that.key && minVersion == that.minVersion && maxVersion == that.maxVersion; + } + + @Override + public int hashCode() { + return Objects.hash(key, minVersion, maxVersion); + } } static List commandVersions() { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 7f6591791a..19978fde2e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -481,16 +481,7 @@ public StreamStats queryStreamStats(String stream) { StreamStatsResponse response = locatorOperation( Utils.namedFunction( - client -> { - if (Utils.is3_11_OrMore(client.brokerVersion())) { - return client.streamStats(stream); - } else { - throw new UnsupportedOperationException( - "QueryStringInfo is available only for RabbitMQ 3.11 or more."); - } - }, - "Query stream stats on stream '%s'", - stream)); + client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream)); if (response.isOk()) { Map info = response.getInfo(); BiFunction offsetSupplierLogic =