diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 98dbf3836b692..3d4871a17296b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -21,9 +21,14 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; @@ -39,6 +44,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import java.util.regex.Pattern; /** * Utilities for Trogdor TaskWorkers. @@ -91,7 +97,7 @@ public static void addConfigsToProperties( } } - private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000; + private static final int ADMIN_REQUEST_TIMEOUT = 25000; private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000; private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10; @@ -128,6 +134,32 @@ public static void createTopics( } } + /** + * Returns a list of all existing topic partitions that match the following criteria: topic + * name matches give regular expression 'topicRegex', topic is not internal, partitions are + * in range [startPartition, endPartition] + * + * @param log The logger to use. + * @param bootstrapServers The bootstrap server list. + * @param topicRegex Topic name regular expression + * @param startPartition Starting partition of partition range + * @param endPartition Ending partition of partition range + * @return List of topic partitions + * @throws Throwable If getting list of topics or their descriptions fails. + */ + public static Collection getMatchingTopicPartitions( + Logger log, String bootstrapServers, + Map commonClientConf, Map adminClientConf, + String topicRegex, int startPartition, int endPartition) throws Throwable { + try (AdminClient adminClient + = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) { + return getMatchingTopicPartitions(adminClient, topicRegex, startPartition, endPartition); + } catch (Exception e) { + log.warn("Failed to get topic partitions matching {}", topicRegex, e); + throw e; + } + } + /** * The actual create topics functionality is separated into this method and called from the * above method to be able to unit test with mock adminClient. @@ -233,7 +265,7 @@ private static void verifyTopics( Logger log, AdminClient adminClient, Collection topicsToVerify, Map topicsInfo) throws Throwable { DescribeTopicsResult topicsResult = adminClient.describeTopics( - topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT)); + topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicDescriptionMap = topicsResult.all().get(); for (TopicDescription desc: topicDescriptionMap.values()) { // map will always contain the topic since all topics in 'topicsExists' are in given @@ -249,12 +281,53 @@ private static void verifyTopics( } } + /** + * Returns list of existing, not internal, topics/partitions that match given pattern and + * where partitions are in range [startPartition, endPartition] + * @param adminClient AdminClient + * @param topicRegex Topic regular expression to match + * @return List of topic names + * @throws Throwable If failed to get list of existing topics + */ + static Collection getMatchingTopicPartitions( + AdminClient adminClient, String topicRegex, int startPartition, int endPartition) + throws Throwable { + final Pattern topicNamePattern = Pattern.compile(topicRegex); + + // first get list of matching topics + List matchedTopics = new ArrayList<>(); + ListTopicsResult res = adminClient.listTopics( + new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); + Map topicListingMap = res.namesToListings().get(); + for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { + if (!topicListingEntry.getValue().isInternal() + && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) { + matchedTopics.add(topicListingEntry.getKey()); + } + } + + // create a list of topic/partitions + List out = new ArrayList<>(); + DescribeTopicsResult topicsResult = adminClient.describeTopics( + matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); + Map topicDescriptionMap = topicsResult.all().get(); + for (TopicDescription desc: topicDescriptionMap.values()) { + List partitions = desc.partitions(); + for (TopicPartitionInfo info: partitions) { + if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) { + out.add(new TopicPartition(desc.name(), info.partition())); + } + } + } + return out; + } + private static AdminClient createAdminClient( - String bootstrapServers, Map commonClientConf, - Map adminClientConf) { + String bootstrapServers, + Map commonClientConf, Map adminClientConf) { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); // first add common client config, and then admin client config to properties, possibly // over-writing default or common properties. addConfigsToProperties(props, commonClientConf, adminClientConf); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java new file mode 100644 index 0000000000000..cef913bc01a5f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The specification for a benchmark that produces messages to a set of topics. + */ +public class ConsumeBenchSpec extends TaskSpec { + + private final String consumerNode; + private final String bootstrapServers; + private final int targetMessagesPerSec; + private final int maxMessages; + private final Map consumerConf; + private final Map adminClientConf; + private final Map commonClientConf; + private final String topicRegex; + private final int startPartition; + private final int endPartition; + + + @JsonCreator + public ConsumeBenchSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("consumerNode") String consumerNode, + @JsonProperty("bootstrapServers") String bootstrapServers, + @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, + @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("consumerConf") Map consumerConf, + @JsonProperty("commonClientConf") Map commonClientConf, + @JsonProperty("adminClientConf") Map adminClientConf, + @JsonProperty("topicRegex") String topicRegex, + @JsonProperty("startPartition") int startPartition, + @JsonProperty("endPartition") int endPartition) { + super(startMs, durationMs); + this.consumerNode = (consumerNode == null) ? "" : consumerNode; + this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; + this.targetMessagesPerSec = targetMessagesPerSec; + this.maxMessages = maxMessages; + this.consumerConf = configOrEmptyMap(consumerConf); + this.commonClientConf = configOrEmptyMap(commonClientConf); + this.adminClientConf = configOrEmptyMap(adminClientConf); + this.topicRegex = topicRegex; + this.startPartition = startPartition; + this.endPartition = endPartition; + } + + @JsonProperty + public String consumerNode() { + return consumerNode; + } + + @JsonProperty + public String bootstrapServers() { + return bootstrapServers; + } + + @JsonProperty + public int targetMessagesPerSec() { + return targetMessagesPerSec; + } + + @JsonProperty + public int maxMessages() { + return maxMessages; + } + + @JsonProperty + public Map consumerConf() { + return consumerConf; + } + + @JsonProperty + public Map commonClientConf() { + return commonClientConf; + } + + @JsonProperty + public Map adminClientConf() { + return adminClientConf; + } + + @JsonProperty + public String topicRegex() { + return topicRegex; + } + + @JsonProperty + public int startPartition() { + return startPartition; + } + + @JsonProperty + public int endPartition() { + return endPartition; + } + + @Override + public TaskController newController(String id) { + return new TaskController() { + @Override + public Set targetNodes(Topology topology) { + return Collections.singleton(consumerNode); + } + }; + } + + @Override + public TaskWorker newTaskWorker(String id) { + return new ConsumeBenchWorker(id, this); + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java new file mode 100644 index 0000000000000..5c74d906601a4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.ThreadUtils; +import org.apache.kafka.trogdor.common.WorkerUtils; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ConsumeBenchWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class); + + private static final int THROTTLE_PERIOD_MS = 100; + + private final String id; + private final ConsumeBenchSpec spec; + private final AtomicBoolean running = new AtomicBoolean(false); + private ScheduledExecutorService executor; + private WorkerStatusTracker status; + private KafkaFutureImpl doneFuture; + private KafkaConsumer consumer; + + public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) { + this.id = id; + this.spec = spec; + } + + @Override + public void start(Platform platform, WorkerStatusTracker status, + KafkaFutureImpl doneFuture) throws Exception { + if (!running.compareAndSet(false, true)) { + throw new IllegalStateException("ConsumeBenchWorker is already running."); + } + log.info("{}: Activating ConsumeBenchWorker with {}", id, spec); + this.executor = Executors.newScheduledThreadPool( + 2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false)); + this.status = status; + this.doneFuture = doneFuture; + executor.submit(new Prepare()); + } + + public class Prepare implements Runnable { + @Override + public void run() { + try { + // find topics to consume from based on provided topic regular expression + if (spec.topicRegex() == null) { + throw new ConfigException( + "Must provide topic name or regular expression to match existing topics."); + } + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions( + log, spec.bootstrapServers(), + spec.commonClientConf(), spec.adminClientConf(), + spec.topicRegex(), spec.startPartition(), spec.endPartition()); + log.info("Will consume from {}", topicPartitions); + + executor.submit(new ConsumeMessages(topicPartitions)); + } catch (Throwable e) { + WorkerUtils.abort(log, "Prepare", e, doneFuture); + } + } + } + + public class ConsumeMessages implements Callable { + private final Histogram latencyHistogram; + private final Histogram messageSizeHistogram; + private final Future statusUpdaterFuture; + private final Throttle throttle; + + ConsumeMessages(Collection topicPartitions) { + this.latencyHistogram = new Histogram(5000); + this.messageSizeHistogram = new Histogram(2 * 1024 * 1024); + this.statusUpdaterFuture = executor.scheduleAtFixedRate( + new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES); + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); + // these defaults maybe over-written by the user-specified commonClientConf or + // consumerConf + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); + consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), + new ByteArrayDeserializer()); + consumer.assign(topicPartitions); + int perPeriod = WorkerUtils.perSecToPerPeriod( + spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); + this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); + } + + @Override + public Void call() throws Exception { + long messagesConsumed = 0; + long bytesConsumed = 0; + long startTimeMs = Time.SYSTEM.milliseconds(); + long startBatchMs = startTimeMs; + try { + while (messagesConsumed < spec.maxMessages()) { + ConsumerRecords records = consumer.poll(50); + if (records.isEmpty()) { + continue; + } + long endBatchMs = Time.SYSTEM.milliseconds(); + long elapsedBatchMs = endBatchMs - startBatchMs; + for (ConsumerRecord record : records) { + messagesConsumed++; + long messageBytes = 0; + if (record.key() != null) { + messageBytes += record.serializedKeySize(); + } + if (record.value() != null) { + messageBytes += record.serializedValueSize(); + } + latencyHistogram.add(elapsedBatchMs); + messageSizeHistogram.add(messageBytes); + bytesConsumed += messageBytes; + throttle.increment(); + } + startBatchMs = Time.SYSTEM.milliseconds(); + } + } catch (Exception e) { + WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture); + } finally { + statusUpdaterFuture.cancel(false); + StatusData statusData = + new StatusUpdater(latencyHistogram, messageSizeHistogram).update(); + long curTimeMs = Time.SYSTEM.milliseconds(); + log.info("Consumed total number of messages={}, bytes={} in {} ms. status: {}", + messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData); + } + doneFuture.complete(""); + return null; + } + } + + public class StatusUpdater implements Runnable { + private final Histogram latencyHistogram; + private final Histogram messageSizeHistogram; + + StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) { + this.latencyHistogram = latencyHistogram; + this.messageSizeHistogram = messageSizeHistogram; + } + + @Override + public void run() { + try { + update(); + } catch (Exception e) { + WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); + } + } + + StatusData update() { + Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES); + Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES); + StatusData statusData = new StatusData( + latSummary.numSamples(), + (long) (msgSummary.numSamples() * msgSummary.average()), + (long) msgSummary.average(), + latSummary.average(), + latSummary.percentiles().get(0).value(), + latSummary.percentiles().get(1).value(), + latSummary.percentiles().get(2).value()); + status.update(JsonUtil.JSON_SERDE.valueToTree(statusData)); + log.info("Status={}", JsonUtil.toJsonString(statusData)); + return statusData; + } + } + + public static class StatusData { + private final long totalMessagesReceived; + private final long totalBytesReceived; + private final long averageMessageSizeBytes; + private final float averageLatencyMs; + private final int p50LatencyMs; + private final int p95LatencyMs; + private final int p99LatencyMs; + + /** + * The percentiles to use when calculating the histogram data. + * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields. + */ + final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f}; + + @JsonCreator + StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived, + @JsonProperty("totalBytesReceived") long totalBytesReceived, + @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes, + @JsonProperty("averageLatencyMs") float averageLatencyMs, + @JsonProperty("p50LatencyMs") int p50latencyMs, + @JsonProperty("p95LatencyMs") int p95latencyMs, + @JsonProperty("p99LatencyMs") int p99latencyMs) { + this.totalMessagesReceived = totalMessagesReceived; + this.totalBytesReceived = totalBytesReceived; + this.averageMessageSizeBytes = averageMessageSizeBytes; + this.averageLatencyMs = averageLatencyMs; + this.p50LatencyMs = p50latencyMs; + this.p95LatencyMs = p95latencyMs; + this.p99LatencyMs = p99latencyMs; + } + + @JsonProperty + public long totalMessagesReceived() { + return totalMessagesReceived; + } + + @JsonProperty + public long totalBytesReceived() { + return totalBytesReceived; + } + + @JsonProperty + public long averageMessageSizeBytes() { + return averageMessageSizeBytes; + } + + @JsonProperty + public float averageLatencyMs() { + return averageLatencyMs; + } + + @JsonProperty + public int p50LatencyMs() { + return p50LatencyMs; + } + + @JsonProperty + public int p95LatencyMs() { + return p95LatencyMs; + } + + @JsonProperty + public int p99LatencyMs() { + return p99LatencyMs; + } + } + + @Override + public void stop(Platform platform) throws Exception { + if (!running.compareAndSet(true, false)) { + throw new IllegalStateException("ConsumeBenchWorker is not running."); + } + log.info("{}: Deactivating ConsumeBenchWorker.", id); + doneFuture.complete(""); + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.DAYS); + Utils.closeQuietly(consumer, "consumer"); + this.consumer = null; + this.executor = null; + this.status = null; + this.doneFuture = null; + } + +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index fbe23899bc9dc..a35efe199aa8f 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -18,9 +18,9 @@ package org.apache.kafka.trogdor.common; - import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.Node; @@ -38,8 +38,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -262,4 +264,58 @@ public void testClientConfigOverwritesBothDefaultAndCommonConfigs() { Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0")); assertEquals(resultProps, props); } + + @Test + public void testGetMatchingTopicPartitionsCorrectlyMatchesExactTopicName() throws Throwable { + final String topic1 = "existing-topic"; + final String topic2 = "another-topic"; + makeExistingTopicWithOneReplica(topic1, 10); + makeExistingTopicWithOneReplica(topic2, 20); + + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions(adminClient, topic2, 0, 2); + assertEquals( + Utils.mkSet( + new TopicPartition(topic2, 0), new TopicPartition(topic2, 1), + new TopicPartition(topic2, 2) + ), + new HashSet<>(topicPartitions) + ); + } + + @Test + public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable { + final String topic1 = "test-topic"; + final String topic2 = "another-test-topic"; + final String topic3 = "one-more"; + makeExistingTopicWithOneReplica(topic1, 10); + makeExistingTopicWithOneReplica(topic2, 20); + makeExistingTopicWithOneReplica(topic3, 30); + + Collection topicPartitions = + WorkerUtils.getMatchingTopicPartitions(adminClient, ".*-topic$", 0, 1); + assertEquals( + Utils.mkSet( + new TopicPartition(topic1, 0), new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), new TopicPartition(topic2, 1) + ), + new HashSet<>(topicPartitions) + ); + } + + private void makeExistingTopicWithOneReplica(String topicName, int numPartitions) { + List tpInfo = new ArrayList<>(); + int brokerIndex = 0; + for (int i = 0; i < numPartitions; ++i) { + Node broker = cluster.get(brokerIndex); + tpInfo.add(new TopicPartitionInfo( + i, broker, singleReplica, Collections.emptyList())); + brokerIndex = (brokerIndex + 1) % cluster.size(); + } + adminClient.addTopic( + false, + topicName, + tpInfo, + null); + } }