From c7657fd26f1086287b9bab0ed8aa69ae9f8bf87f Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Sun, 21 Jun 2020 23:35:59 +1000 Subject: [PATCH 1/8] KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter (rebased) --- .../scala/kafka/tools/ConsoleConsumer.scala | 171 ++++++++----- .../tools/DefaultMessageFormatterTest.scala | 226 ++++++++++++++++++ 2 files changed, 336 insertions(+), 61 deletions(-) create mode 100644 core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 4aa71c04ed2ad..6ba84be1e5be9 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -221,16 +221,22 @@ object ConsoleConsumer extends Logging { .ofType(classOf[String]) .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property", - "The properties to initialize the message formatter. Default properties include:\n" + - "\tprint.timestamp=true|false\n" + - "\tprint.key=true|false\n" + - "\tprint.value=true|false\n" + - "\tkey.separator=\n" + - "\tline.separator=\n" + - "\tkey.deserializer=\n" + - "\tvalue.deserializer=\n" + - "\nUsers can also pass in customized properties for their formatter; more specifically, users " + - "can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.") + """The properties to initialize the message formatter. Default properties include: + | print.timestamp=true|false + | print.key=true|false + | print.offset=true|false + | print.partition=true|false + | print.headers=true|false + | print.value=true|false + | key.separator= + | line.separator= + | headers.separator= + | key.deserializer= + | value.deserializer= + | header.deserializer= + | + |Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.""" + .stripMargin) .withRequiredArg .describedAs("prop") .ofType(classOf[String]) @@ -459,48 +465,30 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var printValue = true var printPartition = false - var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) - var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) + var printOffset = false + var printHeaders = false + var keySeparator = utfBytes("\t") + var lineSeparator = utfBytes("\n") + var headersSeparator = utfBytes(",") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None - - override def configure(configs: Map[String, _]): Unit = { - val props = new java.util.Properties() - configs.asScala.forKeyValue { (key, value) => props.put(key, value.toString) } - if (props.containsKey("print.timestamp")) - printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") - if (props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") - if (props.containsKey("print.value")) - printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") - if (props.containsKey("print.partition")) - printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true") - if (props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) - if (props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("key.deserializer")) { - keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true) - } - // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("value.deserializer")) { - valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor() - .newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false) - } - } - - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { - val newProps = new Properties() - props.asScala.forKeyValue { (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) - newProps.put(key.substring(prefix.length), value) - } - newProps + var headersDeserializer: Option[Deserializer[_]] = None + + override def init(props: Properties): Unit = { + getPropertyIfExists(props, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) + getPropertyIfExists(props, "print.key", getBoolProperty).foreach(printKey = _) + getPropertyIfExists(props, "print.offset", getBoolProperty).foreach(printOffset = _) + getPropertyIfExists(props, "print.partition", getBoolProperty).foreach(printPartition = _) + getPropertyIfExists(props, "print.headers", getBoolProperty).foreach(printHeaders = _) + getPropertyIfExists(props, "print.value", getBoolProperty).foreach(printValue = _) + getPropertyIfExists(props, "key.separator", getByteProperty).foreach(keySeparator = _) + getPropertyIfExists(props, "line.separator", getByteProperty).foreach(lineSeparator = _) + getPropertyIfExists(props, "headers.separator", getByteProperty).foreach(headersSeparator = _) + + keyDeserializer = getPropertyIfExists(props, "key.deserializer", getDeserializerProperty(true)) + valueDeserializer = getPropertyIfExists(props, "value.deserializer", getDeserializerProperty(false)) + headersDeserializer = getPropertyIfExists(props, "headers.deserializer", getDeserializerProperty(false)) } def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { @@ -513,37 +501,98 @@ class DefaultMessageFormatter extends MessageFormatter { } def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = { - val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8)) - val convertedBytes = deserializer.map(_.deserialize(topic, consumerRecord.headers, nonNullBytes).toString. - getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes) - output.write(convertedBytes) + output.write(deserialize(deserializer, sourceBytes, topic)) } import consumerRecord._ if (printTimestamp) { if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) - output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8)) + output.write(utfBytes(s"$timestampType:$timestamp")) else - output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8)) - writeSeparator(printKey || printValue) + output.write(utfBytes("NO_TIMESTAMP")) + writeSeparator(columnSeparator = printKey || printOffset || printPartition || printHeaders || printValue) } if (printKey) { write(keyDeserializer, key, topic) - writeSeparator(printValue) + writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printValue) } - if (printValue) { - write(valueDeserializer, value, topic) - writeSeparator(printPartition) + if (printOffset) { + output.write(utfBytes(offset().toString)) + writeSeparator(columnSeparator = printPartition || printHeaders || printValue) } if (printPartition) { - output.write(s"$partition".getBytes(StandardCharsets.UTF_8)) + output.write(utfBytes(partition().toString)) + writeSeparator(columnSeparator = printHeaders || printValue) + } + + if (printHeaders) { + val headersIt = headers().iterator.asScala + if (headersIt.hasNext) { + headersIt.foreach { header => + output.write(utfBytes(header.key() + ":")) + output.write(deserialize(headersDeserializer, header.value(), topic)) + if (headersIt.hasNext) { + output.write(headersSeparator) + } + } + } else { + output.write(utfBytes("NO_HEADERS")) + } + writeSeparator(columnSeparator = printValue) + } + + if (printValue) { + write(valueDeserializer, value, topic) output.write(lineSeparator) } } + + private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { + val newProps = new Properties() + props.asScala.foreach { case (key, value) => + if (key.startsWith(prefix) && key.length > prefix.length) + newProps.put(key.substring(prefix.length), value) + } + newProps + } + + private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { + val nonNullBytes = Option(sourceBytes).getOrElse(utfBytes("null")) + val convertedBytes = deserializer + .map(d => utfBytes(d.deserialize(topic, nonNullBytes).toString)) + .getOrElse(nonNullBytes) + convertedBytes + } + + private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) + + private def getByteProperty(props: Properties, key: String): Array[Byte] = { + utfBytes(props.getProperty(key)) + } + + private def getBoolProperty(props: Properties, key: String): Boolean = { + props.getProperty(key).trim.equalsIgnoreCase("true") + } + + private def getDeserializerProperty(isKey: Boolean)(props: Properties, propertyName: String): Deserializer[_] = { + val deserializer = Class.forName(props.getProperty(propertyName)).newInstance().asInstanceOf[Deserializer[_]] + val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", props) + .asScala + .asJava + deserializer.configure(deserializerConfig, isKey) + deserializer + } + + private def getPropertyIfExists[T](props: Properties, key: String, getter: (Properties, String) => T): Option[T] = { + if (props.containsKey(key)) + Some(getter(props, key)) + else + None + } } class LoggingMessageFormatter extends MessageFormatter with LazyLogging { diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala new file mode 100644 index 0000000000000..915663d88434c --- /dev/null +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.tools + +import java.io.{ByteArrayOutputStream, Closeable, PrintStream} +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.tools.DefaultMessageFormatter +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.serialization.Deserializer +import org.junit.Assert._ +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters + +import scala.jdk.CollectionConverters._ + +@RunWith(value = classOf[Parameterized]) +class DefaultMessageFormatterTest(name: String, record: ConsumerRecord[Array[Byte], Array[Byte]], properties: Map[String, String], expected: String) { + import DefaultMessageFormatterTest._ + + @Test + def testWriteRecord()= { + withResource(new ByteArrayOutputStream()) { baos => + withResource(new PrintStream(baos)) { ps => + val formatter = buildFormatter(properties) + formatter.writeTo(record, ps) + val actual = new String(baos.toByteArray(), StandardCharsets.UTF_8) + assertEquals(expected, actual) + + } + } + } +} + +object DefaultMessageFormatterTest { + @Parameters(name = "Test {index} - {0}") + def parameters: java.util.Collection[Array[Object]] = { + Seq( + Array( + "print nothing", + consumerRecord(), + Map("print.value" -> "false"), + ""), + Array( + "print key", + consumerRecord(), + Map("print.key" -> "true", "print.value" -> "false"), + "someKey\n"), + Array( + "print value", + consumerRecord(), + Map(), + "someValue\n"), + Array( + "print empty timestamp", + consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE), + Map("print.timestamp" -> "true", "print.value" -> "false"), + "NO_TIMESTAMP\n"), + Array( + "print log append time timestamp", + consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME), + Map("print.timestamp" -> "true", "print.value" -> "false"), + "LogAppendTime:1234\n"), + Array( + "print create time timestamp", + consumerRecord(timestampType = TimestampType.CREATE_TIME), + Map("print.timestamp" -> "true", "print.value" -> "false"), + "CreateTime:1234\n"), + Array( + "print partition", + consumerRecord(), + Map("print.partition" -> "true", "print.value" -> "false"), + "9\n"), + Array( + "print offset", + consumerRecord(), + Map("print.offset" -> "true", "print.value" -> "false"), + "9876\n"), + Array( + "print headers", + consumerRecord(), + Map("print.headers" -> "true", "print.value" -> "false"), + "h1:v1,h2:v2\n"), + Array( + "print empty headers", + consumerRecord(headers = Nil), + Map("print.headers" -> "true", "print.value" -> "false"), + "NO_HEADERS\n"), + Array( + "print all possible fields with default delimiters", + consumerRecord(), + Map("print.key" -> "true", + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), + "CreateTime:1234\tsomeKey\t9876\t9\th1:v1,h2:v2\tsomeValue\n"), + Array( + "print all possible fields with custom delimiters", + consumerRecord(), + Map( + "key.separator" -> "|", + "line.separator" -> "^", + "headers.separator" -> "#", + "print.key" -> "true", + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), + "CreateTime:1234|someKey|9876|9|h1:v1#h2:v2|someValue^"), + Array( + "print key with custom deserializer", + consumerRecord(), + Map( + "print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "key.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "SOMEKEY\th1:v1,h2:v2\tsomeValue\n"), + Array( + "print value with custom deserializer", + consumerRecord(), + Map( + "print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "value.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "someKey\th1:v1,h2:v2\tSOMEVALUE\n"), + Array( + "print headers with custom deserializer", + consumerRecord(), + Map( + "print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "headers.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + "someKey\th1:V1,h2:V2\tsomeValue\n"), + Array( + "print key and value", + consumerRecord(), + Map("print.key" -> "true", "print.value" -> "true"), + "someKey\tsomeValue\n"), + Array( + "print fields in the beginning, middle and the end", + consumerRecord(), + Map("print.key" -> "true", "print.value" -> "true", "print.partition" -> "true"), + "someKey\t9\tsomeValue\n") + ).asJava + } + + private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = { + val properties = new Properties() + //putAll doesn't work on java 9 - https://github.com/scala/bug/issues/10418 + propsToSet.foreach { case (k, v) => + properties.setProperty(k, v) + } + val formatter = new DefaultMessageFormatter() + formatter.init(properties) + formatter + } + + + private def header(key: String, value: String) = { + new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8)) + } + + private def consumerRecord(key: String = "someKey", + value: String = "someValue", + headers: Iterable[Header] = Seq(header("h1", "v1"), header("h2", "v2")), + partition: Int = 9, + offset: Long = 9876, + timestamp: Long = 1234, + timestampType: TimestampType = TimestampType.CREATE_TIME) = { + new ConsumerRecord[Array[Byte], Array[Byte]]( + "someTopic", + partition, + offset, + timestamp, + timestampType, + 0L, + 0, + 0, + key.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8), + new RecordHeaders(headers.asJava)) + + } + + private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = { + try { + handler(resource) + } finally { + resource.close() + } + } +} + +class UpperCaseDeserializer extends Deserializer[String] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} + override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase + override def close(): Unit = {} +} \ No newline at end of file From d49fe77067e545a8d6927821ead08382f9482b5c Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Sun, 19 Jul 2020 16:56:23 +1000 Subject: [PATCH 2/8] added tag for partition and offset to make them clearer --- .../scala/kafka/tools/ConsoleConsumer.scala | 22 ++++++++++--------- .../tools/DefaultMessageFormatterTest.scala | 16 +++++++------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 6ba84be1e5be9..9fbb2d8e54675 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -511,22 +511,19 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(utfBytes(s"$timestampType:$timestamp")) else output.write(utfBytes("NO_TIMESTAMP")) - writeSeparator(columnSeparator = printKey || printOffset || printPartition || printHeaders || printValue) + writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printKey || printValue) } - if (printKey) { - write(keyDeserializer, key, topic) - writeSeparator(columnSeparator = printOffset || printPartition || printHeaders || printValue) + if (printPartition) { + output.write(utfBytes("Partition:")) + output.write(utfBytes(partition().toString)) + writeSeparator(columnSeparator = printOffset || printHeaders || printKey || printValue) } if (printOffset) { + output.write(utfBytes("Offset:")) output.write(utfBytes(offset().toString)) - writeSeparator(columnSeparator = printPartition || printHeaders || printValue) - } - - if (printPartition) { - output.write(utfBytes(partition().toString)) - writeSeparator(columnSeparator = printHeaders || printValue) + writeSeparator(columnSeparator = printHeaders || printKey || printValue) } if (printHeaders) { @@ -542,6 +539,11 @@ class DefaultMessageFormatter extends MessageFormatter { } else { output.write(utfBytes("NO_HEADERS")) } + writeSeparator(columnSeparator = printKey || printValue) + } + + if (printKey) { + write(keyDeserializer, key, topic) writeSeparator(columnSeparator = printValue) } diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala index 915663d88434c..7143bc0a86488 100644 --- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -92,12 +92,12 @@ object DefaultMessageFormatterTest { "print partition", consumerRecord(), Map("print.partition" -> "true", "print.value" -> "false"), - "9\n"), + "Partition:9\n"), Array( "print offset", consumerRecord(), Map("print.offset" -> "true", "print.value" -> "false"), - "9876\n"), + "Offset:9876\n"), Array( "print headers", consumerRecord(), @@ -117,7 +117,7 @@ object DefaultMessageFormatterTest { "print.offset" -> "true", "print.headers" -> "true", "print.value" -> "true"), - "CreateTime:1234\tsomeKey\t9876\t9\th1:v1,h2:v2\tsomeValue\n"), + "CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"), Array( "print all possible fields with custom delimiters", consumerRecord(), @@ -131,7 +131,7 @@ object DefaultMessageFormatterTest { "print.offset" -> "true", "print.headers" -> "true", "print.value" -> "true"), - "CreateTime:1234|someKey|9876|9|h1:v1#h2:v2|someValue^"), + "CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"), Array( "print key with custom deserializer", consumerRecord(), @@ -140,7 +140,7 @@ object DefaultMessageFormatterTest { "print.headers" -> "true", "print.value" -> "true", "key.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), - "SOMEKEY\th1:v1,h2:v2\tsomeValue\n"), + "h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"), Array( "print value with custom deserializer", consumerRecord(), @@ -149,7 +149,7 @@ object DefaultMessageFormatterTest { "print.headers" -> "true", "print.value" -> "true", "value.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), - "someKey\th1:v1,h2:v2\tSOMEVALUE\n"), + "h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"), Array( "print headers with custom deserializer", consumerRecord(), @@ -158,7 +158,7 @@ object DefaultMessageFormatterTest { "print.headers" -> "true", "print.value" -> "true", "headers.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), - "someKey\th1:V1,h2:V2\tsomeValue\n"), + "h1:V1,h2:V2\tsomeKey\tsomeValue\n"), Array( "print key and value", consumerRecord(), @@ -168,7 +168,7 @@ object DefaultMessageFormatterTest { "print fields in the beginning, middle and the end", consumerRecord(), Map("print.key" -> "true", "print.value" -> "true", "print.partition" -> "true"), - "someKey\t9\tsomeValue\n") + "Partition:9\tsomeKey\tsomeValue\n") ).asJava } From 7632ac7f70e779db0b32698fa41b22b259b40abf Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Wed, 29 Jul 2020 22:09:41 +1000 Subject: [PATCH 3/8] added code and test for null.literal property --- .../scala/kafka/tools/ConsoleConsumer.scala | 29 ++++++++++--------- .../tools/DefaultMessageFormatterTest.scala | 19 ++++++++---- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 9fbb2d8e54675..f8f2b8309c79a 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -222,18 +222,19 @@ object ConsoleConsumer extends Logging { .defaultsTo(classOf[DefaultMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property", """The properties to initialize the message formatter. Default properties include: - | print.timestamp=true|false - | print.key=true|false - | print.offset=true|false - | print.partition=true|false - | print.headers=true|false - | print.value=true|false - | key.separator= - | line.separator= - | headers.separator= - | key.deserializer= - | value.deserializer= - | header.deserializer= + | print.timestamp=true|false + | print.key=true|false + | print.offset=true|false + | print.partition=true|false + | print.headers=true|false + | print.value=true|false + | key.separator= + | line.separator= + | headers.separator= + | null.literal= + | key.deserializer= + | value.deserializer= + | header.deserializer= | |Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.""" .stripMargin) @@ -470,6 +471,7 @@ class DefaultMessageFormatter extends MessageFormatter { var keySeparator = utfBytes("\t") var lineSeparator = utfBytes("\n") var headersSeparator = utfBytes(",") + var nullLiteral = utfBytes("null") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None @@ -485,6 +487,7 @@ class DefaultMessageFormatter extends MessageFormatter { getPropertyIfExists(props, "key.separator", getByteProperty).foreach(keySeparator = _) getPropertyIfExists(props, "line.separator", getByteProperty).foreach(lineSeparator = _) getPropertyIfExists(props, "headers.separator", getByteProperty).foreach(headersSeparator = _) + getPropertyIfExists(props, "null.literal", getByteProperty).foreach(nullLiteral = _) keyDeserializer = getPropertyIfExists(props, "key.deserializer", getDeserializerProperty(true)) valueDeserializer = getPropertyIfExists(props, "value.deserializer", getDeserializerProperty(false)) @@ -563,7 +566,7 @@ class DefaultMessageFormatter extends MessageFormatter { } private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { - val nonNullBytes = Option(sourceBytes).getOrElse(utfBytes("null")) + val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) val convertedBytes = deserializer .map(d => utfBytes(d.deserialize(topic, nonNullBytes).toString)) .getOrElse(nonNullBytes) diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala index 7143bc0a86488..8c6473970cb69 100644 --- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -168,7 +168,17 @@ object DefaultMessageFormatterTest { "print fields in the beginning, middle and the end", consumerRecord(), Map("print.key" -> "true", "print.value" -> "true", "print.partition" -> "true"), - "Partition:9\tsomeKey\tsomeValue\n") + "Partition:9\tsomeKey\tsomeValue\n"), + Array( + "null value without custom null literal", + consumerRecord(value = null), + Map("print.key" -> "true"), + "someKey\tnull\n"), + Array( + "null value with custom null literal", + consumerRecord(value = null), + Map("print.key" -> "true", "null.literal" -> "NULL"), + "someKey\tNULL\n"), ).asJava } @@ -204,10 +214,9 @@ object DefaultMessageFormatterTest { 0L, 0, 0, - key.getBytes(StandardCharsets.UTF_8), - value.getBytes(StandardCharsets.UTF_8), + if (key == null) null else key.getBytes(StandardCharsets.UTF_8), + if (value == null) null else value.getBytes(StandardCharsets.UTF_8), new RecordHeaders(headers.asJava)) - } private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = { @@ -223,4 +232,4 @@ class UpperCaseDeserializer extends Deserializer[String] { override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} override def deserialize(topic: String, data: Array[Byte]): String = new String(data, StandardCharsets.UTF_8).toUpperCase override def close(): Unit = {} -} \ No newline at end of file +} From eb015f588de17ee33ee412ecdad9239063ce71c2 Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Tue, 4 Aug 2020 23:13:15 +1000 Subject: [PATCH 4/8] Changed "init(props:Properties)" method to "configure(configs:Map[String,_])" method --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 5 ++++- .../scala/kafka/tools/DefaultMessageFormatterTest.scala | 8 +------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index f8f2b8309c79a..b75cceaac7d4f 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -477,7 +477,10 @@ class DefaultMessageFormatter extends MessageFormatter { var valueDeserializer: Option[Deserializer[_]] = None var headersDeserializer: Option[Deserializer[_]] = None - override def init(props: Properties): Unit = { + override def configure(configs: Map[String, _]): Unit = { + val props = new java.util.Properties() + configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } + getPropertyIfExists(props, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) getPropertyIfExists(props, "print.key", getBoolProperty).foreach(printKey = _) getPropertyIfExists(props, "print.offset", getBoolProperty).foreach(printOffset = _) diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala index 8c6473970cb69..c3e0c19d49985 100644 --- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -20,7 +20,6 @@ package unit.kafka.tools import java.io.{ByteArrayOutputStream, Closeable, PrintStream} import java.nio.charset.StandardCharsets import java.util -import java.util.Properties import kafka.tools.DefaultMessageFormatter import org.apache.kafka.clients.consumer.ConsumerRecord @@ -183,13 +182,8 @@ object DefaultMessageFormatterTest { } private def buildFormatter(propsToSet: Map[String, String]): DefaultMessageFormatter = { - val properties = new Properties() - //putAll doesn't work on java 9 - https://github.com/scala/bug/issues/10418 - propsToSet.foreach { case (k, v) => - properties.setProperty(k, v) - } val formatter = new DefaultMessageFormatter() - formatter.init(properties) + formatter.configure(propsToSet.asJava) formatter } From 1f82b9f53cd5b45b573c15bb1acdce77b7c466fe Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Tue, 4 Aug 2020 23:21:31 +1000 Subject: [PATCH 5/8] Fix style in the test class --- .../tools/DefaultMessageFormatterTest.scala | 90 ++++++++++--------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala index c3e0c19d49985..27442856515ab 100644 --- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala +++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala @@ -65,7 +65,8 @@ object DefaultMessageFormatterTest { Array( "print key", consumerRecord(), - Map("print.key" -> "true", "print.value" -> "false"), + Map("print.key" -> "true", + "print.value" -> "false"), "someKey\n"), Array( "print value", @@ -75,98 +76,104 @@ object DefaultMessageFormatterTest { Array( "print empty timestamp", consumerRecord(timestampType = TimestampType.NO_TIMESTAMP_TYPE), - Map("print.timestamp" -> "true", "print.value" -> "false"), + Map("print.timestamp" -> "true", + "print.value" -> "false"), "NO_TIMESTAMP\n"), Array( "print log append time timestamp", consumerRecord(timestampType = TimestampType.LOG_APPEND_TIME), - Map("print.timestamp" -> "true", "print.value" -> "false"), + Map("print.timestamp" -> "true", + "print.value" -> "false"), "LogAppendTime:1234\n"), Array( "print create time timestamp", consumerRecord(timestampType = TimestampType.CREATE_TIME), - Map("print.timestamp" -> "true", "print.value" -> "false"), + Map("print.timestamp" -> "true", + "print.value" -> "false"), "CreateTime:1234\n"), Array( "print partition", consumerRecord(), - Map("print.partition" -> "true", "print.value" -> "false"), + Map("print.partition" -> "true", + "print.value" -> "false"), "Partition:9\n"), Array( "print offset", consumerRecord(), - Map("print.offset" -> "true", "print.value" -> "false"), + Map("print.offset" -> "true", + "print.value" -> "false"), "Offset:9876\n"), Array( "print headers", consumerRecord(), - Map("print.headers" -> "true", "print.value" -> "false"), + Map("print.headers" -> "true", + "print.value" -> "false"), "h1:v1,h2:v2\n"), Array( "print empty headers", consumerRecord(headers = Nil), - Map("print.headers" -> "true", "print.value" -> "false"), + Map("print.headers" -> "true", + "print.value" -> "false"), "NO_HEADERS\n"), Array( "print all possible fields with default delimiters", consumerRecord(), Map("print.key" -> "true", - "print.timestamp" -> "true", - "print.partition" -> "true", - "print.offset" -> "true", - "print.headers" -> "true", - "print.value" -> "true"), + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), "CreateTime:1234\tPartition:9\tOffset:9876\th1:v1,h2:v2\tsomeKey\tsomeValue\n"), Array( "print all possible fields with custom delimiters", consumerRecord(), - Map( - "key.separator" -> "|", - "line.separator" -> "^", - "headers.separator" -> "#", - "print.key" -> "true", - "print.timestamp" -> "true", - "print.partition" -> "true", - "print.offset" -> "true", - "print.headers" -> "true", - "print.value" -> "true"), + Map("key.separator" -> "|", + "line.separator" -> "^", + "headers.separator" -> "#", + "print.key" -> "true", + "print.timestamp" -> "true", + "print.partition" -> "true", + "print.offset" -> "true", + "print.headers" -> "true", + "print.value" -> "true"), "CreateTime:1234|Partition:9|Offset:9876|h1:v1#h2:v2|someKey|someValue^"), Array( "print key with custom deserializer", consumerRecord(), - Map( - "print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "key.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + Map("print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "key.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), "h1:v1,h2:v2\tSOMEKEY\tsomeValue\n"), Array( "print value with custom deserializer", consumerRecord(), - Map( - "print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "value.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + Map("print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "value.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), "h1:v1,h2:v2\tsomeKey\tSOMEVALUE\n"), Array( "print headers with custom deserializer", consumerRecord(), - Map( - "print.key" -> "true", - "print.headers" -> "true", - "print.value" -> "true", - "headers.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), + Map("print.key" -> "true", + "print.headers" -> "true", + "print.value" -> "true", + "headers.deserializer" -> "unit.kafka.tools.UpperCaseDeserializer"), "h1:V1,h2:V2\tsomeKey\tsomeValue\n"), Array( "print key and value", consumerRecord(), - Map("print.key" -> "true", "print.value" -> "true"), + Map("print.key" -> "true", + "print.value" -> "true"), "someKey\tsomeValue\n"), Array( "print fields in the beginning, middle and the end", consumerRecord(), - Map("print.key" -> "true", "print.value" -> "true", "print.partition" -> "true"), + Map("print.key" -> "true", + "print.value" -> "true", + "print.partition" -> "true"), "Partition:9\tsomeKey\tsomeValue\n"), Array( "null value without custom null literal", @@ -176,7 +183,8 @@ object DefaultMessageFormatterTest { Array( "null value with custom null literal", consumerRecord(value = null), - Map("print.key" -> "true", "null.literal" -> "NULL"), + Map("print.key" -> "true", + "null.literal" -> "NULL"), "someKey\tNULL\n"), ).asJava } From 35a759740a0dd95f3ffbbcf1791d7129f9c867ad Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Wed, 5 Aug 2020 00:38:27 +1000 Subject: [PATCH 6/8] Modify DefaultMessageFormatter to get configs from Map instead of Properties --- .../scala/kafka/tools/ConsoleConsumer.scala | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index b75cceaac7d4f..779e59f559bca 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -481,20 +481,20 @@ class DefaultMessageFormatter extends MessageFormatter { val props = new java.util.Properties() configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } - getPropertyIfExists(props, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) - getPropertyIfExists(props, "print.key", getBoolProperty).foreach(printKey = _) - getPropertyIfExists(props, "print.offset", getBoolProperty).foreach(printOffset = _) - getPropertyIfExists(props, "print.partition", getBoolProperty).foreach(printPartition = _) - getPropertyIfExists(props, "print.headers", getBoolProperty).foreach(printHeaders = _) - getPropertyIfExists(props, "print.value", getBoolProperty).foreach(printValue = _) - getPropertyIfExists(props, "key.separator", getByteProperty).foreach(keySeparator = _) - getPropertyIfExists(props, "line.separator", getByteProperty).foreach(lineSeparator = _) - getPropertyIfExists(props, "headers.separator", getByteProperty).foreach(headersSeparator = _) - getPropertyIfExists(props, "null.literal", getByteProperty).foreach(nullLiteral = _) - - keyDeserializer = getPropertyIfExists(props, "key.deserializer", getDeserializerProperty(true)) - valueDeserializer = getPropertyIfExists(props, "value.deserializer", getDeserializerProperty(false)) - headersDeserializer = getPropertyIfExists(props, "headers.deserializer", getDeserializerProperty(false)) + getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) + getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _) + getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _) + getPropertyIfExists(configs, "print.partition", getBoolProperty).foreach(printPartition = _) + getPropertyIfExists(configs, "print.headers", getBoolProperty).foreach(printHeaders = _) + getPropertyIfExists(configs, "print.value", getBoolProperty).foreach(printValue = _) + getPropertyIfExists(configs, "key.separator", getByteProperty).foreach(keySeparator = _) + getPropertyIfExists(configs, "line.separator", getByteProperty).foreach(lineSeparator = _) + getPropertyIfExists(configs, "headers.separator", getByteProperty).foreach(headersSeparator = _) + getPropertyIfExists(configs, "null.literal", getByteProperty).foreach(nullLiteral = _) + + keyDeserializer = getPropertyIfExists(configs, "key.deserializer", getDeserializerProperty(true)) + valueDeserializer = getPropertyIfExists(configs, "value.deserializer", getDeserializerProperty(false)) + headersDeserializer = getPropertyIfExists(configs, "headers.deserializer", getDeserializerProperty(false)) } def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { @@ -559,13 +559,13 @@ class DefaultMessageFormatter extends MessageFormatter { } } - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { - val newProps = new Properties() - props.asScala.foreach { case (key, value) => + private def propertiesWithKeyPrefixStripped(prefix: String, configs: Map[String, _]): Map[String, _] = { + val newConfigs = collection.mutable.Map[String, Any]() + configs.asScala.foreach { case (key, value) => if (key.startsWith(prefix) && key.length > prefix.length) - newProps.put(key.substring(prefix.length), value) + newConfigs.put(key.substring(prefix.length), value) } - newProps + newConfigs.asJava } private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { @@ -578,26 +578,26 @@ class DefaultMessageFormatter extends MessageFormatter { private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) - private def getByteProperty(props: Properties, key: String): Array[Byte] = { - utfBytes(props.getProperty(key)) + private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = { + utfBytes(configs.get(key).asInstanceOf[String]) } - private def getBoolProperty(props: Properties, key: String): Boolean = { - props.getProperty(key).trim.equalsIgnoreCase("true") + private def getBoolProperty(configs: Map[String, _], key: String): Boolean = { + configs.get(key).asInstanceOf[String].trim.equalsIgnoreCase("true") } - private def getDeserializerProperty(isKey: Boolean)(props: Properties, propertyName: String): Deserializer[_] = { - val deserializer = Class.forName(props.getProperty(propertyName)).newInstance().asInstanceOf[Deserializer[_]] - val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", props) + private def getDeserializerProperty(isKey: Boolean)(configs: Map[String, _], propertyName: String): Deserializer[_] = { + val deserializer = Class.forName(configs.get(propertyName).asInstanceOf[String]).newInstance().asInstanceOf[Deserializer[_]] + val deserializerConfig = propertiesWithKeyPrefixStripped(propertyName + ".", configs) .asScala .asJava deserializer.configure(deserializerConfig, isKey) deserializer } - private def getPropertyIfExists[T](props: Properties, key: String, getter: (Properties, String) => T): Option[T] = { - if (props.containsKey(key)) - Some(getter(props, key)) + private def getPropertyIfExists[T](configs: Map[String, _], key: String, getter: (Map[String, _], String) => T): Option[T] = { + if (configs.containsKey(key)) + Some(getter(configs, key)) else None } From b112a26455e5077f4f3ec411f082533b870faead Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Wed, 5 Aug 2020 00:43:11 +1000 Subject: [PATCH 7/8] remove unused props variable --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 779e59f559bca..ad88cd6a38837 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -478,9 +478,6 @@ class DefaultMessageFormatter extends MessageFormatter { var headersDeserializer: Option[Deserializer[_]] = None override def configure(configs: Map[String, _]): Unit = { - val props = new java.util.Properties() - configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } - getPropertyIfExists(configs, "print.timestamp", getBoolProperty).foreach(printTimestamp = _) getPropertyIfExists(configs, "print.key", getBoolProperty).foreach(printKey = _) getPropertyIfExists(configs, "print.offset", getBoolProperty).foreach(printOffset = _) From 4f1c2f880ed3eb33e7f5e9b6eb7abe94df94856c Mon Sep 17 00:00:00 2001 From: Badai Aqrandista Date: Sat, 22 Aug 2020 23:27:21 +1000 Subject: [PATCH 8/8] Fix error thrown by ConsoleConsumerTest and CustomDeserializerTest --- .../scala/kafka/tools/ConsoleConsumer.scala | 20 ++++++++----------- .../kafka/tools/ConsoleConsumerTest.scala | 12 ++++++++--- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index ad88cd6a38837..4de15b9f373f5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -503,8 +503,12 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(lineSeparator) } - def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = { - output.write(deserialize(deserializer, sourceBytes, topic)) + def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { + val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) + val convertedBytes = deserializer + .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString)) + .getOrElse(nonNullBytes) + convertedBytes } import consumerRecord._ @@ -546,12 +550,12 @@ class DefaultMessageFormatter extends MessageFormatter { } if (printKey) { - write(keyDeserializer, key, topic) + output.write(deserialize(keyDeserializer, key, topic)) writeSeparator(columnSeparator = printValue) } if (printValue) { - write(valueDeserializer, value, topic) + output.write(deserialize(valueDeserializer, value, topic)) output.write(lineSeparator) } } @@ -565,14 +569,6 @@ class DefaultMessageFormatter extends MessageFormatter { newConfigs.asJava } - private def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = { - val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral) - val convertedBytes = deserializer - .map(d => utfBytes(d.deserialize(topic, nonNullBytes).toString)) - .getOrElse(nonNullBytes) - convertedBytes - } - private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8) private def getByteProperty(configs: Map[String, _], key: String): Array[Byte] = { diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e930ad5644d79..8387201f07398 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -518,18 +518,24 @@ class ConsoleConsumerTest { formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("key\tvalue\t0\n", out.toString) + assertEquals("Partition:0\tkey\tvalue\n", out.toString) configs.put("print.timestamp", "true") formatter.configure(configs) out = new ByteArrayOutputStream() formatter.writeTo(record, new PrintStream(out)) - assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString) + assertEquals("NO_TIMESTAMP\tPartition:0\tkey\tvalue\n", out.toString) + + configs.put("print.offset", "true") + formatter.configure(configs) + out = new ByteArrayOutputStream() + formatter.writeTo(record, new PrintStream(out)) + assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) out = new ByteArrayOutputStream() val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes) formatter.writeTo(record2, new PrintStream(out)) - assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString) + assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString) formatter.close() }