From af6c6d910ae3915c769ebac326ade329a724cda4 Mon Sep 17 00:00:00 2001 From: Sandesh K Date: Tue, 22 Nov 2016 16:38:03 +0530 Subject: [PATCH 1/4] KAFKA-4432: Added support to supply custom message payloads --- .../kafka/tools/ProducerPerformance.java | 78 ++++++++++++++++--- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index a13d3ec68fc68..5faf4a5923a8f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -14,6 +14,12 @@ import static net.sourceforge.argparse4j.impl.Arguments.store; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -42,15 +48,40 @@ public static void main(String[] args) throws Exception { /* parse args */ String topicName = res.getString("topic"); long numRecords = res.getLong("numRecords"); - int recordSize = res.getInt("recordSize"); + Integer recordSize = res.getInt("recordSize"); int throughput = res.getInt("throughput"); List producerProps = res.getList("producerConfig"); String producerConfig = res.getString("producerConfigFile"); + String payloadFilePath = res.getString("payloadFile"); + + // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. + String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter"); if (producerProps == null && producerConfig == null) { throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } + if ((recordSize == null && payloadFilePath == null) || (recordSize != null && payloadFilePath != null)) { + throw new IllegalArgumentException("Either --record-size or --payload-file must be specified but not both."); + } + + List payloadByteList = new ArrayList<>(); + if (payloadFilePath != null) { + Path path = Paths.get(payloadFilePath); + System.out.println("Reading payloads from: " + path.toAbsolutePath()); + if (Files.notExists(path) || Files.size(path) == 0) { + throw new IllegalArgumentException("File does not exist or empty file provided."); + } + + String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter); + + System.out.println("Number of messages read: " + payloadList.length); + + for(String payload : payloadList) { + payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); + } + } + Properties props = new Properties(); if (producerConfig != null) { props.putAll(Utils.loadProps(producerConfig)); @@ -68,16 +99,24 @@ public static void main(String[] args) throws Exception { KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ - byte[] payload = new byte[recordSize]; + byte[] payload = null; Random random = new Random(0); - for (int i = 0; i < payload.length; ++i) - payload[i] = (byte) (random.nextInt(26) + 65); - ProducerRecord record = new ProducerRecord<>(topicName, payload); + if(recordSize != null) { + payload = new byte[recordSize]; + for (int i = 0; i < payload.length; ++i) + payload[i] = (byte) (random.nextInt(26) + 65); + } + ProducerRecord record; Stats stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); for (int i = 0; i < numRecords; i++) { + if(payloadFilePath != null) { + payload = payloadByteList.get(random.nextInt(payloadByteList.size())); + } + record = new ProducerRecord<>(topicName, payload); + long sendStartMs = System.currentTimeMillis(); Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); producer.send(record, cb); @@ -126,11 +165,32 @@ private static ArgumentParser argParser() { parser.addArgument("--record-size") .action(store()) - .required(true) + .required(false) .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes"); + .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); + + parser.addArgument("--payload-file") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-FILE") + .dest("payloadFile") + .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + + "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + + "Note that you must provide exactly one of --record-size or --payload-file."); + + parser.addArgument("--payload-delimiter") + .action(store()) + .required(false) + .type(String.class) + .metavar("PAYLOAD-DELIMITER") + .dest("payloadDelimiter") + .setDefault("\\n") + .help("provides delimiter to be used when --payload-file is provided. " + + "Defaults to new line. " + + "Note that this parameter will be ignored if --payload-file is not provided."); parser.addArgument("--throughput") .action(store()) From df278060a04fa844f150f30ae534a310c47c0eb3 Mon Sep 17 00:00:00 2001 From: Sandesh K Date: Tue, 22 Nov 2016 21:23:52 +0530 Subject: [PATCH 2/4] KAFKA-4432: Fixed checkstyle issues --- .../java/org/apache/kafka/tools/ProducerPerformance.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 5faf4a5923a8f..e71b4e3755262 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -77,7 +77,7 @@ public static void main(String[] args) throws Exception { System.out.println("Number of messages read: " + payloadList.length); - for(String payload : payloadList) { + for (String payload : payloadList) { payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); } } @@ -101,7 +101,7 @@ public static void main(String[] args) throws Exception { /* setup perf test */ byte[] payload = null; Random random = new Random(0); - if(recordSize != null) { + if (recordSize != null) { payload = new byte[recordSize]; for (int i = 0; i < payload.length; ++i) payload[i] = (byte) (random.nextInt(26) + 65); @@ -112,7 +112,7 @@ public static void main(String[] args) throws Exception { ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); for (int i = 0; i < numRecords; i++) { - if(payloadFilePath != null) { + if (payloadFilePath != null) { payload = payloadByteList.get(random.nextInt(payloadByteList.size())); } record = new ProducerRecord<>(topicName, payload); From ab80c6e214d0540b469d57c4d74c209ce4857afd Mon Sep 17 00:00:00 2001 From: Sandesh K Date: Tue, 29 Nov 2016 12:13:21 +0530 Subject: [PATCH 3/4] KAFKA-4432: Added record-size and payload-file to addMutuallyExclusiveGroup --- .../apache/kafka/tools/ProducerPerformance.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index e71b4e3755262..c277b83330767 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -25,6 +25,7 @@ import java.util.Properties; import java.util.Random; +import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -61,10 +62,6 @@ public static void main(String[] args) throws Exception { throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } - if ((recordSize == null && payloadFilePath == null) || (recordSize != null && payloadFilePath != null)) { - throw new IllegalArgumentException("Either --record-size or --payload-file must be specified but not both."); - } - List payloadByteList = new ArrayList<>(); if (payloadFilePath != null) { Path path = Paths.get(payloadFilePath); @@ -148,6 +145,11 @@ private static ArgumentParser argParser() { .defaultHelp(true) .description("This tool is used to verify the producer performance."); + MutuallyExclusiveGroup payloadOptions = parser + .addMutuallyExclusiveGroup() + .required(true) + .description("either --record-size or --payload-file must be specified but not both."); + parser.addArgument("--topic") .action(store()) .required(true) @@ -163,7 +165,7 @@ private static ArgumentParser argParser() { .dest("numRecords") .help("number of messages to produce"); - parser.addArgument("--record-size") + payloadOptions.addArgument("--record-size") .action(store()) .required(false) .type(Integer.class) @@ -171,7 +173,7 @@ private static ArgumentParser argParser() { .dest("recordSize") .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file."); - parser.addArgument("--payload-file") + payloadOptions.addArgument("--payload-file") .action(store()) .required(false) .type(String.class) From 9e09e96e1f3f1802b9ce0ed23e786adc9c5f820b Mon Sep 17 00:00:00 2001 From: Sandesh K Date: Wed, 30 Nov 2016 12:58:15 +0530 Subject: [PATCH 4/4] Dummy commit to retrigger Jenkins job.