From 1ab5a7705f52d4ad16f5896053fd8bdcc1ff75c0 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 19 Dec 2019 17:30:21 +0100 Subject: [PATCH 01/21] Initial commit for integration tests using Sarama --- integrations/golang/sarama/Dockerfile | 9 ++ integrations/golang/sarama/go.mod | 5 + integrations/golang/sarama/go.sum | 52 ++++++++ integrations/golang/sarama/main.go | 125 ++++++++++++++++++ pom.xml | 6 + .../kop/MockKafkaServiceBaseTest.java | 1 - .../kop/integrations/GolangSaramaTest.java | 105 +++++++++++++++ 7 files changed, 302 insertions(+), 1 deletion(-) create mode 100644 integrations/golang/sarama/Dockerfile create mode 100644 integrations/golang/sarama/go.mod create mode 100644 integrations/golang/sarama/go.sum create mode 100644 integrations/golang/sarama/main.go create mode 100644 src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java diff --git a/integrations/golang/sarama/Dockerfile b/integrations/golang/sarama/Dockerfile new file mode 100644 index 0000000000..fb4b3b85c9 --- /dev/null +++ b/integrations/golang/sarama/Dockerfile @@ -0,0 +1,9 @@ +FROM golang:1.13.4-stretch + +WORKDIR /go/src/app +COPY . . + +RUN go get -d -v ./... +RUN go install -v ./... + +CMD [ "/go/bin/sarama-golang" ] \ No newline at end of file diff --git a/integrations/golang/sarama/go.mod b/integrations/golang/sarama/go.mod new file mode 100644 index 0000000000..c13ce190b7 --- /dev/null +++ b/integrations/golang/sarama/go.mod @@ -0,0 +1,5 @@ +module github.com/apache/pulsar/kop/integration/sarama-golang + +go 1.13 + +require github.com/Shopify/sarama v1.24.1 diff --git a/integrations/golang/sarama/go.sum b/integrations/golang/sarama/go.sum new file mode 100644 index 0000000000..f327335702 --- /dev/null +++ b/integrations/golang/sarama/go.sum @@ -0,0 +1,52 @@ +github.com/Shopify/sarama v1.24.1 h1:svn9vfN3R1Hz21WR2Gj0VW9ehaDGkiOS+VqlIcZOkMI= +github.com/Shopify/sarama v1.24.1/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= 
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= +github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= +github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= diff --git a/integrations/golang/sarama/main.go b/integrations/golang/sarama/main.go new file mode 100644 index 0000000000..5e6da7052e --- /dev/null +++ b/integrations/golang/sarama/main.go @@ -0,0 +1,125 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "sync" + + "github.com/Shopify/sarama" +) + +type exampleConsumerGroupHandler 
struct { + counter int + limit int + wg *sync.WaitGroup +} + +func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset) + sess.MarkMessage(msg, "") + h.counter++ + fmt.Printf("received msg %d/%d\n", h.counter, h.limit) + if h.counter == h.limit { + fmt.Println("limit reached, exiting") + h.wg.Done() + } + } + return nil +} + +func main() { + + nbrMessages, err := strconv.Atoi(getEnv("KOP_NBR_MESSAGES", "10")) + if err != nil { + panic(err) + } + limit, err := strconv.Atoi(getEnv("KOP_EXPECT_MESSAGES", "10")) + if err != nil { + panic(err) + } + + // Init config, specify appropriate version + config := sarama.NewConfig() + config.Version = sarama.V2_0_0_0 + config.Metadata.Retry.Max = 0 + config.Consumer.Return.Errors = true + config.Producer.Return.Successes = true + brokers := []string{getEnv("KOP_BROKER", "localhost:9092")} + topic := getEnv("KOP_TOPIC", "my-sarama-topic") + topics := []string{topic} + sarama.Logger = log.New(os.Stdout, "", log.Ltime) + + fmt.Println("connecting to", brokers) + + // Start with a client + client, err := sarama.NewClient(brokers, config) + if err != nil { + panic(err) + } + defer func() { _ = client.Close() }() + + var waitgroup sync.WaitGroup + waitgroup.Add(1) + + // Start a new consumer group + group, err := sarama.NewConsumerGroupFromClient("sarama-consumer", client) + if err != nil { + panic(err) + } + defer func() { _ = group.Close() }() + + fmt.Println("ready to consume") + + // Iterate over consumer sessions. 
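	// group.Consume blocks for the lifetime of a consumer-group session, so it
	// runs in a goroutine while main() goes on to produce. The handler counts
	// delivered messages and calls wg.Done() once the expected limit is hit,
	// which unblocks the waitgroup.Wait() at the bottom of main().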
+ ctx := context.Background() + go func() { + handler := exampleConsumerGroupHandler{counter: 0, limit: limit, wg: &waitgroup} + + err := group.Consume(ctx, topics, handler) + if err != nil { + panic(err) + } + + }() + + syncProducer, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + panic(err) + } + defer func() { _ = syncProducer.Close() }() + + fmt.Println("starting to produce") + + for i := 0; i < nbrMessages; i++ { + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder("hello from sarama"), + Metadata: "test", + } + + fmt.Println("send a message") + + _, _, err := syncProducer.SendMessage(msg) + if err != nil { + panic(err) + } + } + fmt.Printf("produced %d messages, waiting for consumption...\n", nbrMessages) + + waitgroup.Wait() + fmt.Println("exiting normally") +} + +func getEnv(key, fallback string) string { + value, exists := os.LookupEnv(key) + if !exists { + value = fallback + } + return value +} diff --git a/pom.xml b/pom.xml index 1ffbfb33f0..d584713bf5 100644 --- a/pom.xml +++ b/pom.xml @@ -212,6 +212,12 @@ test + + org.testcontainers + testcontainers + 1.12.4 + test + diff --git a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java index cbac189dff..c15e8940ab 100644 --- a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java +++ b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java @@ -105,7 +105,6 @@ protected void resetConfig() { this.conf.setAdvertisedAddress("localhost"); this.conf.setWebServicePort(Optional.ofNullable(brokerWebservicePort)); this.conf.setClusterName(configClusterName); - this.conf.setAdvertisedAddress("localhost"); this.conf.setListeners( PLAINTEXT_PREFIX + "localhost:" + kafkaBrokerPort + "," + SSL_PREFIX + "localhost:" + kafkaBrokerPortTls); diff --git a/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java b/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java new file mode 100644 index 0000000000..f401c883c6 --- /dev/null +++ b/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java @@ -0,0 +1,105 @@ +package io.streamnative.kop.integrations; + +import com.google.common.collect.Sets; +import io.streamnative.kop.MockKafkaServiceBaseTest; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.junit.AfterClass; +import org.junit.ClassRule; +import org.testcontainers.Testcontainers; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.WaitingConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT; +import static org.testng.AssertJUnit.assertFalse; + +@Slf4j +public class GolangSaramaTest extends MockKafkaServiceBaseTest { + + @ClassRule + public GenericContainer container = new GenericContainer<>( + new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + + @BeforeClass + @Override + protected void setup() throws Exception { + + 
super.resetConfig(); + super.internalSetup(); + + if (!admin.clusters().getClusters().contains(configClusterName)) { + // so that clients can test short names + admin.clusters().createCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } else { + admin.clusters().updateCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } + + if (!admin.tenants().getTenants().contains("public")) { + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } else { + admin.tenants().updateTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } + if (!admin.namespaces().getNamespaces("public").contains("public/default")) { + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(60, 1000)); + } + if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) { + admin.namespaces().createNamespace("public/__kafka"); + admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/__kafka", + new RetentionPolicies(-1, -1)); + } + getAdmin().topics().createPartitionedTopic("persistent://public/default/my-sarama-topic", 1); + + container + .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) + .withNetworkMode("host"); + Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); + } + + @Test(timeOut = 120_000) + void simpleProduceAndConsume() throws Exception { + System.out.println("building container"); + container.start(); + System.out.println("container started"); + + Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); + container.followOutput(logConsumer); + WaitingConsumer consumer = new WaitingConsumer(); + container.followOutput(consumer, STDOUT); + + consumer.waitUntil(frame -> + frame.getUtf8String().contains("reached"), 30, TimeUnit.SECONDS); + System.out.println("after reached"); + + checkForSaramaErrors(container.getLogs()); + } + + @Override + @AfterClass + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + private void checkForSaramaErrors(String logs) { + assertFalse(logs.contains("no available broker to send metadata request to")); + assertFalse(logs.contains("panic")); + assertFalse(logs.contains("correlation ID didn't match")); + } +} From c4bea7d0ba931f36edc08716bee7d4730c0d909d Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 30 Dec 2019 10:16:13 +0100 Subject: [PATCH 02/21] Split sarama integration tests into two applications --- integrations/golang/sarama/Dockerfile | 2 +- integrations/golang/sarama/main.go | 79 +++++++++------ .../streamnative/kop/KafkaRequestHandler.java | 5 +- .../kop/integrations/GolangSaramaTest.java | 95 +++++++++++++++---- 4 files changed, 127 insertions(+), 54 deletions(-) diff --git a/integrations/golang/sarama/Dockerfile b/integrations/golang/sarama/Dockerfile index fb4b3b85c9..3d97f9d004 100644 --- a/integrations/golang/sarama/Dockerfile +++ b/integrations/golang/sarama/Dockerfile @@ -6,4 +6,4 @@ COPY . . RUN go get -d -v ./... RUN go install -v ./... 
-CMD [ "/go/bin/sarama-golang" ] \ No newline at end of file +CMD sh -c '/go/bin/sarama-golang; echo "ExitCode=$?"' \ No newline at end of file diff --git a/integrations/golang/sarama/main.go b/integrations/golang/sarama/main.go index 5e6da7052e..03f8dbc44c 100644 --- a/integrations/golang/sarama/main.go +++ b/integrations/golang/sarama/main.go @@ -28,6 +28,7 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi if h.counter == h.limit { fmt.Println("limit reached, exiting") h.wg.Done() + return nil } } return nil @@ -35,6 +36,8 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi func main() { + fmt.Println("starting...") + nbrMessages, err := strconv.Atoi(getEnv("KOP_NBR_MESSAGES", "10")) if err != nil { panic(err) @@ -44,11 +47,22 @@ func main() { panic(err) } + shouldProduce, err := strconv.ParseBool(getEnv("KOP_PRODUCE", "false")) + if err != nil { + panic(err) + } + + shouldConsume, err := strconv.ParseBool(getEnv("KOP_CONSUME", "false")) + if err != nil { + panic(err) + } + // Init config, specify appropriate version config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 config.Metadata.Retry.Max = 0 config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Producer.Return.Successes = true brokers := []string{getEnv("KOP_BROKER", "localhost:9092")} topic := getEnv("KOP_TOPIC", "my-sarama-topic") @@ -65,55 +79,58 @@ func main() { defer func() { _ = client.Close() }() var waitgroup sync.WaitGroup - waitgroup.Add(1) + if shouldConsume { + waitgroup.Add(1) - // Start a new consumer group - group, err := sarama.NewConsumerGroupFromClient("sarama-consumer", client) - if err != nil { - panic(err) - } - defer func() { _ = group.Close() }() + // Start a new consumer group + group, err := sarama.NewConsumerGroupFromClient("sarama-consumer", client) + if err != nil { + panic(err) + } + defer func() { _ = group.Close() }() - fmt.Println("ready to consume") + fmt.Println("ready to consume") - // Iterate over consumer sessions. - ctx := context.Background() - go func() { + // Iterate over consumer sessions. 
+ ctx := context.Background() handler := exampleConsumerGroupHandler{counter: 0, limit: limit, wg: &waitgroup} - err := group.Consume(ctx, topics, handler) + err = group.Consume(ctx, topics, handler) if err != nil { panic(err) } - }() - - syncProducer, err := sarama.NewSyncProducerFromClient(client) - if err != nil { - panic(err) } - defer func() { _ = syncProducer.Close() }() - fmt.Println("starting to produce") - - for i := 0; i < nbrMessages; i++ { - msg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.StringEncoder("hello from sarama"), - Metadata: "test", + if shouldProduce { + syncProducer, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + panic(err) } + defer func() { _ = syncProducer.Close() }() - fmt.Println("send a message") + fmt.Println("starting to produce") - _, _, err := syncProducer.SendMessage(msg) - if err != nil { - panic(err) + for i := 0; i < nbrMessages; i++ { + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder("hello from sarama"), + Metadata: "test", + } + + fmt.Println("send a message") + + _, _, err := syncProducer.SendMessage(msg) + if err != nil { + panic(err) + } } - } - fmt.Printf("produced %d messages, waiting for consumption...\n", nbrMessages) + fmt.Printf("produced all messages successfully (%d) \n", nbrMessages) + } waitgroup.Wait() fmt.Println("exiting normally") + } func getEnv(key, fallback string) string { diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 82923eb8c3..51ffea150a 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -422,7 +422,10 @@ protected CompletableFuture handleTopicMetadataRequest(KafkaHe allTopicMetadata.add( new TopicMetadata( Errors.NONE, - TopicName.get(topic).getLocalName(), + // we should answer with the right name, either local of full-name, + // depending on what was asked + topic.startsWith("persistent://") ? 
+ TopicName.get(topic).toString(): TopicName.get(topic).getLocalName(), false, partitionMetadatas)); diff --git a/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java b/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java index f401c883c6..58c655a381 100644 --- a/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java +++ b/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java @@ -7,14 +7,15 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.junit.AfterClass; -import org.junit.ClassRule; import org.testcontainers.Testcontainers; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.WaitingConsumer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.nio.file.Paths; @@ -22,13 +23,20 @@ import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT; import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; @Slf4j public class GolangSaramaTest extends MockKafkaServiceBaseTest { - @ClassRule - public GenericContainer container = new GenericContainer<>( - new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + private static final String SHORT_TOPIC_NAME = "my-sarama-topic"; + private static final String LONG_TOPIC_NAME = "persistent://public/default/my-sarama-topic-full-name"; + + @DataProvider + public static Object[][] topics() { + return new Object[][]{ + {SHORT_TOPIC_NAME}, {LONG_TOPIC_NAME} + }; + } @BeforeClass @Override @@ -65,30 +73,55 @@ protected void setup() throws Exception { admin.namespaces().setRetention("public/__kafka", new RetentionPolicies(-1, -1)); } - getAdmin().topics().createPartitionedTopic("persistent://public/default/my-sarama-topic", 1); + getAdmin().topics().createPartitionedTopic("persistent://public/default/" + SHORT_TOPIC_NAME, 1); + getAdmin().topics().createPartitionedTopic(LONG_TOPIC_NAME, 1); + - container - .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) - .withNetworkMode("host"); Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); } - @Test(timeOut = 120_000) - void simpleProduceAndConsume() throws Exception { - System.out.println("building container"); - container.start(); - System.out.println("container started"); + @Test(timeOut = 60_000, dataProvider = "topics") + void simpleProduceAndConsume(String topic) throws Exception { - Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); - container.followOutput(logConsumer); - WaitingConsumer consumer = new WaitingConsumer(); - container.followOutput(consumer, STDOUT); + GenericContainer producer = new GenericContainer<>( + new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + + GenericContainer consumer = new GenericContainer<>( + new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + + producer + .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) + .withEnv("KOP_PRODUCE", "true") + .withEnv("KOP_TOPIC", topic) + .waitingFor( + 
Wait.forLogMessage("starting to produce\\n", 1) + ) + .withNetworkMode("host"); + + consumer + .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) + .withEnv("KOP_TOPIC", topic) + .withEnv("KOP_CONSUME", "true") + .waitingFor( + Wait.forLogMessage("ready to consume\\n", 1) + ) + .withNetworkMode("host"); + + producer.start(); + WaitingConsumer consumerWaitingConsumer = createLogFollower(producer); + System.out.println("producer started"); + consumer.start(); + WaitingConsumer producerWaitingConsumer = createLogFollower(consumer); + System.out.println("consumer started"); - consumer.waitUntil(frame -> - frame.getUtf8String().contains("reached"), 30, TimeUnit.SECONDS); - System.out.println("after reached"); - checkForSaramaErrors(container.getLogs()); + producerWaitingConsumer.waitUntil(frame -> + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + consumerWaitingConsumer.waitUntil(frame -> + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + + checkForSaramaErrors(producer.getLogs()); + checkForSaramaErrors(consumer.getLogs()); } @Override @@ -97,9 +130,29 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + + private WaitingConsumer createLogFollower(GenericContainer container) { + Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); + container.followOutput(logConsumer); + WaitingConsumer waitingConsumer = new WaitingConsumer(); + container.followOutput(waitingConsumer, STDOUT); + + return waitingConsumer; + } + private void checkForSaramaErrors(String logs) { assertFalse(logs.contains("no available broker to send metadata request to")); assertFalse(logs.contains("panic")); assertFalse(logs.contains("correlation ID didn't match")); + + if (logs.contains("starting to produce")) { + assertTrue(logs.contains("produced all messages successfully")); + } + + if (logs.contains("ready to consume")) { + assertTrue(logs.contains("received msg")); + assertTrue(logs.contains("limit reached, exiting")); + } + assertTrue(logs.contains("ExitCode=0")); } } From 57cdc81677aaeecc09d3fb25808f46ad83f6e120 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 30 Dec 2019 14:33:23 +0100 Subject: [PATCH 03/21] refactor integration tests --- integrations/golang/sarama/Dockerfile | 9 -- integrations/golang/sarama/go.mod | 5 - integrations/golang/sarama/go.sum | 52 ------- integrations/golang/sarama/main.go | 142 ------------------ ...amaTest.java => KafkaIntegrationTest.java} | 48 +++--- 5 files changed, 20 insertions(+), 236 deletions(-) delete mode 100644 integrations/golang/sarama/Dockerfile delete mode 100644 integrations/golang/sarama/go.mod delete mode 100644 integrations/golang/sarama/go.sum delete mode 100644 integrations/golang/sarama/main.go rename src/test/java/io/streamnative/kop/{integrations/GolangSaramaTest.java => KafkaIntegrationTest.java} (82%) diff --git a/integrations/golang/sarama/Dockerfile b/integrations/golang/sarama/Dockerfile deleted file mode 100644 index 3d97f9d004..0000000000 --- a/integrations/golang/sarama/Dockerfile +++ /dev/null @@ -1,9 +0,0 @@ -FROM golang:1.13.4-stretch - -WORKDIR /go/src/app -COPY . . - -RUN go get -d -v ./... -RUN go install -v ./... 
- -CMD sh -c '/go/bin/sarama-golang; echo "ExitCode=$?"' \ No newline at end of file diff --git a/integrations/golang/sarama/go.mod b/integrations/golang/sarama/go.mod deleted file mode 100644 index c13ce190b7..0000000000 --- a/integrations/golang/sarama/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module github.com/apache/pulsar/kop/integration/sarama-golang - -go 1.13 - -require github.com/Shopify/sarama v1.24.1 diff --git a/integrations/golang/sarama/go.sum b/integrations/golang/sarama/go.sum deleted file mode 100644 index f327335702..0000000000 --- a/integrations/golang/sarama/go.sum +++ /dev/null @@ -1,52 +0,0 @@ -github.com/Shopify/sarama v1.24.1 h1:svn9vfN3R1Hz21WR2Gj0VW9ehaDGkiOS+VqlIcZOkMI= -github.com/Shopify/sarama v1.24.1/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= -github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= -github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= -github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= -github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= -github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= -github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= -github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= -golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= -gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= -gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= -gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= -gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= -gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= -gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= diff --git a/integrations/golang/sarama/main.go b/integrations/golang/sarama/main.go deleted file mode 100644 index 03f8dbc44c..0000000000 --- a/integrations/golang/sarama/main.go +++ /dev/null @@ -1,142 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "strconv" - "sync" - - "github.com/Shopify/sarama" -) - -type exampleConsumerGroupHandler struct { - counter int - limit int - wg *sync.WaitGroup -} - -func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset) - sess.MarkMessage(msg, "") - h.counter++ - fmt.Printf("received msg %d/%d\n", h.counter, h.limit) - if h.counter == h.limit { - fmt.Println("limit reached, exiting") - h.wg.Done() - return nil - } - } - return nil -} - -func main() { - - fmt.Println("starting...") - - nbrMessages, err := strconv.Atoi(getEnv("KOP_NBR_MESSAGES", "10")) - if err != nil { - panic(err) - } - limit, err := strconv.Atoi(getEnv("KOP_EXPECT_MESSAGES", "10")) - if err != nil { - panic(err) - } - - shouldProduce, err := strconv.ParseBool(getEnv("KOP_PRODUCE", "false")) - if err != nil { - panic(err) - } - - shouldConsume, err := 
strconv.ParseBool(getEnv("KOP_CONSUME", "false")) - if err != nil { - panic(err) - } - - // Init config, specify appropriate version - config := sarama.NewConfig() - config.Version = sarama.V2_0_0_0 - config.Metadata.Retry.Max = 0 - config.Consumer.Return.Errors = true - config.Consumer.Offsets.Initial = sarama.OffsetOldest - config.Producer.Return.Successes = true - brokers := []string{getEnv("KOP_BROKER", "localhost:9092")} - topic := getEnv("KOP_TOPIC", "my-sarama-topic") - topics := []string{topic} - sarama.Logger = log.New(os.Stdout, "", log.Ltime) - - fmt.Println("connecting to", brokers) - - // Start with a client - client, err := sarama.NewClient(brokers, config) - if err != nil { - panic(err) - } - defer func() { _ = client.Close() }() - - var waitgroup sync.WaitGroup - if shouldConsume { - waitgroup.Add(1) - - // Start a new consumer group - group, err := sarama.NewConsumerGroupFromClient("sarama-consumer", client) - if err != nil { - panic(err) - } - defer func() { _ = group.Close() }() - - fmt.Println("ready to consume") - - // Iterate over consumer sessions. - ctx := context.Background() - handler := exampleConsumerGroupHandler{counter: 0, limit: limit, wg: &waitgroup} - - err = group.Consume(ctx, topics, handler) - if err != nil { - panic(err) - } - - } - - if shouldProduce { - syncProducer, err := sarama.NewSyncProducerFromClient(client) - if err != nil { - panic(err) - } - defer func() { _ = syncProducer.Close() }() - - fmt.Println("starting to produce") - - for i := 0; i < nbrMessages; i++ { - msg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.StringEncoder("hello from sarama"), - Metadata: "test", - } - - fmt.Println("send a message") - - _, _, err := syncProducer.SendMessage(msg) - if err != nil { - panic(err) - } - } - fmt.Printf("produced all messages successfully (%d) \n", nbrMessages) - - } - waitgroup.Wait() - fmt.Println("exiting normally") - -} - -func getEnv(key, fallback string) string { - value, exists := os.LookupEnv(key) - if !exists { - value = fallback - } - return value -} diff --git a/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java similarity index 82% rename from src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java rename to src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 58c655a381..ca3dae120e 100644 --- a/src/test/java/io/streamnative/kop/integrations/GolangSaramaTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -1,4 +1,4 @@ -package io.streamnative.kop.integrations; +package io.streamnative.kop; import com.google.common.collect.Sets; import io.streamnative.kop.MockKafkaServiceBaseTest; @@ -26,15 +26,13 @@ import static org.testng.AssertJUnit.assertTrue; @Slf4j -public class GolangSaramaTest extends MockKafkaServiceBaseTest { - - private static final String SHORT_TOPIC_NAME = "my-sarama-topic"; - private static final String LONG_TOPIC_NAME = "persistent://public/default/my-sarama-topic-full-name"; +public class KafkaIntegrationTest extends MockKafkaServiceBaseTest { @DataProvider - public static Object[][] topics() { - return new Object[][]{ - {SHORT_TOPIC_NAME}, {LONG_TOPIC_NAME} + public static Object[][] integrations() { + return new Object[][] { + {"golang-sarama", "my-sarama-topic"}, + {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, }; } @@ -73,38 +71,32 @@ protected void setup() throws Exception { admin.namespaces().setRetention("public/__kafka", new 
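                        // (-1, -1) means unlimited retention, so nothing in
                        // the __kafka namespace (where KoP keeps committed
                        // group offsets) is trimmed during a test run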
RetentionPolicies(-1, -1)); } - getAdmin().topics().createPartitionedTopic("persistent://public/default/" + SHORT_TOPIC_NAME, 1); - getAdmin().topics().createPartitionedTopic(LONG_TOPIC_NAME, 1); - - Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); } - @Test(timeOut = 60_000, dataProvider = "topics") - void simpleProduceAndConsume(String topic) throws Exception { - - GenericContainer producer = new GenericContainer<>( - new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + @Test(timeOut = 60_000, dataProvider = "integrations") + void simpleProduceAndConsume(String integration, String topic) throws Exception { - GenericContainer consumer = new GenericContainer<>( - new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/golang/sarama"))); + getAdmin().topics().createPartitionedTopic(topic, 1); - producer + GenericContainer producer = new GenericContainer<>( + new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") .withEnv("KOP_TOPIC", topic) - .waitingFor( - Wait.forLogMessage("starting to produce\\n", 1) - ) + .withEnv("KOP_NBR_MESSAGES", "10") + .withEnv("KOP_EXPECT_MESSAGES", "10") + .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); - consumer + GenericContainer consumer = new GenericContainer<>( + new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_TOPIC", topic) .withEnv("KOP_CONSUME", "true") - .waitingFor( - Wait.forLogMessage("ready to consume\\n", 1) - ) + .withEnv("KOP_NBR_MESSAGES", "10") + .withEnv("KOP_EXPECT_MESSAGES", "10") + .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); producer.start(); @@ -149,7 +141,7 @@ private void checkForSaramaErrors(String logs) { assertTrue(logs.contains("produced all messages successfully")); } - if (logs.contains("ready to consume")) { + if (logs.contains("starting to consume")) { assertTrue(logs.contains("received msg")); assertTrue(logs.contains("limit reached, exiting")); } From 7c78311f9e9b1b5e16d98ff7d0aa17c1d5a90f51 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 30 Dec 2019 16:59:52 +0100 Subject: [PATCH 04/21] initial commit for integration test for confluent-golang client --- .../streamnative/kop/KafkaRequestHandler.java | 21 +++++++++++++++++-- .../kop/KafkaIntegrationTest.java | 12 +++++------ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 51ffea150a..2f7934547e 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -41,6 +41,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -67,6 +68,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import 
org.apache.kafka.common.record.RecordBatch; @@ -201,13 +203,28 @@ protected void close() { } protected CompletableFuture handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest) { - ApiVersionsResponse apiResponse = ApiVersionsResponse.defaultApiVersionsResponse(); + AbstractResponse apiResponse = overloadDefaultApiVersionsResponse(); CompletableFuture resultFuture = new CompletableFuture<>(); - resultFuture.complete(apiResponse); return resultFuture; } + protected ApiVersionsResponse overloadDefaultApiVersionsResponse() { + List versionList = new ArrayList<>(); + for (ApiKeys apiKey : ApiKeys.values()) { + if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) { + switch (apiKey) { + case LIST_OFFSETS: + versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 1, apiKey.latestVersion())); + break; + default: + versionList.add(new ApiVersionsResponse.ApiVersion(apiKey)); + } + } + } + return new ApiVersionsResponse(0, Errors.NONE, versionList); + } + protected CompletableFuture handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest) { CompletableFuture resultFuture = new CompletableFuture<>(); String err = String.format("Kafka API (%s) Not supported by kop server.", diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index ca3dae120e..2b1aa14af2 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -1,7 +1,6 @@ package io.streamnative.kop; import com.google.common.collect.Sets; -import io.streamnative.kop.MockKafkaServiceBaseTest; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -31,8 +30,9 @@ public class KafkaIntegrationTest extends MockKafkaServiceBaseTest { @DataProvider public static Object[][] integrations() { return new Object[][] { - {"golang-sarama", "my-sarama-topic"}, - {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, + // {"golang-sarama", "my-sarama-topic"}, + // {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, + {"golang-confluent-kafka", "confluent-go"} }; } @@ -112,8 +112,8 @@ void simpleProduceAndConsume(String integration, String topic) throws Exception consumerWaitingConsumer.waitUntil(frame -> frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); - checkForSaramaErrors(producer.getLogs()); - checkForSaramaErrors(consumer.getLogs()); + checkForErrorsInLogs(producer.getLogs()); + checkForErrorsInLogs(consumer.getLogs()); } @Override @@ -132,7 +132,7 @@ private WaitingConsumer createLogFollower(GenericContainer container) { return waitingConsumer; } - private void checkForSaramaErrors(String logs) { + private void checkForErrorsInLogs(String logs) { assertFalse(logs.contains("no available broker to send metadata request to")); assertFalse(logs.contains("panic")); assertFalse(logs.contains("correlation ID didn't match")); From ce34d0d84c56fa3bda7bee722b1366bac7ba3b3f Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Tue, 31 Dec 2019 12:06:26 +0100 Subject: [PATCH 05/21] Initial commit for integration tests for rustlang --- .../kop/KafkaIntegrationTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 
2b1aa14af2..353630f0f1 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -8,7 +8,6 @@ import org.junit.AfterClass; import org.testcontainers.Testcontainers; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.WaitingConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.ImageFromDockerfile; @@ -20,7 +19,6 @@ import java.nio.file.Paths; import java.util.concurrent.TimeUnit; -import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; @@ -30,9 +28,10 @@ public class KafkaIntegrationTest extends MockKafkaServiceBaseTest { @DataProvider public static Object[][] integrations() { return new Object[][] { - // {"golang-sarama", "my-sarama-topic"}, - // {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, - {"golang-confluent-kafka", "confluent-go"} + {"golang-sarama", "my-sarama-topic"}, + {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, + {"golang-confluent-kafka", "confluent-go"}, + {"rustlang-rdkafka", "rustlang-topic"}, }; } @@ -74,7 +73,7 @@ protected void setup() throws Exception { Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); } - @Test(timeOut = 60_000, dataProvider = "integrations") + @Test(timeOut = 120_000, dataProvider = "integrations") void simpleProduceAndConsume(String integration, String topic) throws Exception { getAdmin().topics().createPartitionedTopic(topic, 1); @@ -86,6 +85,7 @@ void simpleProduceAndConsume(String integration, String topic) throws Exception .withEnv("KOP_TOPIC", topic) .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") + .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(log)) .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); @@ -96,6 +96,7 @@ void simpleProduceAndConsume(String integration, String topic) throws Exception .withEnv("KOP_CONSUME", "true") .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") + .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(log)) .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); @@ -124,11 +125,8 @@ protected void cleanup() throws Exception { private WaitingConsumer createLogFollower(GenericContainer container) { - Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); - container.followOutput(logConsumer); WaitingConsumer waitingConsumer = new WaitingConsumer(); - container.followOutput(waitingConsumer, STDOUT); - + container.followOutput(waitingConsumer); return waitingConsumer; } @@ -136,6 +134,8 @@ private void checkForErrorsInLogs(String logs) { assertFalse(logs.contains("no available broker to send metadata request to")); assertFalse(logs.contains("panic")); assertFalse(logs.contains("correlation ID didn't match")); + assertFalse(logs.contains("Required feature not supported by broker")); + if (logs.contains("starting to produce")) { assertTrue(logs.contains("produced all messages successfully")); From e3fc20297a016ace71579d345ebe52ef838431f0 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 2 Jan 2020 15:54:59 +0100 Subject: [PATCH 06/21] Adding support of ListOffset in v0 --- 
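Note: librdkafka still sends ListOffset v0, whose response carries a list of
offsets per partition (bounded by max_num_offsets) instead of the single
(timestamp, offset) pair used from v1 onwards, so fetchOffsetForTimestamp now
builds both response shapes behind a legacyMode flag.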
.../streamnative/kop/KafkaRequestHandler.java | 169 +++++++++++++----- 1 file changed, 123 insertions(+), 46 deletions(-) diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 2f7934547e..e2599354aa 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -215,7 +215,7 @@ protected ApiVersionsResponse overloadDefaultApiVersionsResponse() { if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) { switch (apiKey) { case LIST_OFFSETS: - versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 1, apiKey.latestVersion())); + versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, apiKey.latestVersion())); break; default: versionList.add(new ApiVersionsResponse.ApiVersion(apiKey)); @@ -602,7 +602,7 @@ protected CompletableFuture handleOffsetFetchRequest(KafkaHead } private CompletableFuture - fetchOffsetForTimestamp(CompletableFuture persistentTopic, Long timestamp) { + fetchOffsetForTimestamp(CompletableFuture persistentTopic, Long timestamp, boolean legacyMode) { CompletableFuture partitionData = new CompletableFuture<>(); persistentTopic.whenComplete((perTopic, t) -> { @@ -628,11 +628,20 @@ protected CompletableFuture handleOffsetFetchRequest(KafkaHead // no entry in ledger, then entry id could be -1 long entryId = position.getEntryId(); - partitionData.complete(new ListOffsetResponse.PartitionData( - Errors.NONE, - RecordBatch.NO_TIMESTAMP, - MessageIdUtils - .getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId))); + if (legacyMode) { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + Collections.singletonList(MessageIdUtils + .getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId)))); + + } else { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils + .getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId))); + } + } else if (timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) { PositionImpl position = OffsetFinder.getFirstValidPosition(managedLedger); @@ -641,10 +650,17 @@ protected CompletableFuture handleOffsetFetchRequest(KafkaHead perTopic.getName(), timestamp, position); } - partitionData.complete(new ListOffsetResponse.PartitionData( - Errors.NONE, - RecordBatch.NO_TIMESTAMP, - MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId()))); + if (legacyMode) { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + Collections.singletonList(MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId())))); + } else { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId()))); + } + } else { // find with real wanted timestamp OffsetFinder offsetFinder = new OffsetFinder(managedLedger); @@ -659,11 +675,18 @@ public void findEntryComplete(Position position, Object ctx) { log.warn("Unable to find position for topic {} time {}. 
get NULL position", perTopic.getName(), timestamp); - partitionData.complete(new ListOffsetResponse - .PartitionData( - Errors.UNKNOWN_SERVER_ERROR, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET)); + if (legacyMode) { + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + Collections.emptyList())); + } else { + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + } return; } } else { @@ -674,10 +697,16 @@ public void findEntryComplete(Position position, Object ctx) { log.debug("Find position for topic {} time {}. position: {}", perTopic.getName(), timestamp, finalPosition); } - partitionData.complete(new ListOffsetResponse.PartitionData( - Errors.NONE, - RecordBatch.NO_TIMESTAMP, - MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId()))); + if (legacyMode) { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + Collections.singletonList(MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())))); + } else { + partitionData.complete(new ListOffsetResponse.PartitionData( + Errors.NONE, + RecordBatch.NO_TIMESTAMP, + MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId()))); + } } @Override @@ -685,11 +714,18 @@ public void findEntryFailed(ManagedLedgerException exception, Optional position, Object ctx) { log.warn("Unable to find position for topic {} time {}. Exception:", perTopic.getName(), timestamp, exception); - partitionData.complete(new ListOffsetResponse - .PartitionData( - Errors.UNKNOWN_SERVER_ERROR, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET)); + if (legacyMode) { + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + Collections.emptyList())); + } else { + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)); + } return; } }); @@ -712,43 +748,84 @@ private CompletableFuture handleListOffsetRequestV1AndAbove(Ka CompletableFuture partitionData; CompletableFuture persistentTopic = topicManager.getTopic(pulsarTopic.toString()); - partitionData = fetchOffsetForTimestamp(persistentTopic, times); + partitionData = fetchOffsetForTimestamp(persistentTopic, times, false); responseData.put(topic, partitionData); }); CompletableFuture - .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) - .whenComplete((ignore, ex) -> { - ListOffsetResponse response = - new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); + .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + ListOffsetResponse response = + new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); - resultFuture.complete(response); - }); + resultFuture.complete(response); + }); return resultFuture; } - // get offset from underline managedLedger - protected CompletableFuture handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { - checkArgument(listOffset.getRequest() instanceof ListOffsetRequest); + // Some info can be found here https://web.archive.org/web/20170309152525/https://cfchou.github.io/blog/2015/04/23/a-closer-look-at-kafka-offsetrequest/ + private CompletableFuture 
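    // In v0 each partition's response is a plain list of offsets (the request
    // may ask for up to max_num_offsets of them); the (timestamp, offset) pair
    // only appears from v1 onwards. Only a single offset is returned here.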
handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset) { + ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); - // not support version 0 - if (listOffset.getHeader().apiVersion() == 0) { - CompletableFuture resultFuture = new CompletableFuture<>(); - ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); + CompletableFuture resultFuture = new CompletableFuture<>(); + Map> responseData = Maps.newHashMap(); - log.error("ListOffset not support V0 format request"); + // in v0, the iterator is offsetData, + // in v1, the iterator is partitionTimestamps, + log.warn("received a v0 listOffset: {}", request.toString(true)); + request.offsetData().entrySet().stream().forEach(tms -> { + TopicPartition topic = tms.getKey(); + TopicName pulsarTopic = pulsarTopicName(topic, namespace); + Long times = tms.getValue().timestamp; + CompletableFuture partitionData; - ListOffsetResponse response = new ListOffsetResponse(CoreUtils.mapValue(request.partitionTimestamps(), - ignored -> new ListOffsetResponse - .PartitionData(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Lists.newArrayList()))); + // num_num_offsets > 1 is not handled for now, returning an error + if (tms.getValue().maxNumOffsets > 1) { + log.warn("request is asking for multiples offsets for {}, not supported for now", pulsarTopic.toString()); + partitionData = new CompletableFuture<>(); + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET))); + } - resultFuture.complete(response); + // topic not exist, return UNKNOWN_TOPIC_OR_PARTITION + if (!topicManager.topicExists(pulsarTopic.toString())) { + log.warn("Topic {} not exist in topic manager while list offset.", pulsarTopic.toString()); + partitionData = new CompletableFuture<>(); + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET))); + } else { + CompletableFuture persistentTopic = topicManager.getTopic(pulsarTopic.toString()); + partitionData = fetchOffsetForTimestamp(persistentTopic, times, true); + } + responseData.put(topic, partitionData); + }); - return resultFuture; - } + CompletableFuture + .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + ListOffsetResponse response = + new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); + + resultFuture.complete(response); + }); + return resultFuture; + } + + // get offset from underline managedLedger + protected CompletableFuture handleListOffsetRequest(KafkaHeaderAndRequest listOffset) { + checkArgument(listOffset.getRequest() instanceof ListOffsetRequest); + // the only difference between v0 and v1 is the `max_num_offsets => INT32` + // v0 is required because it is used by librdkafka + if (listOffset.getHeader().apiVersion() == 0) { + return handleListOffsetRequestV0(listOffset); + } return handleListOffsetRequestV1AndAbove(listOffset); } From 185d5f30aa84b79ede1d8720666f4a117e776e84 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 6 Jan 2020 13:40:36 +0100 Subject: [PATCH 07/21] Initial commit for node-kafka integration test --- .../streamnative/kop/KafkaRequestHandler.java | 5 ++ .../kop/KafkaIntegrationTest.java | 53 ++++++++++++------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java 
b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index e2599354aa..c7d04e96b5 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -214,7 +214,12 @@ protected ApiVersionsResponse overloadDefaultApiVersionsResponse() { for (ApiKeys apiKey : ApiKeys.values()) { if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) { switch (apiKey) { + case FETCH: + // V4 added the RecordBatch format. We need to make sure the old MessageSet format is not used + versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4, apiKey.latestVersion())); + break; case LIST_OFFSETS: + // V0 is needed for librdkafka versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, apiKey.latestVersion())); break; default: diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 353630f0f1..768f18be47 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -17,6 +17,7 @@ import org.testng.annotations.Test; import java.nio.file.Paths; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.testng.AssertJUnit.assertFalse; @@ -28,10 +29,12 @@ public class KafkaIntegrationTest extends MockKafkaServiceBaseTest { @DataProvider public static Object[][] integrations() { return new Object[][] { - {"golang-sarama", "my-sarama-topic"}, - {"golang-sarama", "persistent://public/default/my-sarama-topic-full-name"}, - {"golang-confluent-kafka", "confluent-go"}, - {"rustlang-rdkafka", "rustlang-topic"}, + {"golang-sarama", Optional.empty(), true, true}, + {"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true}, + {"golang-confluent-kafka", Optional.empty(), true, true}, + {"rustlang-rdkafka", Optional.empty(), true, true}, + // consumer is broken, see integrations/README.md + {"node-kafka-node", Optional.empty(), true, false}, }; } @@ -74,15 +77,15 @@ protected void setup() throws Exception { } @Test(timeOut = 120_000, dataProvider = "integrations") - void simpleProduceAndConsume(String integration, String topic) throws Exception { + void simpleProduceAndConsume(String integration, Optional topic, boolean shouldProduce, boolean shouldConsume) throws Exception { - getAdmin().topics().createPartitionedTopic(topic, 1); + getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); GenericContainer producer = new GenericContainer<>( new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") - .withEnv("KOP_TOPIC", topic) + .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(log)) .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); GenericContainer consumer = new GenericContainer<>( new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) - .withEnv("KOP_TOPIC", topic) + .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_CONSUME", "true") .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") @@ -100,21 +103,31 @@ void
simpleProduceAndConsume(String integration, String topic) throws Exception .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); - producer.start(); - WaitingConsumer consumerWaitingConsumer = createLogFollower(producer); - System.out.println("producer started"); - consumer.start(); - WaitingConsumer producerWaitingConsumer = createLogFollower(consumer); - System.out.println("consumer started"); + WaitingConsumer producerWaitingConsumer = null; + WaitingConsumer consumerWaitingConsumer = null; + if (shouldProduce) { + producer.start(); + producerWaitingConsumer = createLogFollower(producer); + System.out.println("producer started"); + } + if (shouldConsume) { + consumer.start(); + consumerWaitingConsumer = createLogFollower(consumer); + System.out.println("consumer started"); + } - producerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); - consumerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + if (shouldProduce) { + producerWaitingConsumer.waitUntil(frame -> + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + checkForErrorsInLogs(producer.getLogs()); + } - checkForErrorsInLogs(producer.getLogs()); - checkForErrorsInLogs(consumer.getLogs()); + if (shouldConsume) { + consumerWaitingConsumer.waitUntil(frame -> + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + checkForErrorsInLogs(consumer.getLogs()); + } } @Override From f6aeb7e718ab32d3b3c0e97f9af2fa977997cf5c Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 6 Jan 2020 17:21:38 +0100 Subject: [PATCH 08/21] Initial commit for node-rdkafka --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 768f18be47..be8141c08d 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -35,6 +35,7 @@ public static Object[][] integrations() { {"rustlang-rdkafka", Optional.empty(), true, true}, // consumer is broken, see integrations/README.md {"node-kafka-node", Optional.empty(), true, false}, + {"node-rdkafka", Optional.empty(), true, true}, }; } @@ -76,7 +77,7 @@ protected void setup() throws Exception { Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); } - @Test(timeOut = 120_000, dataProvider = "integrations") + @Test(timeOut = 5 * 60_000, dataProvider = "integrations") void simpleProduceAndConsume(String integration, Optional topic, boolean shouldProduce, boolean shouldConsume) throws Exception { getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); From a3e550560fe263ed88e01869035020947e9dc3b3 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Wed, 8 Jan 2020 10:42:18 +0100 Subject: [PATCH 09/21] checkstyle --- .../kop/KafkaIntegrationTest.java | 152 +++++++++++------- 1 file changed, 92 insertions(+), 60 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index be8141c08d..459572def4 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -1,6 +1,25 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may 
not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.streamnative.kop; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + import com.google.common.collect.Sets; +import java.nio.file.Paths; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -16,19 +35,33 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.file.Paths; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; - +/** + * This class is running integration tests for Kafka clients. + * It uses testcontainers to spawn containers that will either: + * * produce a number of messages + * * consume a number of messages + * + * As testcontainers is not capable of checking the exitCode of the app running in the container, + * every container should print the exitCode in stdout. + * + * This class is waiting for some precise logs to come up: + * * "starting to produce" + * * "starting to consume" + * * "produced all messages successfully" + * * "consumed all messages successfully" + * + * This class is using environment variables to control the containers, such as: + * * broker address, + * * topic name, + * * produce or consume mode, + * * how many messages to produce/consume, + */ @Slf4j public class KafkaIntegrationTest extends MockKafkaServiceBaseTest { @DataProvider public static Object[][] integrations() { - return new Object[][] { + return new Object[][]{ {"golang-sarama", Optional.empty(), true, true}, {"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true}, {"golang-confluent-kafka", Optional.empty(), true, true}, @@ -39,6 +72,29 @@ public static Object[][] integrations() { }; } + private static WaitingConsumer createLogFollower(final GenericContainer container) { + final WaitingConsumer waitingConsumer = new WaitingConsumer(); + container.followOutput(waitingConsumer); + return waitingConsumer; + } + + private static void checkForErrorsInLogs(final String logs) { + assertFalse(logs.contains("no available broker to send metadata request to")); + assertFalse(logs.contains("panic")); + assertFalse(logs.contains("correlation ID didn't match")); + assertFalse(logs.contains("Required feature not supported by broker")); + + + if (logs.contains("starting to produce")) { + assertTrue(logs.contains("produced all messages successfully")); + } + + if (logs.contains("starting to consume")) { + assertTrue(logs.contains("consumed all messages successfully")); + } + assertTrue(logs.contains("ExitCode=0")); + } + @BeforeClass @Override protected void setup() throws Exception { @@ -46,61 +102,62 @@ protected void setup() throws Exception { super.resetConfig(); super.internalSetup(); - if (!admin.clusters().getClusters().contains(configClusterName)) { + if (!this.admin.clusters().getClusters().contains(this.configClusterName)) { // so that clients can test short names - admin.clusters().createCluster(configClusterName, - new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + this.admin.clusters().createCluster(this.configClusterName, + new ClusterData("http://127.0.0.1:" + this.brokerWebservicePort)); } else { - admin.clusters().updateCluster(configClusterName, - new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + this.admin.clusters().updateCluster(this.configClusterName, + new ClusterData("http://127.0.0.1:" + this.brokerWebservicePort)); } - if (!admin.tenants().getTenants().contains("public")) { - admin.tenants().createTenant("public", + if (!this.admin.tenants().getTenants().contains("public")) { + this.admin.tenants().createTenant("public", new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); } else { - admin.tenants().updateTenant("public", + this.admin.tenants().updateTenant("public", new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); } - if (!admin.namespaces().getNamespaces("public").contains("public/default")) { - admin.namespaces().createNamespace("public/default"); - admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); - admin.namespaces().setRetention("public/default", + if (!this.admin.namespaces().getNamespaces("public").contains("public/default")) { + this.admin.namespaces().createNamespace("public/default"); + this.admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + this.admin.namespaces().setRetention("public/default", new RetentionPolicies(60, 1000)); } - if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) { -
admin.namespaces().createNamespace("public/__kafka"); - admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); - admin.namespaces().setRetention("public/__kafka", + if (!this.admin.namespaces().getNamespaces("public").contains("public/__kafka")) { + this.admin.namespaces().createNamespace("public/__kafka"); + this.admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); + this.admin.namespaces().setRetention("public/__kafka", new RetentionPolicies(-1, -1)); } Testcontainers.exposeHostPorts(ImmutableMap.of(super.kafkaBrokerPort, super.kafkaBrokerPort)); } - @Test(timeOut = 5 * 60_000, dataProvider = "integrations") - void simpleProduceAndConsume(String integration, Optional topic, boolean shouldProduce, boolean shouldConsume) throws Exception { + @Test(timeOut = 3 * 60_000, dataProvider = "integrations") + void simpleProduceAndConsume(final String integration, final Optional topic, + final boolean shouldProduce, final boolean shouldConsume) throws Exception { - getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); + this.getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); - GenericContainer producer = new GenericContainer<>( + final GenericContainer producer = new GenericContainer<>( new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") - .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(log)) + .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); - GenericContainer consumer = new GenericContainer<>( + final GenericContainer consumer = new GenericContainer<>( new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_CONSUME", "true") .withEnv("KOP_NBR_MESSAGES", "10") .withEnv("KOP_EXPECT_MESSAGES", "10") - .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(log)) + .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); @@ -108,26 +165,26 @@ void simpleProduceAndConsume(String integration, Optional topic, boolean WaitingConsumer consumerWaitingConsumer = null; if (shouldProduce) { producer.start(); - producerWaitingConsumer = createLogFollower(producer); + producerWaitingConsumer = KafkaIntegrationTest.createLogFollower(producer); System.out.println("producer started"); } if (shouldConsume) { consumer.start(); - consumerWaitingConsumer = createLogFollower(consumer); + consumerWaitingConsumer = KafkaIntegrationTest.createLogFollower(consumer); System.out.println("consumer started"); } if (shouldProduce) { producerWaitingConsumer.waitUntil(frame -> frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); - checkForErrorsInLogs(producer.getLogs()); + KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs()); } if (shouldConsume) { consumerWaitingConsumer.waitUntil(frame -> frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); - 
checkForErrorsInLogs(consumer.getLogs()); + KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs()); } } @@ -136,29 +193,4 @@ void simpleProduceAndConsume(String integration, Optional topic, boolean protected void cleanup() throws Exception { super.internalCleanup(); } - - - private WaitingConsumer createLogFollower(GenericContainer container) { - WaitingConsumer waitingConsumer = new WaitingConsumer(); - container.followOutput(waitingConsumer); - return waitingConsumer; - } - - private void checkForErrorsInLogs(String logs) { - assertFalse(logs.contains("no available broker to send metadata request to")); - assertFalse(logs.contains("panic")); - assertFalse(logs.contains("correlation ID didn't match")); - assertFalse(logs.contains("Required feature not supported by broker")); - - - if (logs.contains("starting to produce")) { - assertTrue(logs.contains("produced all messages successfully")); - } - - if (logs.contains("starting to consume")) { - assertTrue(logs.contains("received msg")); - assertTrue(logs.contains("limit reached, exiting")); - } - assertTrue(logs.contains("ExitCode=0")); - } } From 5275d002daf665e1ec0fc27b7696e3d50749243b Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Wed, 8 Jan 2020 11:19:12 +0100 Subject: [PATCH 10/21] Refactor env vars to simplify describing a limit --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 459572def4..968c914ed8 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -144,8 +144,7 @@ void simpleProduceAndConsume(final String integration, final Optional to .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") .withEnv("KOP_TOPIC", topic.orElse(integration)) - .withEnv("KOP_NBR_MESSAGES", "10") - .withEnv("KOP_EXPECT_MESSAGES", "10") + .withEnv("KOP_LIMIT", "10") .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); @@ -155,8 +154,7 @@ void simpleProduceAndConsume(final String integration, final Optional to .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_CONSUME", "true") - .withEnv("KOP_NBR_MESSAGES", "10") - .withEnv("KOP_EXPECT_MESSAGES", "10") + .withEnv("KOP_LIMIT", "10") .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); From d85f37b9da2bb2dc46e1f006e7b7cc94c5e8b269 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Wed, 8 Jan 2020 11:41:47 +0100 Subject: [PATCH 11/21] mvn license:format --- .../io/streamnative/kop/KafkaIntegrationTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 968c914ed8..a4bf956161 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -1,3 +1,16 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /** * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 17db8e243972b97bcbe3bbedd77c116486449114 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 16 Jan 2020 10:45:58 +0100 Subject: [PATCH 12/21] rust-integration: Ensure the produce future is polled --- integrations/rustlang-rdkafka/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/rustlang-rdkafka/src/main.rs b/integrations/rustlang-rdkafka/src/main.rs index 145260d07c..0f0620bd8f 100644 --- a/integrations/rustlang-rdkafka/src/main.rs +++ b/integrations/rustlang-rdkafka/src/main.rs @@ -105,7 +105,7 @@ async fn produce(brokers: &str, topic_name: &str, limit: i8) -> Result<(), std:: // This loop will wait until all delivery statuses have been received. for future in futures { - info!("Future completed. Result: {:?}", future.await); + println!("Future completed. Result: {:?}", future.await); } println!( "produced all messages successfully ({})", From 62267c9381aee61b336ec11319e9a4315d0b56fc Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 16 Jan 2020 20:54:18 +0800 Subject: [PATCH 13/21] Use pre-built docker image --- .../java/io/streamnative/kop/KafkaIntegrationTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index a4bf956161..1de259c1a5 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -30,7 +30,6 @@ import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; -import java.nio.file.Paths; import java.util.Optional; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -42,7 +41,6 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.WaitingConsumer; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -152,8 +150,7 @@ void simpleProduceAndConsume(final String integration, final Optional to this.getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); - final GenericContainer producer = new GenericContainer<>( - new ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) + final GenericContainer producer = new GenericContainer<>("streamnative/kop-test-" + integration) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_LIMIT", "10") .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to produce\\n", 1)) .withNetworkMode("host"); - final GenericContainer consumer = new GenericContainer<>( - new
ImageFromDockerfile().withFileFromPath(".", Paths.get("integrations/" + integration))) + final GenericContainer consumer = new GenericContainer<>("streamnative/kop-test-" + integration) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_CONSUME", "true") .withEnv("KOP_LIMIT", "10") .withLogConsumer(new org.testcontainers.containers.output.Slf4jLogConsumer(KafkaIntegrationTest.log)) .waitingFor(Wait.forLogMessage("starting to consume\\n", 1)) .withNetworkMode("host"); From 03e59bbd83ec6be842a7dc6fb0a16d5de64e9fc5 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Mon, 20 Jan 2020 10:37:18 +0100 Subject: [PATCH 14/21] integration: Bump waitUntil time *Motivation* Some integrations tests are randomly failing on Github Actions. --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 1de259c1a5..6c95c6060a 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -184,13 +184,13 @@ void simpleProduceAndConsume(final String integration, final Optional to if (shouldProduce) { producerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs()); } if (shouldConsume) { consumerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs()); } } From 4c30f1eb6154675fd1ad63f550cf5578bfbd9a57 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 23 Jan 2020 13:58:42 +0100 Subject: [PATCH 15/21] mvn checkstyle:check --- .../streamnative/kop/KafkaRequestHandler.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index c7d04e96b5..687bb9c5ee 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -216,11 +216,13 @@ protected ApiVersionsResponse overloadDefaultApiVersionsResponse() { switch (apiKey) { case FETCH: // V4 added the RecordBatch format. We need to make sure the old MessageSet format is not used - versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4, apiKey.latestVersion())); + versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4, + apiKey.latestVersion())); break; case LIST_OFFSETS: // V0 is needed for librdkafka - versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, apiKey.latestVersion())); + versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, + apiKey.latestVersion())); break; default: versionList.add(new ApiVersionsResponse.ApiVersion(apiKey)); @@ -446,8 +448,8 @@ protected CompletableFuture handleTopicMetadataRequest(KafkaHe Errors.NONE, // we should answer with the right name, either local or full-name, // depending on what was asked - topic.startsWith("persistent://") ? - TopicName.get(topic).toString(): TopicName.get(topic).getLocalName(), + topic.startsWith("persistent://") + ?
TopicName.get(topic).toString() : TopicName.get(topic).getLocalName(), false, partitionMetadatas)); @@ -658,7 +660,8 @@ protected CompletableFuture handleOffsetFetchRequest(KafkaHead if (legacyMode) { partitionData.complete(new ListOffsetResponse.PartitionData( Errors.NONE, - Collections.singletonList(MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId())))); + Collections.singletonList(MessageIdUtils.getOffset(position.getLedgerId(), + position.getEntryId())))); } else { partitionData.complete(new ListOffsetResponse.PartitionData( Errors.NONE, @@ -705,7 +708,9 @@ public void findEntryComplete(Position position, Object ctx) { if (legacyMode) { partitionData.complete(new ListOffsetResponse.PartitionData( Errors.NONE, - Collections.singletonList(MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())))); + Collections.singletonList( + MessageIdUtils.getOffset( + finalPosition.getLedgerId(), finalPosition.getEntryId()))); } else { partitionData.complete(new ListOffsetResponse.PartitionData( Errors.NONE, @@ -770,7 +775,8 @@ private CompletableFuture handleListOffsetRequestV1AndAbove(Ka return resultFuture; } - // Some info can be found here https://web.archive.org/web/20170309152525/https://cfchou.github.io/blog/2015/04/23/a-closer-look-at-kafka-offsetrequest/ + // Some info can be found here + // https://cfchou.github.io/blog/2015/04/23/a-closer-look-at-kafka-offsetrequest/ through web.archive.org private CompletableFuture handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset) { ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); @@ -788,7 +794,8 @@ private CompletableFuture handleListOffsetRequestV0(KafkaHeade // max_num_offsets > 1 is not handled for now, returning an error if (tms.getValue().maxNumOffsets > 1) { - log.warn("request is asking for multiple offsets for {}, not supported for now", pulsarTopic.toString()); + log.warn("request is asking for multiple offsets for {}, not supported for now", + pulsarTopic.toString()); partitionData = new CompletableFuture<>(); partitionData.complete(new ListOffsetResponse .PartitionData( From 56d638658639fe3edc77035db726f44b310a8353 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 23 Jan 2020 14:20:59 +0100 Subject: [PATCH 16/21] integration: Bump waitUntil time (again) --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 6c95c6060a..ce71b819cf 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -184,13 +184,13 @@ void simpleProduceAndConsume(final String integration, final Optional to if (shouldProduce) { producerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 120, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs()); } if (shouldConsume) { consumerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 120, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs()); } } From 5716b2398342a78f9d94f14e0ffe2325f7a5284f Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Tue, 4 Feb 2020 09:59:20 +0100 Subject: [PATCH 17/21] removing
rustlang-rdkafka from integration tests *Motivation* rustlang-rdkafka works locally but not in Github Actions, opening a separate issue to handle it. --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index ce71b819cf..75462fadfb 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -76,7 +76,8 @@ public static Object[][] integrations() { {"golang-sarama", Optional.empty(), true, true}, {"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true}, {"golang-confluent-kafka", Optional.empty(), true, true}, - {"rustlang-rdkafka", Optional.empty(), true, true}, + // TODO: rustlang-rdkafka fails on Github Actions but works locally; we need to investigate + // {"rustlang-rdkafka", Optional.empty(), true, true}, // consumer is broken, see integrations/README.md {"node-kafka-node", Optional.empty(), true, false}, {"node-rdkafka", Optional.empty(), true, true}, From c309d7b2439125c519812b01691e92489129934f Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Tue, 4 Feb 2020 10:01:42 +0100 Subject: [PATCH 18/21] Revert "integration: Bump waitUntil time (again)" This reverts commit 75a7f23182ec6488bef17c2e10ef85765b19c7f3. --- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 75462fadfb..89a530d0fd 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -185,13 +185,13 @@ void simpleProduceAndConsume(final String integration, final Optional to if (shouldProduce) { producerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 120, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs()); } if (shouldConsume) { consumerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 120, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs()); } } From 3866d995b74afc324f3cdb03c6cc8a29584f21b3 Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Tue, 4 Feb 2020 10:02:01 +0100 Subject: [PATCH 19/21] Revert "integration: Bump waitUntil time" This reverts commit 255a2f8cc924fe3607f2393bcb9c2aaaaca86ca8.
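As the diff below shows, the two reverts take the wait back to the original 30 seconds. For reference, here is a minimal, self-contained sketch of the Testcontainers log-waiting pattern these patches keep tuning; the alpine image and the echoed sentinel lines are illustrative stand-ins for the real integration images, not part of the patch series.

import java.util.concurrent.TimeUnit;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;

public class LogWaitSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in container that prints the same sentinels the KoP test images use.
        try (GenericContainer<?> container = new GenericContainer<>("alpine:3.11")
                .withCommand("sh", "-c", "echo 'starting to produce'; echo 'ExitCode=0'; sleep 5")) {
            container.start();

            // Follow stdout/stderr and block until the sentinel shows up, or time out.
            WaitingConsumer waiting = new WaitingConsumer();
            container.followOutput(waiting);
            waiting.waitUntil(frame -> frame.getUtf8String().contains("ExitCode"),
                    30, TimeUnit.SECONDS);

            // At this point container.getLogs() can be scanned for error markers,
            // mirroring checkForErrorsInLogs() in KafkaIntegrationTest.
        }
    }
}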
--- src/test/java/io/streamnative/kop/KafkaIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 89a530d0fd..8ba85bbeeb 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -185,13 +185,13 @@ void simpleProduceAndConsume(final String integration, final Optional to if (shouldProduce) { producerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(producer.getLogs()); } if (shouldConsume) { consumerWaitingConsumer.waitUntil(frame -> - frame.getUtf8String().contains("ExitCode"), 60, TimeUnit.SECONDS); + frame.getUtf8String().contains("ExitCode"), 30, TimeUnit.SECONDS); KafkaIntegrationTest.checkForErrorsInLogs(consumer.getLogs()); } } From b4d99e044685200668f056d0ecca94afb5b36098 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 4 Feb 2020 21:09:08 -0800 Subject: [PATCH 20/21] Upload surefires --- .github/workflows/pr-test.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-test.yml index b1da731fda..9d86088e3c 100644 --- a/.github/workflows/pr-test.yml +++ b/.github/workflows/pr-test.yml @@ -28,3 +28,18 @@ jobs: - name: Build with Maven run: mvn clean install + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip From b33e56a5f25fd323fe9b7ee7e0200d44ac3ae96c Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Wed, 5 Feb 2020 17:58:35 +0100 Subject: [PATCH 21/21] Fix to use the right topicName *Motivation* We need to be able to support both short and fully-qualified topic names.
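Before the diff, a hedged illustration of what this is meant to allow from the client side. The broker address and topic names here are made up for the example, and whether a given client library accepts the fully-qualified form also depends on its own topic-name validation (the test matrix exercises it through Sarama).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TopicNameForms {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative KoP listener
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Short name, resolved against the default public/default namespace.
            producer.send(new ProducerRecord<>("my-topic", "k", "v")).get();
            // Fully-qualified Pulsar name, meant to be accepted as-is after this patch.
            producer.send(new ProducerRecord<>("persistent://public/default/my-topic", "k", "v")).get();
        }
    }
}

With a single-partition topic, both records should land on the same underlying Pulsar partition.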
--- .../java/io/streamnative/kop/KafkaRequestHandler.java | 9 ++++++++- .../java/io/streamnative/kop/utils/TopicNameUtils.java | 7 +++++++ .../java/io/streamnative/kop/KafkaIntegrationTest.java | 6 +++++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 687bb9c5ee..9065cd4b8c 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -124,6 +124,7 @@ import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; @@ -309,7 +310,13 @@ protected CompletableFuture handleTopicMetadataRequest(KafkaHe requestTopics.stream() .forEach(topic -> { - TopicName pulsarTopicName = pulsarTopicName(topic, namespace); + + TopicName pulsarTopicName; + if (topic.startsWith(TopicDomain.persistent.value()) && topic.contains(namespace.getLocalName())) { + pulsarTopicName = pulsarTopicName(topic); + } else { + pulsarTopicName = pulsarTopicName(topic, namespace); + } // get partition numbers for each topic. getPartitionedTopicMetadataAsync(pulsarTopicName.toString()) diff --git a/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java b/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java index 42361f1f32..ed413635de 100644 --- a/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java +++ b/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java @@ -46,6 +46,13 @@ public static TopicName pulsarTopicName(String topic) { } public static TopicName pulsarTopicName(String topic, int partitionIndex, NamespaceName namespace) { + if (topic.startsWith(TopicDomain.persistent.value())) { + topic = topic.replace(TopicDomain.persistent.value() + "://", ""); + } + + if (topic.contains(namespace.getNamespaceObject().toString())) { + topic = topic.replace(namespace.getNamespaceObject().toString() + "/", ""); + } return TopicName.get(TopicDomain.persistent.value(), namespace, topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex); diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 8ba85bbeeb..8941561898 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -148,8 +148,12 @@ protected void setup() throws Exception { @Test(timeOut = 3 * 60_000, dataProvider = "integrations") void simpleProduceAndConsume(final String integration, final Optional topic, final boolean shouldProduce, final boolean shouldConsume) throws Exception { + String topicName = topic.orElse(integration); + System.out.println("starting integration " + integration + " with topicName " + topicName); - this.getAdmin().topics().createPartitionedTopic(topic.orElse(integration), 1); + this.getAdmin().topics().createPartitionedTopic(topicName, 1); + + System.out.println("topic created"); final GenericContainer producer = new GenericContainer<>("streamnative/kop-test-" + integration) .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort)
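To make the new name handling concrete, below is a rough standalone sketch of the normalization the TopicNameUtils change above performs. The helper name normalize and the suffix constant are illustrative assumptions, not the patch's exact code (the real method uses PARTITIONED_TOPIC_SUFFIX, which this sketch mirrors with Pulsar's "-partition-" convention).

import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

public final class TopicNameSketch {
    // Assumed to match TopicNameUtils.PARTITIONED_TOPIC_SUFFIX.
    private static final String PARTITION_SUFFIX = "-partition-";

    // Strip the scheme and namespace when a fully-qualified name is passed,
    // then rebuild the partitioned topic name, as the patched method does.
    static TopicName normalize(String topic, NamespaceName namespace, int partitionIndex) {
        if (topic.startsWith("persistent://")) {
            topic = topic.replace("persistent://", "");
        }
        if (topic.contains(namespace.toString())) {
            topic = topic.replace(namespace.toString() + "/", "");
        }
        return TopicName.get("persistent", namespace, topic + PARTITION_SUFFIX + partitionIndex);
    }

    public static void main(String[] args) {
        NamespaceName ns = NamespaceName.get("public/default");
        // Both forms should print persistent://public/default/my-sarama-topic-partition-0
        System.out.println(normalize("my-sarama-topic", ns, 0));
        System.out.println(normalize("persistent://public/default/my-sarama-topic", ns, 0));
    }
}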