From 9696486acf51c6064ae92c3fa373d60338e4391f Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Wed, 21 Mar 2018 16:58:12 -0700 Subject: [PATCH 1/5] WorkerUtils method to get matching topic partitions --- .../kafka/trogdor/common/WorkerUtils.java | 77 ++++++++++++++++++- .../kafka/trogdor/common/WorkerUtilsTest.java | 36 ++++++++- 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 98dbf3836b692..3846d45324762 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -21,9 +21,14 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; @@ -39,6 +44,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import java.util.regex.Pattern; /** * Utilities for Trogdor TaskWorkers. 
@@ -128,6 +134,32 @@ public static void createTopics( } } + /** + * Returns a list of all existing topic partitions that match the following criteria: topic + * name matches give regular expression 'topicRegex', topic is not internal, partitions are + * in range [startPartition, endPartition] + * + * @param log The logger to use. + * @param bootstrapServers The bootstrap server list. + * @param topicRegex Topic name regular expression + * @param startPartition Starting partition of partition range + * @param endPartition Ending partition of partition range + * @return List of topic partitions + * @throws Throwable If getting list of topics or their descriptions fails. + */ + public static Collection getMatchingTopicPartitions( + Logger log, String bootstrapServers, + Map commonClientConf, Map adminClientConf, + String topicRegex, int startPartition, int endPartition) throws Throwable { + try (AdminClient adminClient + = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) { + return getMatchingTopicPartitions(adminClient, topicRegex, startPartition, endPartition); + } catch (Exception e) { + log.warn("Failed to get topic partitions matching {}", topicRegex, e); + throw e; + } + } + /** * The actual create topics functionality is separated into this method and called from the * above method to be able to unit test with mock adminClient. 
@@ -249,9 +281,50 @@ private static void verifyTopics( } } + /** + * Returns list of existing, not internal, topics/partitions that match given pattern and + * where partitions are in range [startPartition, endPartition] + * @param adminClient AdminClient + * @param topicRegex Topic regular expression to match + * @return list of topic names + * @throws Throwable If failed to get list of existing topics + */ + static Collection getMatchingTopicPartitions( + AdminClient adminClient, String topicRegex, int startPartition, int endPartition) + throws Throwable { + final Pattern topicNamePattern = Pattern.compile(topicRegex); + + // first get list of matching topics + List matchedTopics = new ArrayList<>(); + ListTopicsResult res = adminClient.listTopics( + new ListTopicsOptions().timeoutMs(CREATE_TOPICS_CALL_TIMEOUT)); + Map topicListingMap = res.namesToListings().get(); + for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { + if (topicNamePattern.matcher(topicListingEntry.getKey()).matches() + && !topicListingEntry.getValue().isInternal()) { + matchedTopics.add(topicListingEntry.getKey()); + } + } + + // create a list of topic/partitions + List out = new ArrayList<>(); + DescribeTopicsResult topicsResult = adminClient.describeTopics( + matchedTopics, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + Map topicDescriptionMap = topicsResult.all().get(); + for (TopicDescription desc: topicDescriptionMap.values()) { + List partitions = desc.partitions(); + for (TopicPartitionInfo info: partitions) { + if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) { + out.add(new TopicPartition(desc.name(), info.partition())); + } + } + } + return out; + } + private static AdminClient createAdminClient( - String bootstrapServers, Map commonClientConf, - Map adminClientConf) { + String bootstrapServers, + Map commonClientConf, Map adminClientConf) { Properties props = new Properties(); 
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index fbe23899bc9dc..5f95ec9b484bd 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -18,9 +18,9 @@ package org.apache.kafka.trogdor.common; - import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.Node; @@ -35,9 +35,11 @@ import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -262,4 +264,36 @@ public void testClientConfigOverwritesBothDefaultAndCommonConfigs() { Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0")); assertEquals(resultProps, props); } + + @Test + public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable { + final String topic1 = "existing-topic"; + final String topic2 = "another-topic"; + final String topic3 = "one-more-topic"; + makeExistingTopicWithOneReplica(topic1, 10); + makeExistingTopicWithOneReplica(topic2, 20); + makeExistingTopicWithOneReplica(topic3, 30); + + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions(adminClient, topic3, 0, 2); + assertTrue(topicPartitions.containsAll( + Arrays.asList(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1)) + )); + } + + private void makeExistingTopicWithOneReplica(String topicName, 
int numPartitions) { + List tpInfo = new ArrayList<>(); + int brokerIndex = 0; + for (int i = 0; i < numPartitions; ++i) { + Node broker = cluster.get(brokerIndex); + tpInfo.add(new TopicPartitionInfo( + i, broker, singleReplica, Collections.emptyList())); + brokerIndex = (brokerIndex + 1) % cluster.size(); + } + adminClient.addTopic( + false, + topicName, + tpInfo, + null); + } } From 1401fe5bc7214101eb457b8f7179f2c492089ad6 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 26 Mar 2018 09:59:33 -0700 Subject: [PATCH 2/5] Kafka-6693: Added consumer workload --- .../kafka/trogdor/common/WorkerUtils.java | 2 +- .../trogdor/workload/ConsumeBenchSpec.java | 139 +++++++++ .../trogdor/workload/ConsumeBenchWorker.java | 293 ++++++++++++++++++ .../kafka/trogdor/common/WorkerUtilsTest.java | 37 ++- 4 files changed, 463 insertions(+), 8 deletions(-) create mode 100644 tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java create mode 100644 tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 3846d45324762..5902b5c112727 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -286,7 +286,7 @@ private static void verifyTopics( * where partitions are in range [startPartition, endPartition] * @param adminClient AdminClient * @param topicRegex Topic regular expression to match - * @return list of topic names + * @return List of topic names * @throws Throwable If failed to get list of existing topics */ static Collection getMatchingTopicPartitions( diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java new file mode 100644 index 0000000000000..cef913bc01a5f --- 
/dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The specification for a benchmark that produces messages to a set of topics. 
+ */ +public class ConsumeBenchSpec extends TaskSpec { + + private final String consumerNode; + private final String bootstrapServers; + private final int targetMessagesPerSec; + private final int maxMessages; + private final Map consumerConf; + private final Map adminClientConf; + private final Map commonClientConf; + private final String topicRegex; + private final int startPartition; + private final int endPartition; + + + @JsonCreator + public ConsumeBenchSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("consumerNode") String consumerNode, + @JsonProperty("bootstrapServers") String bootstrapServers, + @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, + @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("consumerConf") Map consumerConf, + @JsonProperty("commonClientConf") Map commonClientConf, + @JsonProperty("adminClientConf") Map adminClientConf, + @JsonProperty("topicRegex") String topicRegex, + @JsonProperty("startPartition") int startPartition, + @JsonProperty("endPartition") int endPartition) { + super(startMs, durationMs); + this.consumerNode = (consumerNode == null) ? "" : consumerNode; + this.bootstrapServers = (bootstrapServers == null) ? 
"" : bootstrapServers; + this.targetMessagesPerSec = targetMessagesPerSec; + this.maxMessages = maxMessages; + this.consumerConf = configOrEmptyMap(consumerConf); + this.commonClientConf = configOrEmptyMap(commonClientConf); + this.adminClientConf = configOrEmptyMap(adminClientConf); + this.topicRegex = topicRegex; + this.startPartition = startPartition; + this.endPartition = endPartition; + } + + @JsonProperty + public String consumerNode() { + return consumerNode; + } + + @JsonProperty + public String bootstrapServers() { + return bootstrapServers; + } + + @JsonProperty + public int targetMessagesPerSec() { + return targetMessagesPerSec; + } + + @JsonProperty + public int maxMessages() { + return maxMessages; + } + + @JsonProperty + public Map consumerConf() { + return consumerConf; + } + + @JsonProperty + public Map commonClientConf() { + return commonClientConf; + } + + @JsonProperty + public Map adminClientConf() { + return adminClientConf; + } + + @JsonProperty + public String topicRegex() { + return topicRegex; + } + + @JsonProperty + public int startPartition() { + return startPartition; + } + + @JsonProperty + public int endPartition() { + return endPartition; + } + + @Override + public TaskController newController(String id) { + return new TaskController() { + @Override + public Set targetNodes(Topology topology) { + return Collections.singleton(consumerNode); + } + }; + } + + @Override + public TaskWorker newTaskWorker(String id) { + return new ConsumeBenchWorker(id, this); + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java new file mode 100644 index 0000000000000..3eaf0f22b632c --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.ThreadUtils; +import org.apache.kafka.trogdor.common.WorkerUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class ConsumeBenchWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class); + + private static final int THROTTLE_PERIOD_MS = 100; + + private final String id; + private final ConsumeBenchSpec spec; + private final AtomicBoolean running = new AtomicBoolean(false); + private ScheduledExecutorService executor; + private AtomicReference status; + private KafkaFutureImpl doneFuture; + private KafkaConsumer consumer; + + public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) { + this.id = id; + this.spec = spec; + } + + @Override + public void start(Platform platform, AtomicReference status, + KafkaFutureImpl doneFuture) throws Exception { + if (!running.compareAndSet(false, true)) { + throw new IllegalStateException("ConsumeBenchWorker is already running."); + } + log.info("{}: Activating ConsumeBenchWorker with {}", id, spec); + this.executor = Executors.newScheduledThreadPool( + 2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false)); + this.status = status; + this.doneFuture = doneFuture; + this.consumer = null; + executor.submit(new Prepare()); + } + + public class Prepare implements Runnable { + @Override + public void run() { + try { + // find topics to consume from based on provided topic regular expression + if (spec.topicRegex() == null) { + throw new ConfigException( + "Must provide topic name or regular expression to match existing topics."); + } + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions( + log, spec.bootstrapServers(), + spec.commonClientConf(), spec.adminClientConf(), + spec.topicRegex(), spec.startPartition(), spec.endPartition()); + log.info("Will consume from {}", topicPartitions); + + executor.submit(new ConsumeRecords(topicPartitions)); + } catch (Throwable e) { + WorkerUtils.abort(log, "Prepare", 
e, doneFuture); + } + } + } + + public class ConsumeRecords implements Callable { + private final Histogram latencyHistogram; + private final Histogram messageSizeHistogram; + private final Future statusUpdaterFuture; + private final Throttle throttle; + + ConsumeRecords(Collection topicPartitions) { + this.latencyHistogram = new Histogram(5000); + this.messageSizeHistogram = new Histogram(2 * 1024 * 1024); + this.statusUpdaterFuture = executor.scheduleAtFixedRate( + new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES); + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); + for (Map.Entry entry : spec.consumerConf().entrySet()) { + props.setProperty(entry.getKey(), entry.getValue()); + } + consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), + new ByteArrayDeserializer()); + consumer.assign(topicPartitions); + int perPeriod = WorkerUtils.perSecToPerPeriod( + spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); + this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); + } + + @Override + public Void call() throws Exception { + long messagesConsumed = 0; + long bytesConsumed = 0; + long startTimeMs = Time.SYSTEM.milliseconds(); + long startBatchMs = startTimeMs; + try { + while (messagesConsumed < spec.maxMessages()) { + ConsumerRecords records = consumer.poll(50); + if (records.isEmpty()) { + continue; + } + long endBatchMs = Time.SYSTEM.milliseconds(); + long elapsedBatchMs = endBatchMs - startBatchMs; + for (ConsumerRecord record : records) { + messagesConsumed++; + long messageBytes = 0; + if (record.key() != null) { + messageBytes += record.serializedKeySize(); + } + if (record.value() != null) { + 
messageBytes += record.serializedValueSize(); + } + latencyHistogram.add(elapsedBatchMs); + messageSizeHistogram.add(messageBytes); + bytesConsumed += messageBytes; + throttle.increment(); + + } + startBatchMs = Time.SYSTEM.milliseconds(); + } + } catch (Exception e) { + WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture); + } finally { + statusUpdaterFuture.cancel(false); + new StatusUpdater(latencyHistogram, messageSizeHistogram).run(); + long curTimeMs = Time.SYSTEM.milliseconds(); + log.info("Consumed total number of records={}, bytes={} in {} ms. status: {}", + messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, status.get()); + } + doneFuture.complete(""); + return null; + } + } + + public class StatusUpdater implements Runnable { + private final Histogram latencyHistogram; + private final Histogram messageSizeHistogram; + + private final float[] percentiles; + + StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) { + this.latencyHistogram = latencyHistogram; + this.messageSizeHistogram = messageSizeHistogram; + this.percentiles = new float[] {0.50f, 0.95f, 0.99f}; + } + + @Override + public void run() { + try { + Histogram.Summary latSummary = latencyHistogram.summarize(percentiles); + Histogram.Summary msgSummary = messageSizeHistogram.summarize(percentiles); + StatusData statusData = new StatusData( + latSummary.numSamples(), + (long) (msgSummary.numSamples() * msgSummary.average()), + (long) msgSummary.average(), + latSummary.average(), + latSummary.percentiles().get(0).value(), + latSummary.percentiles().get(1).value(), + latSummary.percentiles().get(2).value()); + String statusDataString = JsonUtil.toJsonString(statusData); + status.set(statusDataString); + log.info("Status={}", statusDataString); + } catch (Exception e) { + WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); + } + } + } + + public static class StatusData { + private final long totalMessagesReceived; + private final long totalBytesReceived; + private 
final long averageMessageSizeBytes; + private final float averageLatencyMs; + private final int p50LatencyMs; + private final int p90LatencyMs; + private final int p99LatencyMs; + + @JsonCreator + StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived, + @JsonProperty("totalBytesReceived") long totalBytesReceived, + @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes, + @JsonProperty("averageLatencyMs") float averageLatencyMs, + @JsonProperty("p50LatencyMs") int p50latencyMs, + @JsonProperty("p90LatencyMs") int p90latencyMs, + @JsonProperty("p99LatencyMs") int p99latencyMs) { + this.totalMessagesReceived = totalMessagesReceived; + this.totalBytesReceived = totalBytesReceived; + this.averageMessageSizeBytes = averageMessageSizeBytes; + this.averageLatencyMs = averageLatencyMs; + this.p50LatencyMs = p50latencyMs; + this.p90LatencyMs = p90latencyMs; + this.p99LatencyMs = p99latencyMs; + } + + @JsonProperty + public long totalMessagesReceived() { + return totalMessagesReceived; + } + + @JsonProperty + public long totalBytesReceived() { + return totalBytesReceived; + } + + @JsonProperty + public long averageMessageSizeBytes() { + return averageMessageSizeBytes; + } + + @JsonProperty + public float averageLatencyMs() { + return averageLatencyMs; + } + + @JsonProperty + public int p50LatencyMs() { + return p50LatencyMs; + } + + @JsonProperty + public int p90LatencyMs() { + return p90LatencyMs; + } + + @JsonProperty + public int p99LatencyMs() { + return p99LatencyMs; + } + } + + @Override + public void stop(Platform platform) throws Exception { + if (!running.compareAndSet(true, false)) { + throw new IllegalStateException("ConsumeBenchWorker is not running."); + } + log.info("{}: Deactivating ConsumeBenchWorker.", id); + doneFuture.complete(""); + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.DAYS); + Utils.closeQuietly(consumer, "consumer"); + this.consumer = null; + this.executor = null; + this.status = null; + 
this.doneFuture = null; + } + +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index 5f95ec9b484bd..5d840bc03419b 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -35,13 +35,13 @@ import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -266,19 +266,42 @@ public void testClientConfigOverwritesBothDefaultAndCommonConfigs() { } @Test - public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable { + public void testGetMatchingTopicPartitionsCorrectlyMatchesExactTopicName() throws Throwable { final String topic1 = "existing-topic"; final String topic2 = "another-topic"; - final String topic3 = "one-more-topic"; + makeExistingTopicWithOneReplica(topic1, 10); + makeExistingTopicWithOneReplica(topic2, 20); + + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions(adminClient, topic2, 0, 2); + assertEquals( + new HashSet<>(Arrays.asList( + new TopicPartition(topic2, 0), new TopicPartition(topic2, 1), + new TopicPartition(topic2, 2) + )), + new HashSet<>(topicPartitions) + ); + } + + @Test + public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable { + final String topic1 = "test-topic"; + final String topic2 = "another-test-topic"; + final String topic3 = "one-more"; makeExistingTopicWithOneReplica(topic1, 10); makeExistingTopicWithOneReplica(topic2, 20); makeExistingTopicWithOneReplica(topic3, 30); Collection topicPartitions = - 
WorkerUtils.getMatchingTopicPartitions(adminClient, topic3, 0, 2); - assertTrue(topicPartitions.containsAll( - Arrays.asList(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1)) - )); + WorkerUtils.getMatchingTopicPartitions(adminClient, ".*-topic$", 0, 1); + + assertEquals( + new HashSet<>(Arrays.asList( + new TopicPartition(topic1, 0), new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), new TopicPartition(topic2, 1) + )), + new HashSet<>(topicPartitions) + ); } private void makeExistingTopicWithOneReplica(String topicName, int numPartitions) { From caee26948fb0779003d4147a27da3d610b223b3a Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Fri, 6 Apr 2018 18:40:50 -0700 Subject: [PATCH 3/5] Fixed percentiles --- .../kafka/trogdor/common/WorkerUtils.java | 2 +- .../trogdor/workload/ConsumeBenchWorker.java | 30 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 5902b5c112727..3b7ff0ac74ea3 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -297,7 +297,7 @@ static Collection getMatchingTopicPartitions( // first get list of matching topics List matchedTopics = new ArrayList<>(); ListTopicsResult res = adminClient.listTopics( - new ListTopicsOptions().timeoutMs(CREATE_TOPICS_CALL_TIMEOUT)); + new ListTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); Map topicListingMap = res.namesToListings().get(); for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { if (topicNamePattern.matcher(topicListingEntry.getKey()).matches() diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index 3eaf0f22b632c..edab69d6f28bf 100644 --- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -39,7 +39,6 @@ import org.apache.kafka.trogdor.task.TaskWorker; import java.util.Collection; -import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -122,9 +121,9 @@ public class ConsumeRecords implements Callable { props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); - for (Map.Entry entry : spec.consumerConf().entrySet()) { - props.setProperty(entry.getKey(), entry.getValue()); - } + // these defaults maybe over-written by the user-specified commonClientConf or + // consumerConf + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); consumer.assign(topicPartitions); @@ -182,19 +181,16 @@ public class StatusUpdater implements Runnable { private final Histogram latencyHistogram; private final Histogram messageSizeHistogram; - private final float[] percentiles; - StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) { this.latencyHistogram = latencyHistogram; this.messageSizeHistogram = messageSizeHistogram; - this.percentiles = new float[] {0.50f, 0.95f, 0.99f}; } @Override public void run() { try { - Histogram.Summary latSummary = latencyHistogram.summarize(percentiles); - Histogram.Summary msgSummary = messageSizeHistogram.summarize(percentiles); + Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES); + Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES); StatusData statusData = new StatusData( latSummary.numSamples(), (long) (msgSummary.numSamples() * msgSummary.average()), @@ 
-218,23 +214,29 @@ public static class StatusData { private final long averageMessageSizeBytes; private final float averageLatencyMs; private final int p50LatencyMs; - private final int p90LatencyMs; + private final int p95LatencyMs; private final int p99LatencyMs; + /** + * The percentiles to use when calculating the histogram data. + * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields. + */ + final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f}; + @JsonCreator StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived, @JsonProperty("totalBytesReceived") long totalBytesReceived, @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes, @JsonProperty("averageLatencyMs") float averageLatencyMs, @JsonProperty("p50LatencyMs") int p50latencyMs, - @JsonProperty("p90LatencyMs") int p90latencyMs, + @JsonProperty("p95LatencyMs") int p95latencyMs, @JsonProperty("p99LatencyMs") int p99latencyMs) { this.totalMessagesReceived = totalMessagesReceived; this.totalBytesReceived = totalBytesReceived; this.averageMessageSizeBytes = averageMessageSizeBytes; this.averageLatencyMs = averageLatencyMs; this.p50LatencyMs = p50latencyMs; - this.p90LatencyMs = p90latencyMs; + this.p95LatencyMs = p95latencyMs; this.p99LatencyMs = p99latencyMs; } @@ -264,8 +266,8 @@ public int p50LatencyMs() { } @JsonProperty - public int p90LatencyMs() { - return p90LatencyMs; + public int p95LatencyMs() { + return p95LatencyMs; } @JsonProperty From b4b0e762ee901ae1ed2523144985513f8d0e0e43 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 9 Apr 2018 15:36:35 -0700 Subject: [PATCH 4/5] addressed review comments --- .../apache/kafka/trogdor/common/WorkerUtils.java | 14 +++++++------- .../kafka/trogdor/workload/ConsumeBenchWorker.java | 8 +++----- .../kafka/trogdor/common/WorkerUtilsTest.java | 9 ++++----- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java 
b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 3b7ff0ac74ea3..3d4871a17296b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -97,7 +97,7 @@ public static void addConfigsToProperties( } } - private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000; + private static final int ADMIN_REQUEST_TIMEOUT = 25000; private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000; private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10; @@ -265,7 +265,7 @@ private static void verifyTopics( Logger log, AdminClient adminClient, Collection topicsToVerify, Map topicsInfo) throws Throwable { DescribeTopicsResult topicsResult = adminClient.describeTopics( - topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicDescriptionMap = topicsResult.all().get(); for (TopicDescription desc: topicDescriptionMap.values()) { // map will always contain the topic since all topics in 'topicsExists' are in given @@ -297,11 +297,11 @@ static Collection getMatchingTopicPartitions( // first get list of matching topics List matchedTopics = new ArrayList<>(); ListTopicsResult res = adminClient.listTopics( - new ListTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicListingMap = res.namesToListings().get(); for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { - if (topicNamePattern.matcher(topicListingEntry.getKey()).matches() - && !topicListingEntry.getValue().isInternal()) { + if (!topicListingEntry.getValue().isInternal() + && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) { matchedTopics.add(topicListingEntry.getKey()); } } @@ -309,7 +309,7 @@ static Collection getMatchingTopicPartitions( // create a list of topic/partitions List out = 
new ArrayList<>(); DescribeTopicsResult topicsResult = adminClient.describeTopics( - matchedTopics, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicDescriptionMap = topicsResult.all().get(); for (TopicDescription desc: topicDescriptionMap.values()) { List partitions = desc.partitions(); @@ -327,7 +327,7 @@ private static AdminClient createAdminClient( Map commonClientConf, Map adminClientConf) { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); // first add common client config, and then admin client config to properties, possibly // over-writing default or common properties. addConfigsToProperties(props, commonClientConf, adminClientConf); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index edab69d6f28bf..56f95ecbb9499 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -77,7 +77,6 @@ public void start(Platform platform, AtomicReference status, 2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false)); this.status = status; this.doneFuture = doneFuture; - this.consumer = null; executor.submit(new Prepare()); } @@ -97,20 +96,20 @@ public void run() { spec.topicRegex(), spec.startPartition(), spec.endPartition()); log.info("Will consume from {}", topicPartitions); - executor.submit(new ConsumeRecords(topicPartitions)); + executor.submit(new ConsumeMessages(topicPartitions)); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } } } - public class 
ConsumeRecords implements Callable { + public class ConsumeMessages implements Callable { private final Histogram latencyHistogram; private final Histogram messageSizeHistogram; private final Future statusUpdaterFuture; private final Throttle throttle; - ConsumeRecords(Collection topicPartitions) { + ConsumeMessages(Collection topicPartitions) { this.latencyHistogram = new Histogram(5000); this.messageSizeHistogram = new Histogram(2 * 1024 * 1024); this.statusUpdaterFuture = executor.scheduleAtFixedRate( @@ -159,7 +158,6 @@ public Void call() throws Exception { messageSizeHistogram.add(messageBytes); bytesConsumed += messageBytes; throttle.increment(); - } startBatchMs = Time.SYSTEM.milliseconds(); } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index 5d840bc03419b..a35efe199aa8f 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -275,10 +275,10 @@ public void testGetMatchingTopicPartitionsCorrectlyMatchesExactTopicName() throw Collection topicPartitions = WorkerUtils.getMatchingTopicPartitions(adminClient, topic2, 0, 2); assertEquals( - new HashSet<>(Arrays.asList( + Utils.mkSet( new TopicPartition(topic2, 0), new TopicPartition(topic2, 1), new TopicPartition(topic2, 2) - )), + ), new HashSet<>(topicPartitions) ); } @@ -294,12 +294,12 @@ public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwa Collection topicPartitions = WorkerUtils.getMatchingTopicPartitions(adminClient, ".*-topic$", 0, 1); assertEquals( - new HashSet<>(Arrays.asList( + Utils.mkSet( new TopicPartition(topic1, 0), new TopicPartition(topic1, 1), new TopicPartition(topic2, 0), new TopicPartition(topic2, 1) - )), + ), new HashSet<>(topicPartitions) ); } From 28b4569cf2be3c21ea1dc2c1d0372daf333ab2d2 Mon Sep 17 00:00:00 2001 From: Anna Povzner
Date: Mon, 9 Apr 2018 17:29:42 -0700 Subject: [PATCH 5/5] rebased and fixed merge issues --- .../trogdor/workload/ConsumeBenchWorker.java | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index 56f95ecbb9499..5c74d906601a4 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -33,6 +33,7 @@ import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class ConsumeBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class); @@ -57,7 +57,7 @@ public class ConsumeBenchWorker implements TaskWorker { private final ConsumeBenchSpec spec; private final AtomicBoolean running = new AtomicBoolean(false); private ScheduledExecutorService executor; - private AtomicReference status; + private WorkerStatusTracker status; private KafkaFutureImpl doneFuture; private KafkaConsumer consumer; @@ -67,7 +67,7 @@ public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) { } @Override - public void start(Platform platform, AtomicReference status, + public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl doneFuture) throws Exception { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("ConsumeBenchWorker is already running."); @@ -165,10 +165,11 
@@ public Void call() throws Exception { WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture); } finally { statusUpdaterFuture.cancel(false); - new StatusUpdater(latencyHistogram, messageSizeHistogram).run(); + StatusData statusData = + new StatusUpdater(latencyHistogram, messageSizeHistogram).update(); long curTimeMs = Time.SYSTEM.milliseconds(); - log.info("Consumed total number of records={}, bytes={} in {} ms. status: {}", - messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, status.get()); + log.info("Consumed total number of messages={}, bytes={} in {} ms. status: {}", + messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData); } doneFuture.complete(""); return null; @@ -187,23 +188,27 @@ public class StatusUpdater implements Runnable { @Override public void run() { try { - Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES); - Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES); - StatusData statusData = new StatusData( - latSummary.numSamples(), - (long) (msgSummary.numSamples() * msgSummary.average()), - (long) msgSummary.average(), - latSummary.average(), - latSummary.percentiles().get(0).value(), - latSummary.percentiles().get(1).value(), - latSummary.percentiles().get(2).value()); - String statusDataString = JsonUtil.toJsonString(statusData); - status.set(statusDataString); - log.info("Status={}", statusDataString); + update(); } catch (Exception e) { WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); } } + + StatusData update() { + Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES); + Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES); + StatusData statusData = new StatusData( + latSummary.numSamples(), + (long) (msgSummary.numSamples() * msgSummary.average()), + (long) msgSummary.average(), + latSummary.average(), + latSummary.percentiles().get(0).value(), + 
latSummary.percentiles().get(1).value(), + latSummary.percentiles().get(2).value()); + status.update(JsonUtil.JSON_SERDE.valueToTree(statusData)); + log.info("Status={}", JsonUtil.toJsonString(statusData)); + return statusData; + } } public static class StatusData {