Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control-storage.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
<allow pkg="kafka.utils" />
<allow pkg="kafka.test" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
Expand Down
77 changes: 57 additions & 20 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package kafka.test;

import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.test.TestUtils;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -32,6 +35,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
Expand All @@ -46,6 +50,10 @@ default boolean isKRaftTest() {
return type() == Type.KRAFT || type() == Type.CO_KRAFT;
}

Map<Integer, KafkaBroker> brokers();

Map<Integer, ControllerServer> controllers();

/**
* The immutable cluster configuration used to create this cluster.
*/
Expand All @@ -61,7 +69,9 @@ default boolean isKRaftTest() {
/**
* Return the set of all broker IDs configured for this test.
*/
Set<Integer> brokerIds();
default Set<Integer> brokerIds() {
return brokers().keySet();
}

/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
Expand Down Expand Up @@ -97,7 +107,11 @@ default Optional<ListenerName> controlPlaneListenerName() {
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
*/
Collection<SocketServer> brokerSocketServers();
default Collection<SocketServer> brokerSocketServers() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}

/**
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
Expand All @@ -108,17 +122,20 @@ default Optional<ListenerName> controlPlaneListenerName() {
/**
* Return any one of the broker servers. Throw an error if none are found
*/
SocketServer anyBrokerSocketServer();
default SocketServer anyBrokerSocketServer() {
return brokerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

/**
* Return any one of the controller servers. Throw an error if none are found
*/
SocketServer anyControllerSocketServer();

/**
* Return a mapping of the underlying broker IDs to their supported features
*/
Map<Integer, BrokerFeatures> brokerFeatures();
default SocketServer anyControllerSocketServer() {
return controllerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

String clusterId();

Expand All @@ -137,16 +154,6 @@ default Admin createAdminClient() {
return createAdminClient(new Properties());
}

void start();

void stop();

void shutdownBroker(int brokerId);

void startBroker(int brokerId);

void waitForReadyBrokers() throws InterruptedException;

default Set<GroupProtocol> supportedGroupProtocols() {
Map<String, String> serverProperties = config().serverProperties();
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
Expand All @@ -160,4 +167,34 @@ default Set<GroupProtocol> supportedGroupProtocols() {

return Collections.unmodifiableSet(supportedGroupProtocols);
}

//---------------------------[modify]---------------------------//

void start();

void stop();

void shutdownBroker(int brokerId);

void startBroker(int brokerId);

//---------------------------[wait]---------------------------//

void waitForReadyBrokers() throws InterruptedException;

default void waitForTopic(String topic, int partitions) throws InterruptedException {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon I add a helper to make all ClusterInstance users :)

// wait for metadata
TestUtils.waitForCondition(
() -> brokers().values().stream().allMatch(broker -> partitions == 0 ?
broker.metadataCache().numPartitions(topic).isEmpty() :
broker.metadataCache().numPartitions(topic).contains(partitions)
), 60000L, topic + " metadata not propagated after 60000 ms");

for (ControllerServer controller : controllers().values()) {
long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset - 1;
TestUtils.waitForCondition(
() -> brokers().values().stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
60000L, "Timeout waiting for controller metadata propagating to brokers");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.test.junit;

import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
Expand All @@ -39,6 +39,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -151,64 +152,31 @@ public String bootstrapControllers() {
return clusterReference.get().bootstrapControllers();
}

@Override
public Collection<SocketServer> brokerSocketServers() {
return brokers()
.map(BrokerServer::socketServer)
.collect(Collectors.toList());
}

@Override
public ListenerName clientListener() {
return ListenerName.normalised("EXTERNAL");
}

@Override
public Optional<ListenerName> controllerListenerName() {
return OptionConverters.toJava(controllers().findAny().get().config().controllerListenerNames().headOption().map(ListenerName::new));
return controllers().values().stream()
.findAny()
.flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption()))
.map(ListenerName::new);
}

@Override
public Collection<SocketServer> controllerSocketServers() {
return controllers()
return controllers().values().stream()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}

@Override
public SocketServer anyBrokerSocketServer() {
return brokers()
.map(BrokerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

@Override
public SocketServer anyControllerSocketServer() {
return controllers()
.map(ControllerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return brokers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
BrokerServer::brokerFeatures
));
}

@Override
public String clusterId() {
return controllers().findFirst().map(ControllerServer::clusterId).orElse(
brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
() -> new RuntimeException("No controllers or brokers!"))
);
}

public Collection<ControllerServer> controllerServers() {
return controllers().collect(Collectors.toList());
return Stream.concat(controllers().values().stream().map(ControllerServer::clusterId),
brokers().values().stream().map(KafkaBroker::clusterId)).findFirst()
.orElseThrow(() -> new RuntimeException("No controllers or brokers!"));
}

@Override
Expand All @@ -223,16 +191,7 @@ public ClusterConfig config() {

@Override
public Set<Integer> controllerIds() {
return controllers()
.map(controllerServer -> controllerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public Set<Integer> brokerIds() {
return brokers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
return controllers().keySet();
}

@Override
Expand Down Expand Up @@ -295,12 +254,16 @@ private BrokerServer findBrokerOrThrow(int brokerId) {
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}

public Stream<BrokerServer> brokers() {
return clusterReference.get().brokers().values().stream();
@Override
public Map<Integer, KafkaBroker> brokers() {
return clusterReference.get().brokers().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Stream<ControllerServer> controllers() {
return clusterReference.get().controllers().values().stream();
@Override
public Map<Integer, ControllerServer> controllers() {
return Collections.unmodifiableMap(clusterReference.get().controllers());
}

}
Expand Down
65 changes: 17 additions & 48 deletions core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
Expand Down Expand Up @@ -52,7 +53,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
Expand Down Expand Up @@ -132,13 +132,6 @@ public String bootstrapControllers() {
throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
}

@Override
public Collection<SocketServer> brokerSocketServers() {
return servers()
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}

@Override
public ListenerName clientListener() {
return clusterReference.get().listenerName();
Expand All @@ -152,40 +145,15 @@ public Optional<ListenerName> controlPlaneListenerName() {

@Override
public Collection<SocketServer> controllerSocketServers() {
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
return brokers().values().stream()
.filter(s -> ((KafkaServer) s).kafkaController().isActive())
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}

@Override
public SocketServer anyBrokerSocketServer() {
return servers()
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

@Override
public SocketServer anyControllerSocketServer() {
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return servers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
KafkaServer::brokerFeatures
));
}

@Override
public String clusterId() {
return servers().findFirst().map(KafkaServer::clusterId).orElseThrow(
return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
() -> new RuntimeException("No broker instances found"));
}

Expand All @@ -204,13 +172,6 @@ public Set<Integer> controllerIds() {
return brokerIds();
}

@Override
public Set<Integer> brokerIds() {
return servers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
Expand Down Expand Up @@ -274,14 +235,22 @@ public void waitForReadyBrokers() throws InterruptedException {
}

private KafkaServer findBrokerOrThrow(int brokerId) {
return servers()
return brokers().values().stream()
.filter(server -> server.config().brokerId() == brokerId)
.map(s -> (KafkaServer) s)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}

public Stream<KafkaServer> servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
@Override
public Map<Integer, ControllerServer> controllers() {
return Collections.emptyMap();
}

@Override
public Map<Integer, KafkaBroker> brokers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers())
.stream().collect(Collectors.toMap(s -> s.config().brokerId(), s -> s));
}
}

Expand Down
Loading