diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index a13d3ec68fc68..c277b83330767 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -14,11 +14,18 @@ import static net.sourceforge.argparse4j.impl.Arguments.store; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.Random; +import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -42,15 +49,36 @@ public static void main(String[] args) throws Exception { /* parse args */ String topicName = res.getString("topic"); long numRecords = res.getLong("numRecords"); - int recordSize = res.getInt("recordSize"); + Integer recordSize = res.getInt("recordSize"); int throughput = res.getInt("throughput"); List producerProps = res.getList("producerConfig"); String producerConfig = res.getString("producerConfigFile"); + String payloadFilePath = res.getString("payloadFile"); + + // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. + String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter"); if (producerProps == null && producerConfig == null) { throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } + List payloadByteList = new ArrayList<>(); + if (payloadFilePath != null) { + Path path = Paths.get(payloadFilePath); + System.out.println("Reading payloads from: " + path.toAbsolutePath()); + if (Files.notExists(path) || Files.size(path) == 0) { + throw new IllegalArgumentException("File does not exist or empty file provided."); + } + + String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter); + + System.out.println("Number of messages read: " + payloadList.length); + + for (String payload : payloadList) { + payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); + } + } + Properties props = new Properties(); if (producerConfig != null) { props.putAll(Utils.loadProps(producerConfig)); @@ -68,16 +96,24 @@ public static void main(String[] args) throws Exception { KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ - byte[] payload = new byte[recordSize]; + byte[] payload = null; Random random = new Random(0); - for (int i = 0; i < payload.length; ++i) - payload[i] = (byte) (random.nextInt(26) + 65); - ProducerRecord record = new ProducerRecord<>(topicName, payload); + if (recordSize != null) { + payload = new byte[recordSize]; + for (int i = 0; i < payload.length; ++i) + payload[i] = (byte) (random.nextInt(26) + 65); + } + ProducerRecord record; Stats stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); for (int i = 0; i < numRecords; i++) { + if (payloadFilePath != null) { + payload = payloadByteList.get(random.nextInt(payloadByteList.size())); + } + record = new ProducerRecord<>(topicName, payload); + long sendStartMs = System.currentTimeMillis(); Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); producer.send(record, cb); @@ -109,6 +145,11 @@ private static ArgumentParser argParser() { .defaultHelp(true) .description("This tool is used to verify the producer performance."); + MutuallyExclusiveGroup payloadOptions = parser + .addMutuallyExclusiveGroup() + .required(true) + .description("either --record-size or --payload-file must be specified but not both."); + parser.addArgument("--topic") .action(store()) .required(true) @@ -124,13 +165,34 @@ private static ArgumentParser argParser() { .dest("numRecords") .help("number of messages to produce"); - parser.addArgument("--record-size") + payloadOptions.addArgument("--record-size") .action(store()) - .required(true) + .required(false) .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes"); + .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); + + payloadOptions.addArgument("--payload-file") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-FILE") + .dest("payloadFile") + .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + + "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + + "Note that you must provide exactly one of --record-size or --payload-file."); + + parser.addArgument("--payload-delimiter") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-DELIMITER") + .dest("payloadDelimiter") + .setDefault("\\n") + .help("provides delimiter to be used when --payload-file is provided. " + + "Defaults to new line. " + + "Note that this parameter will be ignored if --payload-file is not provided."); parser.addArgument("--throughput") .action(store())