Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,18 @@ jobs:

- name: Build with Maven
run: mvn clean install

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts

- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
2 changes: 1 addition & 1 deletion integrations/rustlang-rdkafka/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn produce(brokers: &str, topic_name: &str, limit: i8) -> Result<(), std::

// This loop will wait until all delivery statuses have been received.
for future in futures {
info!("Future completed. Result: {:?}", future.await);
println!("Future completed. Result: {:?}", future.await);
}
println!(
"produced all messages successfully ({})",
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.12.4</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
214 changes: 165 additions & 49 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/main/java/io/streamnative/kop/utils/TopicNameUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public static TopicName pulsarTopicName(String topic) {
}

public static TopicName pulsarTopicName(String topic, int partitionIndex, NamespaceName namespace) {
if (topic.startsWith(TopicDomain.persistent.value())) {
topic = topic.replace(TopicDomain.persistent.value() + "://", "");
}

if (topic.contains(namespace.getNamespaceObject().toString())) {
topic = topic.replace(namespace.getNamespaceObject().toString() + "/", "");
}
return TopicName.get(TopicDomain.persistent.value(),
namespace,
topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex);
Expand Down
208 changes: 208 additions & 0 deletions src/test/java/io/streamnative/kop/KafkaIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop;

import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.AfterClass;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* This class is running Integration tests for Kafka clients.
* It uses testcontainers to spawn containers that will either:
* * produce a number of messages
* * consume a number of messages
*
* <p>As testcontainers is not capable of checking exitCode of the app running in the container,
* Every container should print the exitCode in stdout.
*
* <p>This class is waiting for some precise logs to come-up:
* * "ready to produce"
* * "ready to consume"
* * "produced all messages successfully"
* * "consumed all messages successfully"
*
* <p>This class is using environment variables to control the containers, such as:
* * broker address,
* * topic name,
* * produce or consume mode,
* * how many message to produce/consume,
*/
@Slf4j
public class KafkaIntegrationTest extends MockKafkaServiceBaseTest {

@DataProvider
public static Object[][] integrations() {
return new Object[][]{
{"golang-sarama", Optional.empty(), true, true},
{"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true},
{"golang-confluent-kafka", Optional.empty(), true, true},
// TODO: rustlang-rdkafka is failing on Github Actions and works locally, we need to investigate
// {"rustlang-rdkafka", Optional.empty(), true, true},
// consumer is broken, see integrations/README.md
{"node-kafka-node", Optional.empty(), true, false},
{"node-rdkafka", Optional.empty(), true, true},
};
}

private static WaitingConsumer createLogFollower(final GenericContainer container) {
final WaitingConsumer waitingConsumer = new WaitingConsumer();
container.followOutput(waitingConsumer);
return waitingConsumer;
}

private static void checkForErrorsInLogs(final String logs) {
assertFalse(logs.contains("no available broker to send metadata request to"));
assertFalse(logs.contains("panic"));
assertFalse(logs.contains("correlation ID didn't match"));
assertFalse(logs.contains("Required feature not supported by broker"));


if (logs.contains("starting to produce")) {
assertTrue(logs.contains("produced all messages successfully"));
}

if (logs.contains("starting to consume")) {
assertTrue(logs.contains("consumed all messages successfully"));
}
assertTrue(logs.contains("ExitCode=0"));
}

@BeforeClass
@Override
protected void setup() throws Exception {

super.resetConfig();
super.internalSetup();

if (!this.admin.clusters().getClusters().contains(this.configClusterName)) {
// so that clients can test short names
this.admin.clusters().createCluster(this.configClusterName,
new ClusterData("http://127.0.0.1:" + this.brokerWebservicePort));
} else {
this.admin.clusters().updateCluster(this.configClusterName,
new ClusterData("http://127.0.0.1:" + this.brokerWebservicePort));
}

if (!this.admin.tenants().getTenants().contains("public")) {
this.admin.tenants().createTenant("public",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
} else {
this.admin.tenants().updateTenant("public",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
}
if (!this.admin.namespaces().getNamespaces("public").contains("public/default")) {
this.admin.namespaces().createNamespace("public/default");
this.admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
this.admin.namespaces().setRetention("public/default",
new RetentionPolicies(60, 1000));
}
if (!this.admin.namespaces().getNamespaces("public").contains("public/__kafka")) {
this.admin.namespaces().createNamespace("public/__kafka");
this.admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test"));
this.admin.namespaces().setRetention("public/__kafka",
new RetentionPolicies(-1, -1));
}
Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort));
}

@Test(timeOut = 3 * 60_000, dataProvider = "integrations")
void simpleProduceAndConsume(final String integration, final Optional<String> topic,
final boolean shouldProduce, final boolean shouldConsume) throws Exception {
String topicName = topic.orElse(integration);
System.out.println("starting integration " + integration + " with topicName " + topicName);

this.getAdmin().topics().createPartitionedTopic(topicName, 1);

System.out.println("topic created");

final GenericContainer producer = new GenericContainer<>("streamnative/kop-test-" + integration)
.withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort)
.withEnv("KOP_PRODUCE", "true")
.withEnv("KOP_TOPIC", topic.orElse(integration))
.withEnv("KOP_LIMIT", "10")
.withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log))
.waitingFor(Wait.forLogMessage("starting to produce\\n", 1))
.withNetworkMode("host");

final GenericContainer consumer = new GenericContainer<>("streamnative/kop-test-" + integration)
.withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort)
.withEnv("KOP_TOPIC", topic.orElse(integration))
.withEnv("KOP_CONSUME", "true")
.withEnv("KOP_LIMIT", "10")
.withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log))
.waitingFor(Wait.forLogMessage("starting to consume\\n", 1))
.withNetworkMode("host");

WaitingConsumer producerWaitingConsumer = null;
WaitingConsumer consumerWaitingConsumer = null;
if (shouldProduce) {
producer.start();
producerWaitingConsumer = KafkaIntegrationTest.createLogFollower(producer);
System.out.println("producer started");
}

if (shouldConsume) {
consumer.start();
consumerWaitingConsumer = KafkaIntegrationTest.createLogFollower(consumer);
System.out.println("consumer started");
}

if (shouldProduce) {
producerWaitingConsumer.waitUntil(frame ->
frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS);
KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs());
}

if (shouldConsume) {
consumerWaitingConsumer.waitUntil(frame ->
frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS);
KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs());
}
}

@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ protected void resetConfig() {
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.ofNullable(brokerWebservicePort));
this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost");
this.conf.setListeners(
PLAINTEXT_PREFIX + "localhost:" + kafkaBrokerPort + ","
+ SSL_PREFIX + "localhost:" + kafkaBrokerPortTls);
Expand Down