diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java new file mode 100644 index 0000000000000..660de397a0f92 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;

/**
 * System-test application that aggregates an input topic through a grouping
 * named {@code "grouped-stream"}.
 *
 * <p>The single command-line argument is a properties file. Three keys are
 * consumed by this class and removed before the remainder is handed to Kafka
 * Streams as configuration:
 * <ul>
 *   <li>{@code input.topic} - topic to read from</li>
 *   <li>{@code aggregation.topic} - topic the running sums are written to</li>
 *   <li>{@code add.operations} - when {@code "true"}, extra stateless operators
 *       are inserted upstream of the grouping</li>
 * </ul>
 *
 * <p>Progress is reported on standard out ({@code AGGREGATED ...},
 * {@code REBALANCING -> RUNNING}, {@code UPDATED Topology},
 * {@code NAMED_REPARTITION_TEST Streams Stopped}).
 */
public class StreamsNamedRepartitionTest {

    /**
     * Builds and starts the topology described in the class documentation.
     *
     * @param args exactly one element is required: the path of the properties file
     * @throws Exception if the properties file cannot be loaded
     * @throws NullPointerException if one of the required test properties is absent
     */
    public static void main(final String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
            // Stop here: falling through would fail on args[0] with an
            // ArrayIndexOutOfBoundsException instead of a clean error exit.
            System.exit(1);
        }
        final String propFileName = args[0];

        final Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
        System.out.println("props=" + streamsProperties);

        // Test-only keys are removed so they are not passed on as Streams configs.
        final String inputTopic = removeRequired(streamsProperties, "input.topic");
        final String aggregationTopic = removeRequired(streamsProperties, "aggregation.topic");
        final boolean addOperators = Boolean.parseBoolean(removeRequired(streamsProperties, "add.operations"));

        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + Integer.parseInt(v);

        // New key is the numeric value modulo 9, rendered as a string.
        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> sourceStream =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));

        final KStream<String, String> maybeUpdatedStream;

        if (addOperators) {
            // Extra operators placed ahead of the grouping for the "upgraded" topology.
            maybeUpdatedStream = mappedStream
                .filter((k, v) -> true)
                .mapValues(v -> Integer.toString(Integer.parseInt(v) + 1));
        } else {
            maybeUpdatedStream = mappedStream;
        }

        maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));

        final Properties config = new Properties();

        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Values from the properties file override the defaults set above.
        config.putAll(streamsProperties);

        final Topology topology = builder.build(config);
        final KafkaStreams streams = new KafkaStreams(topology, config);

        streams.setStateListener((oldState, newState) -> {
            if (oldState == State.REBALANCING && newState == State.RUNNING) {
                if (addOperators) {
                    System.out.println("UPDATED Topology");
                } else {
                    System.out.println("REBALANCING -> RUNNING");
                }
                System.out.flush();
            }
        });

        // Registered before start() so a termination signal that arrives while
        // the instance is still starting up also closes it and prints the
        // "Streams Stopped" marker.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000));
            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
            System.out.flush();
        }));

        streams.start();
    }

    /**
     * Removes {@code key} from {@code props} and returns its value.
     *
     * @throws NullPointerException naming the key if the property is not present
     */
    private static String removeRequired(final Properties props, final String key) {
        return (String) Objects.requireNonNull(props.remove(key), "Missing required property: " + key);
    }

}
class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
    """Streams service that runs ``StreamsNamedRepartitionTest``.

    The test sets ``INPUT_TOPIC``, ``AGGREGATION_TOPIC`` and
    ``ADD_ADDITIONAL_OPS`` on the instance before starting it; their values
    are written into the rendered properties file.
    """

    def __init__(self, test_context, kafka):
        super(StreamsNamedRepartitionTopicService, self).__init__(
            test_context,
            kafka,
            "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
            "")
        # Topics are unset until the test assigns them.
        self.INPUT_TOPIC = None
        self.AGGREGATION_TOPIC = None
        # String flag copied verbatim into the 'add.operations' property.
        self.ADD_ADDITIONAL_OPS = 'false'

    def prop_file(self):
        """Render the properties for this service: state dir, bootstrap
        servers, and the three test-specific keys."""
        settings = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            'input.topic': self.INPUT_TOPIC,
            'aggregation.topic': self.AGGREGATION_TOPIC,
            'add.operations': self.ADD_ADDITIONAL_OPS,
        }
        return KafkaConfig(**settings).render()
