Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
Expand All @@ -39,6 +44,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.regex.Pattern;

/**
* Utilities for Trogdor TaskWorkers.
Expand Down Expand Up @@ -91,7 +97,7 @@ public static void addConfigsToProperties(
}
}

private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
private static final int ADMIN_REQUEST_TIMEOUT = 25000;
private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;

Expand Down Expand Up @@ -128,6 +134,32 @@ public static void createTopics(
}
}

/**
* Returns a list of all existing topic partitions that match the following criteria: topic
* name matches give regular expression 'topicRegex', topic is not internal, partitions are
* in range [startPartition, endPartition]
*
* @param log The logger to use.
* @param bootstrapServers The bootstrap server list.
* @param topicRegex Topic name regular expression
* @param startPartition Starting partition of partition range
* @param endPartition Ending partition of partition range
* @return List of topic partitions
* @throws Throwable If getting list of topics or their descriptions fails.
*/
public static Collection<TopicPartition> getMatchingTopicPartitions(
Logger log, String bootstrapServers,
Map<String, String> commonClientConf, Map<String, String> adminClientConf,
String topicRegex, int startPartition, int endPartition) throws Throwable {
try (AdminClient adminClient
= createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
return getMatchingTopicPartitions(adminClient, topicRegex, startPartition, endPartition);
} catch (Exception e) {
log.warn("Failed to get topic partitions matching {}", topicRegex, e);
throw e;
}
}

/**
* The actual create topics functionality is separated into this method and called from the
* above method to be able to unit test with mock adminClient.
Expand Down Expand Up @@ -233,7 +265,7 @@ private static void verifyTopics(
Logger log, AdminClient adminClient,
Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
DescribeTopicsResult topicsResult = adminClient.describeTopics(
topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
for (TopicDescription desc: topicDescriptionMap.values()) {
// map will always contain the topic since all topics in 'topicsExists' are in given
Expand All @@ -249,12 +281,53 @@ private static void verifyTopics(
}
}

/**
* Returns list of existing, not internal, topics/partitions that match given pattern and
* where partitions are in range [startPartition, endPartition]
* @param adminClient AdminClient
* @param topicRegex Topic regular expression to match
* @return List of topic names
* @throws Throwable If failed to get list of existing topics
*/
static Collection<TopicPartition> getMatchingTopicPartitions(
AdminClient adminClient, String topicRegex, int startPartition, int endPartition)
throws Throwable {
final Pattern topicNamePattern = Pattern.compile(topicRegex);

// first get list of matching topics
List<String> matchedTopics = new ArrayList<>();
ListTopicsResult res = adminClient.listTopics(
new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
Map<String, TopicListing> topicListingMap = res.namesToListings().get();
for (Map.Entry<String, TopicListing> topicListingEntry: topicListingMap.entrySet()) {
if (!topicListingEntry.getValue().isInternal()
&& topicNamePattern.matcher(topicListingEntry.getKey()).matches()) {
matchedTopics.add(topicListingEntry.getKey());
}
}

// create a list of topic/partitions
List<TopicPartition> out = new ArrayList<>();
DescribeTopicsResult topicsResult = adminClient.describeTopics(
matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
for (TopicDescription desc: topicDescriptionMap.values()) {
List<TopicPartitionInfo> partitions = desc.partitions();
for (TopicPartitionInfo info: partitions) {
if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) {
out.add(new TopicPartition(desc.name(), info.partition()));
}
}
}
return out;
}

private static AdminClient createAdminClient(
String bootstrapServers, Map<String, String> commonClientConf,
Map<String, String> adminClientConf) {
String bootstrapServers,
Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT);
// first add common client config, and then admin client config to properties, possibly
// over-writing default or common properties.
addConfigsToProperties(props, commonClientConf, adminClientConf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* The specification for a benchmark that produces messages to a set of topics.
*/
public class ConsumeBenchSpec extends TaskSpec {

private final String consumerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final String topicRegex;
private final int startPartition;
private final int endPartition;


@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("consumerNode") String consumerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a clientConf as well, right-- as per the discussion earlier about admin client security configs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added commonConf and adminClientConf.

@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("topicRegex") String topicRegex,
@JsonProperty("startPartition") int startPartition,
@JsonProperty("endPartition") int endPartition) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.maxMessages = maxMessages;
this.consumerConf = configOrEmptyMap(consumerConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.topicRegex = topicRegex;
this.startPartition = startPartition;
this.endPartition = endPartition;
}

@JsonProperty
public String consumerNode() {
return consumerNode;
}

@JsonProperty
public String bootstrapServers() {
return bootstrapServers;
}

@JsonProperty
public int targetMessagesPerSec() {
return targetMessagesPerSec;
}

@JsonProperty
public int maxMessages() {
return maxMessages;
}

@JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;
}

@JsonProperty
public Map<String, String> commonClientConf() {
return commonClientConf;
}

@JsonProperty
public Map<String, String> adminClientConf() {
return adminClientConf;
}

@JsonProperty
public String topicRegex() {
return topicRegex;
}

@JsonProperty
public int startPartition() {
return startPartition;
}

@JsonProperty
public int endPartition() {
return endPartition;
}

@Override
public TaskController newController(String id) {
return new TaskController() {
@Override
public Set<String> targetNodes(Topology topology) {
return Collections.singleton(consumerNode);
}
};
}

@Override
public TaskWorker newTaskWorker(String id) {
return new ConsumeBenchWorker(id, this);
}
}
Loading