From d216bc01bd9186eafba8b5858c78e23fedeeca93 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Fri, 9 Nov 2018 16:37:38 -0500 Subject: [PATCH 1/4] MINOR: Adding system test for named repartition topics --- .../tests/StreamsNamedRepartitionTest.java | 128 ++++++++++++++++++ tests/kafkatest/services/streams.py | 22 +++ .../streams_named_repartition_topic_test.py | 119 ++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java create mode 100644 tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java new file mode 100644 index 0000000000000..4d98f7ab93ab6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.TaskMetadata; +import org.apache.kafka.streams.processor.ThreadMetadata; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.function.Function; + +public class StreamsNamedRepartitionTest { + + public static void main(final String[] args) throws Exception { + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: "); + } + final String propFileName = args.length > 0 ? args[0] : null; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST"); + System.out.println("props=" + streamsProperties); + + final String inputTopic = (String) streamsProperties.remove("input.topic"); + final String aggregationTopic = (String) streamsProperties.remove("aggregation.topic"); + final boolean addOperators = Boolean.valueOf((String) streamsProperties.remove("add.operations")); + + + final Initializer initializer = () -> 0; + final Aggregator aggregator = (k, v, agg) -> agg + Integer.parseInt(v); + + final Function keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); + + final KStream mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v)); + + final KStream maybeUpdatedStream; + + if (addOperators) { + maybeUpdatedStream = mappedStream.filter((k, v) -> true).mapValues(v -> Integer.toString(Integer.parseInt(v) + 1)); + } else { + maybeUpdatedStream = mappedStream; + } + + maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String())) + .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer())) + .toStream() + .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v))) + .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer())); + + final Properties config = new Properties(); + + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest"); + config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); + config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + + + config.putAll(streamsProperties); + + final Topology topology = builder.build(config); + final KafkaStreams streams = new KafkaStreams(topology, config); + + + streams.setStateListener((oldState, newState) -> { + if (oldState == State.REBALANCING && newState == State.RUNNING) { + if(addOperators) { + System.out.println("REBALANCING -> RUNNING with UPDATED Topology"); + } else { + System.out.println("REBALANCING -> RUNNING"); + } + final Set threadMetadata = streams.localThreadsMetadata(); + for (final ThreadMetadata data : threadMetadata) { + System.out.println(String.format("TASK_ASSIGNMENT -> %s", data.activeTasks())); + } + System.out.flush(); + } + }); + + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(Duration.ofMillis(5000)); + System.out.println("NAMED_REPARTITION_TEST Streams Stopped"); + System.out.flush(); + })); + + } + +} diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 67e6f02874459..629cd6c49095e 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -480,3 +480,25 @@ def start_cmd(self, node): self.logger.info("Executing: " + cmd) return cmd + + +class StreamsNamedRepartitionTopicService(StreamsTestBaseService): + def __init__(self, test_context, kafka): + super(StreamsNamedRepartitionTopicService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest", + "") + self.ADD_ADDITIONAL_OPS = 'false' + self.INPUT_TOPIC = None + self.AGGREGATION_TOPIC = None + + def prop_file(self): + properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()} + + properties['input.topic'] = self.INPUT_TOPIC + properties['aggregation.topic'] = self.AGGREGATION_TOPIC + properties['add.operations'] = self.ADD_ADDITIONAL_OPS + + cfg = KafkaConfig(**properties) + return cfg.render() diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py new file mode 100644 index 0000000000000..11f3d8b1502b0 --- /dev/null +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.kafka import KafkaService +from kafkatest.services.streams import StreamsNamedRepartitionTopicService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.zookeeper import ZookeeperService + + +class StreamsNamedRepartitionTopicTest(Test): + """ + Tests using a named repartition topic by starting + application then doing a rolling upgrade with added + operations and the application still runs + """ + + input_topic = 'inputTopic' + aggregation_topic = 'aggregationTopic' + pattern = 'AGGREGATED' + + def __init__(self, test_context): + super(StreamsNamedRepartitionTopicTest, self).__init__(test_context) + self.topics = { + self.input_topic: {'partitions': 6}, + self.aggregation_topic: {'partitions': 6} + } + + self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=3, + zk=self.zookeeper, topics=self.topics) + + self.producer = VerifiableProducer(self.test_context, + 1, + self.kafka, + self.input_topic, + throughput=1000, + acks=1) + + def test_upgrade_topology_with_named_repartition_topic(self): + self.zookeeper.start() + self.kafka.start() + + processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) + processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) + processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) + + processors = [processor1, processor2, processor3] + + self.producer.start() + + for processor in processors: + processor.CLEAN_NODE_ENABLED = False + self.set_topics(processor) + processor.start() + self.verify_running(processor, 'REBALANCING -> RUNNING') + + self.verify_processing(processors) + + # do rolling upgrade + for processor in processors: + self.verify_stopped(processor) + # will tell app to add operations before repartition topic + processor.ADD_ADDITIONAL_OPS = 'true' + processor.start() + self.verify_running(processor, 'REBALANCING -> RUNNING with UPDATED Topology') + + self.verify_processing(processors) + + self.stop_processors(processors) + + self.producer.stop() + self.kafka.stop() + self.zookeeper.stop() + + @staticmethod + def verify_running(processor, message): + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + monitor.wait_until(message, + timeout_sec=120, + err_msg="Never saw '%s' message " % message + str(processor.node.account)) + + @staticmethod + def verify_stopped(processor): + node = processor.node + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.stop() + monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped', + timeout_sec=120, + err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account)) + + def verify_processing(self, processors): + for processor in processors: + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + monitor.wait_until(self.pattern, + timeout_sec=120, + err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account)) + + def stop_processors(self, processors): + for processor in processors: + self.verify_stopped(processor) + + def set_topics(self, processor): + processor.INPUT_TOPIC = self.input_topic + processor.AGGREGATION_TOPIC = self.aggregation_topic From 82f2fd102bc0bb7d50496f42a07684e19f14edbd Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 14 Nov 2018 11:13:09 -0500 Subject: [PATCH 2/4] MINOR: Fix checkstyle errors, reduce timeouts --- .../kafka/streams/tests/StreamsNamedRepartitionTest.java | 5 +---- .../tests/streams/streams_named_repartition_topic_test.py | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 4d98f7ab93ab6..837a306d581d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -31,12 +31,9 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; import java.util.Set; import java.util.function.Function; @@ -100,7 +97,7 @@ public static void main(final String[] args) throws Exception { streams.setStateListener((oldState, newState) -> { if (oldState == State.REBALANCING && newState == State.RUNNING) { - if(addOperators) { + if (addOperators) { System.out.println("REBALANCING -> RUNNING with UPDATED Topology"); } else { System.out.println("REBALANCING -> RUNNING"); diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index 11f3d8b1502b0..bf45af64f92e6 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -91,7 +91,7 @@ def test_upgrade_topology_with_named_repartition_topic(self): def verify_running(processor, message): with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: monitor.wait_until(message, - timeout_sec=120, + timeout_sec=60, err_msg="Never saw '%s' message " % message + str(processor.node.account)) @staticmethod @@ -100,14 +100,14 @@ def verify_stopped(processor): with node.account.monitor_log(processor.STDOUT_FILE) as monitor: processor.stop() monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped', - timeout_sec=120, + timeout_sec=60, err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account)) def verify_processing(self, processors): for processor in processors: with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: monitor.wait_until(self.pattern, - timeout_sec=120, + timeout_sec=60, err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account)) def stop_processors(self, processors): From c759435c23ece09ae090958cf5f513a40c929c93 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Fri, 16 Nov 2018 09:23:34 -0500 Subject: [PATCH 3/4] MINOR: Updates per comments --- .../tests/StreamsNamedRepartitionTest.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 837a306d581d4..8f79f13239635 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -31,29 +31,28 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.processor.ThreadMetadata; import java.time.Duration; +import java.util.Objects; import java.util.Properties; -import java.util.Set; import java.util.function.Function; public class StreamsNamedRepartitionTest { public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: "); + System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: "); } - final String propFileName = args.length > 0 ? args[0] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST"); System.out.println("props=" + streamsProperties); - final String inputTopic = (String) streamsProperties.remove("input.topic"); - final String aggregationTopic = (String) streamsProperties.remove("aggregation.topic"); - final boolean addOperators = Boolean.valueOf((String) streamsProperties.remove("add.operations")); + final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic"))); + final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic"))); + final boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String) streamsProperties.remove("add.operations"))); final Initializer initializer = () -> 0; @@ -102,10 +101,6 @@ public static void main(final String[] args) throws Exception { } else { System.out.println("REBALANCING -> RUNNING"); } - final Set threadMetadata = streams.localThreadsMetadata(); - for (final ThreadMetadata data : threadMetadata) { - System.out.println(String.format("TASK_ASSIGNMENT -> %s", data.activeTasks())); - } System.out.flush(); } }); From 91c2b7c487a3ea3fb84ce3dce96c0a4b97237d48 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Tue, 27 Nov 2018 11:21:11 -0500 Subject: [PATCH 4/4] MINOR: updates per comments --- .../kafka/streams/tests/StreamsNamedRepartitionTest.java | 2 +- .../tests/streams/streams_named_repartition_topic_test.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 8f79f13239635..660de397a0f92 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -97,7 +97,7 @@ public static void main(final String[] args) throws Exception { streams.setStateListener((oldState, newState) -> { if (oldState == State.REBALANCING && newState == State.RUNNING) { if (addOperators) { - System.out.println("REBALANCING -> RUNNING with UPDATED Topology"); + System.out.println("UPDATED Topology"); } else { System.out.println("REBALANCING -> RUNNING"); } diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index bf45af64f92e6..5baf612d6c6cd 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -41,7 +41,7 @@ def __init__(self, test_context): } self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) - self.kafka = KafkaService(self.test_context, num_nodes=3, + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zookeeper, topics=self.topics) self.producer = VerifiableProducer(self.test_context, @@ -77,7 +77,7 @@ def test_upgrade_topology_with_named_repartition_topic(self): # will tell app to add operations before repartition topic processor.ADD_ADDITIONAL_OPS = 'true' processor.start() - self.verify_running(processor, 'REBALANCING -> RUNNING with UPDATED Topology') + self.verify_running(processor, 'UPDATED Topology') self.verify_processing(processors)