import time
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsNamedRepartitionTopicService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService


class StreamsNamedRepartitionTopicTest(Test):
    """
    Tests using a named repartition topic by starting
    application then doing a rolling upgrade with added
    operations and the application still runs
    """

    input_topic = 'inputTopic'
    aggregation_topic = 'aggregationTopic'
    # Marker printed by the application for every aggregated record.
    pattern = 'AGGREGATED'

    def __init__(self, test_context):
        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
        self.topics = {
            self.input_topic: {'partitions': 6},
            self.aggregation_topic: {'partitions': 6}
        }

        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
        self.kafka = KafkaService(self.test_context, num_nodes=1,
                                  zk=self.zookeeper, topics=self.topics)

        self.producer = VerifiableProducer(self.test_context,
                                           1,
                                           self.kafka,
                                           self.input_topic,
                                           throughput=1000,
                                           acks=1)

    def test_upgrade_topology_with_named_repartition_topic(self):
        """Start three processors, then bounce each one with additional
        operations enabled and check that records are still aggregated."""
        self.zookeeper.start()
        self.kafka.start()

        processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
        processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
        processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)

        processors = [processor1, processor2, processor3]

        self.producer.start()

        for processor in processors:
            # Keep node state between restarts so the rolling bounce reuses it.
            processor.CLEAN_NODE_ENABLED = False
            self.set_topics(processor)
            self.start_and_verify_running(processor, 'REBALANCING -> RUNNING')

        self.verify_processing(processors)

        # do rolling upgrade
        for processor in processors:
            self.verify_stopped(processor)
            # will tell app to add operations before repartition topic
            processor.ADD_ADDITIONAL_OPS = 'true'
            self.start_and_verify_running(processor, 'UPDATED Topology')

        self.verify_processing(processors)

        self.stop_processors(processors)

        self.producer.stop()
        self.kafka.stop()
        self.zookeeper.stop()

    @staticmethod
    def start_and_verify_running(processor, message):
        """Start ``processor`` and wait up to 60 seconds for ``message`` on its stdout.

        The log monitor is opened before ``start()`` is called, so a status
        line printed right after startup cannot be written before the
        monitor begins watching.
        """
        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.start()
            monitor.wait_until(message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % message + str(processor.node.account))

    @staticmethod
    def verify_running(processor, message):
        """Wait up to 60 seconds for ``message`` on the stdout of an
        already-started ``processor``.

        NOTE(review): presumably only output written after this call is seen
        (confirm against ducktape's ``monitor_log``); use
        ``start_and_verify_running`` when the message may be printed
        immediately after ``start()``.
        """
        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            monitor.wait_until(message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % message + str(processor.node.account))

    @staticmethod
    def verify_stopped(processor):
        """Stop ``processor`` and wait up to 60 seconds for its shutdown marker on stdout."""
        node = processor.node
        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.stop()
            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
                               timeout_sec=60,
                               err_msg="Never saw 'NAMED_REPARTITION_TEST Streams Stopped' message " + str(processor.node.account))

    def verify_processing(self, processors):
        """Wait, for each processor in turn, until it prints ``self.pattern``."""
        for processor in processors:
            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
                monitor.wait_until(self.pattern,
                                   timeout_sec=60,
                                   err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))

    def stop_processors(self, processors):
        """Stop every processor, verifying each shutdown marker."""
        for processor in processors:
            self.verify_stopped(processor)

    def set_topics(self, processor):
        """Point ``processor`` at this test's input and aggregation topics."""
        processor.INPUT_TOPIC = self.input_topic
        processor.AGGREGATION_TOPIC = self.aggregation_topic