Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int
kConfig.setAllowAutoTopicCreation(true);
kConfig.setAllowAutoTopicCreationType("partitioned");
kConfig.setBrokerDeleteInactiveTopicsEnabled(false);
kConfig.setGroupInitialRebalanceDelayMs(0);
kConfig.setBrokerShutdownTimeoutMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int
kConfig.setBrokerDeleteInactiveTopicsEnabled(false);
kConfig.setSystemTopicEnabled(true);
kConfig.setTopicLevelPoliciesEnabled(true);
kConfig.setGroupInitialRebalanceDelayMs(0);
kConfig.setBrokerShutdownTimeoutMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public KafkaIntegrationTest(final String entryFormat) {
@DataProvider
public static Object[][] integrations() {
return new Object[][]{
{"golang-sarama", Optional.empty(), true, true},
{"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true},
// TODO: golang-sarama works well but it's very likely to complete before testcontainers catch the logs
// so that GenericContainer failed to start. Ignore these two cases first.
// See https://github.com/streamnative/kop/issues/629.
//{"golang-sarama", Optional.empty(), true, true},
//{"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true},
{"golang-confluent-kafka", Optional.empty(), true, true},
// TODO: rustlang-rdkafka is failing on Github Actions and works locally, we need to investigate
// {"rustlang-rdkafka", Optional.empty(), true, true},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

Expand Down Expand Up @@ -177,6 +178,9 @@ protected void resetConfig() {
+ SSL_PREFIX + "localhost:" + kafkaBrokerPortTls);
kafkaConfig.setEntryFormat(entryFormat);

// Speed up tests for reducing rebalance time
kafkaConfig.setGroupInitialRebalanceDelayMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Path handlerPath;
Expand Down Expand Up @@ -313,6 +317,8 @@ protected void restartBroker() throws Exception {
}

protected void stopBroker() throws Exception {
// set shutdown timeout to 0 for forceful shutdown
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsar.close();
}

Expand Down Expand Up @@ -344,6 +350,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();

doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
}

public static MockZooKeeper createMockZooKeeper(String clusterName, String brokerUrl, String brokerUrlTls,
Expand Down