From b9ec53665691e9ea5121030ead677a2ed85c99ca Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Sun, 12 Jan 2020 11:47:41 -0500 Subject: [PATCH 1/2] KAFKA-9408: Use StandardCharsets UTF-8 in ApiUtils --- core/src/main/scala/kafka/api/ApiUtils.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala index 1644358f7bb3a..9be1e4bb87467 100644 --- a/core/src/main/scala/kafka/api/ApiUtils.scala +++ b/core/src/main/scala/kafka/api/ApiUtils.scala @@ -17,6 +17,7 @@ package kafka.api import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import org.apache.kafka.common.KafkaException @@ -24,8 +25,6 @@ import org.apache.kafka.common.KafkaException * Helper functions specific to parsing or serializing requests and responses */ object ApiUtils { - - val ProtocolEncoding = "UTF-8" /** * Read size prefixed string where the size is stored as a 2 byte short. @@ -37,7 +36,7 @@ object ApiUtils { return null val bytes = new Array[Byte](size) buffer.get(bytes) - new String(bytes, ProtocolEncoding) + new String(bytes, StandardCharsets.UTF_8) } /** @@ -49,7 +48,7 @@ object ApiUtils { if(string == null) { buffer.putShort(-1) } else { - val encodedString = string.getBytes(ProtocolEncoding) + val encodedString = string.getBytes(StandardCharsets.UTF_8) if(encodedString.length > Short.MaxValue) { throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") } else { @@ -67,7 +66,7 @@ object ApiUtils { if(string == null) { 2 } else { - val encodedString = string.getBytes(ProtocolEncoding) + val encodedString = string.getBytes(StandardCharsets.UTF_8) if(encodedString.length > Short.MaxValue) { throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") } else { From 4f71c9d6c7e59c3b587645efd6cd9da0398c3ff0 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Mon, 13 Jan 2020 12:42:51 -0500 Subject: [PATCH 2/2] Add StandardCharset to Test classes as well --- .../scala/integration/kafka/api/TransactionsTest.scala | 9 +++++---- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 3 ++- .../test/scala/unit/kafka/network/SocketServerTest.scala | 3 ++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 63bfc9f4639c4..14d59b1184038 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -18,6 +18,7 @@ package kafka.api import java.lang.{Long => JLong} +import java.nio.charset.StandardCharsets import java.time.Duration import java.util.{Optional, Properties} import java.util.concurrent.TimeUnit @@ -258,8 +259,8 @@ class TransactionsTest extends KafkaServerTestHarness { shouldCommit = !shouldCommit records.foreach { record => - val key = new String(record.key(), "UTF-8") - val value = new String(record.value(), "UTF-8") + val key = new String(record.key(), StandardCharsets.UTF_8) + val value = new String(record.value(), StandardCharsets.UTF_8) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit)) } @@ -267,11 +268,11 @@ class TransactionsTest extends KafkaServerTestHarness { if (shouldCommit) { producer.commitTransaction() recordsProcessed += records.size - debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " + s"records written to $topic2: $recordsProcessed") } else { producer.abortTransaction() - debug(s"aborted transaction Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + debug(s"aborted transaction Last committed record: ${new String(records.last.value(), StandardCharsets.UTF_8)}. Num " + s"records written to $topic2: $recordsProcessed") TestUtils.resetToCommittedPositions(consumer) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index bb30287bb12bd..61d7a539673cf 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io.{File, RandomAccessFile} import java.nio._ +import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Properties import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -1681,7 +1682,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { var lastOffset = -1L private def keyFor(key: ByteBuffer) = - new String(Utils.readBytes(key.duplicate), "UTF-8") + new String(Utils.readBytes(key.duplicate), StandardCharsets.UTF_8) override def put(key: ByteBuffer, offset: Long): Unit = { lastOffset = offset diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 33dada06c235a..c8618872a81f7 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -21,6 +21,7 @@ import java.io._ import java.net._ import java.nio.ByteBuffer import java.nio.channels.SocketChannel +import java.nio.charset.StandardCharsets import java.util.concurrent.{CompletableFuture, Executors} import java.util.{HashMap, Properties, Random} @@ -823,7 +824,7 @@ class SocketServerTest { receiveResponse(socket) // now send credentials - val authBytes = "admin\u0000admin\u0000admin-secret".getBytes("UTF-8") + val authBytes = "admin\u0000admin\u0000admin-secret".getBytes(StandardCharsets.UTF_8) if (leverageKip152SaslAuthenticateRequest) { // send credentials within a SaslAuthenticateRequest val saslAuthenticateRequest = new SaslAuthenticateRequest.Builder(new SaslAuthenticateRequestData()