Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;

public class StreamsNamedRepartitionTest {

public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
}
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
System.out.println("props=" + streamsProperties);

final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic")));
final boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));


final Initializer<Integer> initializer = () -> 0;
final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + Integer.parseInt(v);

final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));

final KStream<String, String> maybeUpdatedStream;

if (addOperators) {
maybeUpdatedStream = mappedStream.filter((k, v) -> true).mapValues(v -> Integer.toString(Integer.parseInt(v) + 1));
} else {
maybeUpdatedStream = mappedStream;
}

maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
.aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
.toStream()
.peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
.to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));

final Properties config = new Properties();

config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());


config.putAll(streamsProperties);

final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);


streams.setStateListener((oldState, newState) -> {
if (oldState == State.REBALANCING && newState == State.RUNNING) {
if (addOperators) {
System.out.println("UPDATED Topology");
} else {
System.out.println("REBALANCING -> RUNNING");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it clarify the pattern matching in the ducktape driver if we make this not a prefix of the other case?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

}
System.out.flush();
}
});

streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close(Duration.ofMillis(5000));
System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
System.out.flush();
}));

}

}
22 changes: 22 additions & 0 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,25 @@ def start_cmd(self, node):
self.logger.info("Executing: " + cmd)

return cmd


class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
    """
    Service wrapper that launches
    org.apache.kafka.streams.tests.StreamsNamedRepartitionTest.

    The test sets INPUT_TOPIC, AGGREGATION_TOPIC and (optionally)
    ADD_ADDITIONAL_OPS on the instance before starting it; prop_file()
    forwards them to the application through the rendered properties.
    """

    def __init__(self, test_context, kafka):
        super(StreamsNamedRepartitionTopicService, self).__init__(
            test_context,
            kafka,
            "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
            "")
        # Topics are unset until the test assigns them.
        self.INPUT_TOPIC = None
        self.AGGREGATION_TOPIC = None
        # String flag, written verbatim into the 'add.operations' property.
        self.ADD_ADDITIONAL_OPS = 'false'

    def prop_file(self):
        """Render the properties handed to the Java application."""
        settings = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            'input.topic': self.INPUT_TOPIC,
            'aggregation.topic': self.AGGREGATION_TOPIC,
            'add.operations': self.ADD_ADDITIONAL_OPS,
        }
        return KafkaConfig(**settings).render()
119 changes: 119 additions & 0 deletions tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsNamedRepartitionTopicService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService


class StreamsNamedRepartitionTopicTest(Test):
    """
    Tests using a named repartition topic: starts the application on three
    nodes, then performs a rolling restart in which each instance comes back
    with additional operations in its topology, and verifies that records
    are still being processed afterwards.
    """

    input_topic = 'inputTopic'
    aggregation_topic = 'aggregationTopic'
    # Prefix of the line the application prints for every aggregated record.
    pattern = 'AGGREGATED'

    def __init__(self, test_context):
        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
        self.topics = {
            self.input_topic: {'partitions': 6},
            self.aggregation_topic: {'partitions': 6}
        }

        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
        self.kafka = KafkaService(self.test_context, num_nodes=1,
                                  zk=self.zookeeper, topics=self.topics)

        self.producer = VerifiableProducer(self.test_context,
                                           1,
                                           self.kafka,
                                           self.input_topic,
                                           throughput=1000,
                                           acks=1)

    def test_upgrade_topology_with_named_repartition_topic(self):
        """Start three instances, roll them onto the extended topology, verify processing."""
        self.zookeeper.start()
        self.kafka.start()

        processors = [StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
                      for _ in range(3)]

        self.producer.start()

        for processor in processors:
            processor.CLEAN_NODE_ENABLED = False
            self.set_topics(processor)
            processor.start()
            self.verify_running(processor, 'REBALANCING -> RUNNING')

        self.verify_processing(processors)

        # do rolling upgrade
        for processor in processors:
            self.verify_stopped(processor)
            # will tell app to add operations before repartition topic
            processor.ADD_ADDITIONAL_OPS = 'true'
            processor.start()
            self.verify_running(processor, 'UPDATED Topology')

        # No false positive from 'AGGREGATED' lines written before the restart:
        # per the review discussion on this change, monitor_log() delegates to
        # ducktape's LogMonitor, which grabs the offset of the file when the
        # monitor object is created and only waits for the pattern from that
        # point going forward (see ducktape/cluster/remoteaccount.py).
        self.verify_processing(processors)

        self.stop_processors(processors)

        self.producer.stop()
        self.kafka.stop()
        self.zookeeper.stop()

    @staticmethod
    def verify_running(processor, message):
        """Wait up to 60s for `message` to appear in the processor's stdout file."""
        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            monitor.wait_until(message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % message + str(processor.node.account))

    @staticmethod
    def verify_stopped(processor):
        """Stop the processor and wait up to 60s for its shutdown message."""
        stopped_message = 'NAMED_REPARTITION_TEST Streams Stopped'
        node = processor.node
        # The monitor is opened before stop() so the shutdown line cannot be missed.
        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.stop()
            monitor.wait_until(stopped_message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % stopped_message + str(node.account))

    def verify_processing(self, processors):
        """Wait up to 60s per processor for a new aggregation line in its stdout file."""
        for processor in processors:
            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
                monitor.wait_until(self.pattern,
                                   timeout_sec=60,
                                   err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))

    def stop_processors(self, processors):
        """Stop every processor, verifying each one reports a clean shutdown."""
        for processor in processors:
            self.verify_stopped(processor)

    def set_topics(self, processor):
        """Point the processor at this test's input and aggregation topics."""
        processor.INPUT_TOPIC = self.input_topic
        processor.AGGREGATION_TOPIC = self.aggregation_topic