Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -210,6 +212,7 @@ public long applyAsLong(Object value) {
private final Map<String, String> connectionProperties;
private final Duration rpcTimeout;
private volatile ShutdownReason shutdownReason = null;
private final Runnable exchangeCommandVersionsCheck;

public Client() {
this(new ClientParameters());
Expand Down Expand Up @@ -373,6 +376,19 @@ public void write(
this.maxFrameSize(),
tuneState.getHeartbeat());
this.connectionProperties = open(parameters.virtualHost);
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
if (supportedCommands.stream()
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
.findAny()
.isPresent()) {
this.exchangeCommandVersionsCheck = () -> {};
} else {
this.exchangeCommandVersionsCheck =
() -> {
throw new UnsupportedOperationException(
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
};
}
started.set(true);
this.metricsCollector.openConnection();
} catch (RuntimeException e) {
Expand Down Expand Up @@ -1455,6 +1471,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
}

StreamStatsResponse streamStats(String stream) {
this.exchangeCommandVersionsCheck.run();
if (stream == null) {
throw new IllegalArgumentException("stream must not be null");
}
Expand Down Expand Up @@ -1548,6 +1565,22 @@ private EncodedMessageBatch createEncodedMessageBatch(Compression compression, i
channel.alloc(), compression.code(), compressionCodecFactory.get(compression), batchSize);
}

private Set<FrameHandlerInfo> maybeExchangeCommandVersions() {
Set<FrameHandlerInfo> supported = new HashSet<>();
try {
if (Utils.is3_11_OrMore(brokerVersion())) {
for (FrameHandlerInfo info : exchangeCommandVersions()) {
if (info.getKey() == COMMAND_STREAM_STATS) {
supported.add(info);
}
}
}
} catch (Exception e) {
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
}
return supported;
}

public interface OutboundEntityMappingCallback {

void handle(long publishingId, Object originalMessageOrBatch);
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
.key(name);
this.client = clientFactory.client(clientFactoryContext);
LOGGER.debug("Created consumer connection '{}'", connectionName);
maybeExchangeCommandVersions(client);
clientInitializedInManager.set(true);
}

Expand Down Expand Up @@ -1149,16 +1148,6 @@ public String toString() {
}
}

private static void maybeExchangeCommandVersions(Client client) {
try {
if (Utils.is3_11_OrMore(client.brokerVersion())) {
client.exchangeCommandVersions();
}
} catch (Exception e) {
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
}
}

private static final Predicate<Exception> RETRY_ON_TIMEOUT =
e -> e instanceof TimeoutStreamException;

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
Expand Down Expand Up @@ -184,6 +185,23 @@ public String toString() {
+ maxVersion
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FrameHandlerInfo that = (FrameHandlerInfo) o;
return key == that.key && minVersion == that.minVersion && maxVersion == that.maxVersion;
}

@Override
public int hashCode() {
return Objects.hash(key, minVersion, maxVersion);
}
}

static List<FrameHandlerInfo> commandVersions() {
Expand Down
11 changes: 1 addition & 10 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,16 +481,7 @@ public StreamStats queryStreamStats(String stream) {
StreamStatsResponse response =
locatorOperation(
Utils.namedFunction(
client -> {
if (Utils.is3_11_OrMore(client.brokerVersion())) {
return client.streamStats(stream);
} else {
throw new UnsupportedOperationException(
"QueryStringInfo is available only for RabbitMQ 3.11 or more.");
}
},
"Query stream stats on stream '%s'",
stream));
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
if (response.isOk()) {
Map<String, Long> info = response.getInfo();
BiFunction<String, String, LongSupplier> offsetSupplierLogic =
Expand Down