From b856d3a6fdf9f859bcb116780e870b6b6843a269 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Sun, 30 Jan 2022 11:02:03 -0800 Subject: [PATCH 1/9] KAFKA-13629: use intrinsics for ByteUtils sizeOfXxx algorithm --- .../apache/kafka/common/utils/ByteUtils.java | 41 ++++++--- .../kafka/common/utils/ByteUtilsTest.java | 38 +++++++++ .../kafka/jmh/util/ByteUtilsBenchmark.java | 83 +++++++++++++++++++ 3 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 15868721da9ea..8a71a5186710b 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } + final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { + // 32 bits, and each 7-bits adds one byte to the output + 5, 5, 5, 5, // 32 + 4, 4, 4, 4, 4, 4, 4, // 28 + 3, 3, 3, 3, 3, 3, 3, // 21 + 2, 2, 2, 2, 2, 2, 2, // 14 + 1, 1, 1, 1, 1, 1, 1, // 7 + 1 // 0 + }; + + final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { + // 64 bits, and each 7-bits adds one byte to the output + 10, // 64 + 9, 9, 9, 9, 9, 9, 9, // 63 + 8, 8, 8, 8, 8, 8, 8, // 56 + 7, 7, 7, 7, 7, 7, 7, // 49 + 6, 6, 6, 6, 6, 6, 6, // 42 + 5, 5, 5, 5, 5, 5, 5, // 35 + 4, 4, 4, 4, 4, 4, 4, // 28 + 3, 3, 3, 3, 3, 3, 3, // 21 + 2, 2, 2, 2, 2, 2, 2, // 14 + 1, 1, 1, 1, 1, 1, 1, // 7 + 1 // 0 + }; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { - int bytes = 1; - while ((value & 0xffffff80) != 0L) { - bytes += 1; - value >>>= 7; - } - return bytes; + int leadingZeros = Integer.numberOfLeadingZeros(value); + return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; } /** @@ -416,12 +437,8 @@ public static int sizeOfVarint(int value) { */ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); - int bytes = 1; - while ((v & 0xffffffffffffff80L) != 0L) { - bytes += 1; - v >>>= 7; - } - return bytes; + int leadingZeros = Long.numberOfLeadingZeros(v); + return LEADING_ZEROS_TO_U_VARLONG_SIZE[leadingZeros]; } private static IllegalArgumentException illegalVarintException(int value) { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index 8f432f7632353..4f6fa1ce0218b 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -239,6 +239,44 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L); } + private static int oldSizeOfUnsignedVarint(int value) { + int bytes = 1; + // use highestOneBit or numberOfLeadingZeros + while ((value & 0xffffff80) != 0L) { + bytes += 1; + value >>>= 7; + } + return bytes; + } + + @Test + public void testSizeOfUnsignedVarint() { + for (int i = 0; i < Integer.MAX_VALUE; i++) { + final int expected = oldSizeOfUnsignedVarint(i); + final int actual = ByteUtils.sizeOfUnsignedVarint(i); + assertEquals(expected, actual); + } + } + + private static int oldSizeOfVarlong(long value) { + long v = (value << 1) ^ (value >> 63); + int bytes = 1; + while ((v & 0xffffffffffffff80L) != 0L) { + bytes += 1; + v >>>= 7; + } + return bytes; + } + + @Test + public void testSizeOfVarlong() { + for (long l = Integer.MIN_VALUE - 100; l <= Integer.MAX_VALUE + 100; l++) { + final int expected = oldSizeOfVarlong(l); + final int actual = ByteUtils.sizeOfVarlong(l); + assertEquals(expected, actual); + } + } + private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException { ByteBuffer buf = ByteBuffer.allocate(32); ByteUtils.writeUnsignedVarint(value, buf); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java new file mode 100644 index 0000000000000..ff2fdc948788c --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.util; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.common.utils.ByteUtils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ByteUtilsBenchmark { + private static final int INPUT_COUNT = 10_000; + private static final int MAX_INT = 2 * 1024 * 1024; + + private final int[] sizeOfInputs = new int[INPUT_COUNT]; + + @Setup(Level.Trial) + public void setUp() { + for (int i = 0; i < INPUT_COUNT; ++i) { + sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT); + } + } + + @Benchmark + public long testSizeOfUnsignedVarint() { + long result = 0; + for (final int input : sizeOfInputs) { + result += ByteUtils.sizeOfUnsignedVarint(input); + } + return result; + } + + @Benchmark + public long testSizeOfUnsignedVarintOriginal() { + long result = 0; + for (int input : sizeOfInputs) { + int bytes = 1; + // use highestOneBit or numberOfLeadingZeros + while ((input & 0xffffff80) != 0L) { + bytes += 1; + input >>>= 7; + } + result += bytes; + } + return result; + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(ByteUtilsBenchmark.class.getSimpleName()) + .forks(2) + .build(); + + new Runner(opt).run(); + } + +} From 87d4b19d0b664c20c4ae34613da720a644b5b187 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Tue, 1 Feb 2022 17:45:17 -0800 Subject: [PATCH 2/9] add benchmark to compare math-based vs lookup-based sizeof calc --- .../apache/kafka/jmh/util/ByteUtilsBenchmark.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index ff2fdc948788c..218a10c32aa8f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -56,6 +56,16 @@ public long testSizeOfUnsignedVarint() { return result; } + @Benchmark + public long testSizeOfUnsignedVarintMath() { + long result = 0; + for (final int input : sizeOfInputs) { + int leadingZeros = Integer.numberOfLeadingZeros(input); + result += (38 - leadingZeros) / 7 + leadingZeros / 32; + } + return result; + } + @Benchmark public long testSizeOfUnsignedVarintOriginal() { long result = 0; @@ -80,4 +90,6 @@ public static void main(String[] args) throws RunnerException { new Runner(opt).run(); } + + } From bc1afabe38f6602cea09ce9bc6d84f7a68da4259 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Tue, 1 Feb 2022 18:05:25 -0800 Subject: [PATCH 3/9] add benchmarks of single-iterations to avoid loop unroll --- .../kafka/jmh/util/ByteUtilsBenchmark.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index 218a10c32aa8f..c70e3aa84d12d 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -56,6 +56,11 @@ public long testSizeOfUnsignedVarint() { return result; } + @Benchmark + public long testSizeOfUnsignedVarintOne() { + return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]); + } + @Benchmark public long testSizeOfUnsignedVarintMath() { long result = 0; @@ -66,6 +71,12 @@ public long testSizeOfUnsignedVarintMath() { return result; } + @Benchmark + public long testSizeOfUnsignedVarintMathOne() { + int leadingZeros = Integer.numberOfLeadingZeros(sizeOfInputs[0]); + return (38 - leadingZeros) / 7 + leadingZeros / 32; + } + @Benchmark public long testSizeOfUnsignedVarintOriginal() { long result = 0; @@ -81,6 +92,18 @@ public long testSizeOfUnsignedVarintOriginal() { return result; } + @Benchmark + public long testSizeOfUnsignedVarintOriginalOne() { + int input = sizeOfInputs[0]; + int bytes = 1; + // use highestOneBit or numberOfLeadingZeros + while ((input & 0xffffff80) != 0L) { + bytes += 1; + input >>>= 7; + } + return bytes; + } + public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() .include(ByteUtilsBenchmark.class.getSimpleName()) From d6aeeb1f034f8b75e3f608a940aa158e09497dcf Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Wed, 2 Feb 2022 14:45:43 -0800 Subject: [PATCH 4/9] remove benchmarks with loops --- .../kafka/jmh/util/ByteUtilsBenchmark.java | 69 ++++++------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index c70e3aa84d12d..fc98dd1d1fea9 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -22,84 +22,57 @@ import org.apache.kafka.common.utils.ByteUtils; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; -@State(Scope.Thread) +@State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class ByteUtilsBenchmark { - private static final int INPUT_COUNT = 10_000; - private static final int MAX_INT = 2 * 1024 * 1024; + private int input; - private final int[] sizeOfInputs = new int[INPUT_COUNT]; - - @Setup(Level.Trial) + @Setup(Level.Iteration) public void setUp() { - for (int i = 0; i < INPUT_COUNT; ++i) { - sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT); - } + input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024); } @Benchmark + @Fork(3) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) public long testSizeOfUnsignedVarint() { - long result = 0; - for (final int input : sizeOfInputs) { - result += ByteUtils.sizeOfUnsignedVarint(input); - } - return result; - } - - @Benchmark - public long testSizeOfUnsignedVarintOne() { - return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]); + return ByteUtils.sizeOfUnsignedVarint(input); } @Benchmark + @Fork(3) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) public long testSizeOfUnsignedVarintMath() { - long result = 0; - for (final int input : sizeOfInputs) { - int leadingZeros = Integer.numberOfLeadingZeros(input); - result += (38 - leadingZeros) / 7 + leadingZeros / 32; - } - return result; - } - - @Benchmark - public long testSizeOfUnsignedVarintMathOne() { - int leadingZeros = Integer.numberOfLeadingZeros(sizeOfInputs[0]); + int leadingZeros = Integer.numberOfLeadingZeros(input); return (38 - leadingZeros) / 7 + leadingZeros / 32; } @Benchmark + @Fork(3) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) public long testSizeOfUnsignedVarintOriginal() { - long result = 0; - for (int input : sizeOfInputs) { - int bytes = 1; - // use highestOneBit or numberOfLeadingZeros - while ((input & 0xffffff80) != 0L) { - bytes += 1; - input >>>= 7; - } - result += bytes; - } - return result; - } - - @Benchmark - public long testSizeOfUnsignedVarintOriginalOne() { - int input = sizeOfInputs[0]; + int value = input; int bytes = 1; // use highestOneBit or numberOfLeadingZeros - while ((input & 0xffffff80) != 0L) { + while ((value & 0xffffff80) != 0L) { bytes += 1; - input >>>= 7; + value >>>= 7; } return bytes; } From f040109bae99e904b4135d7dd783279f1d0b8e13 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Thu, 3 Feb 2022 16:46:04 -0800 Subject: [PATCH 5/9] switch implementation to use bit-math instead of a lookup table h/t @artemlivshits --- .../apache/kafka/common/utils/ByteUtils.java | 79 ++++++++++++------- .../kafka/common/utils/ByteUtilsTest.java | 24 +++++- .../kafka/jmh/util/ByteUtilsBenchmark.java | 16 +--- 3 files changed, 77 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 8a71a5186710b..6da292b8c52c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -386,31 +386,6 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } - final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { - // 32 bits, and each 7-bits adds one byte to the output - 5, 5, 5, 5, // 32 - 4, 4, 4, 4, 4, 4, 4, // 28 - 3, 3, 3, 3, 3, 3, 3, // 21 - 2, 2, 2, 2, 2, 2, 2, // 14 - 1, 1, 1, 1, 1, 1, 1, // 7 - 1 // 0 - }; - - final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { - // 64 bits, and each 7-bits adds one byte to the output - 10, // 64 - 9, 9, 9, 9, 9, 9, 9, // 63 - 8, 8, 8, 8, 8, 8, 8, // 56 - 7, 7, 7, 7, 7, 7, 7, // 49 - 6, 6, 6, 6, 6, 6, 6, // 42 - 5, 5, 5, 5, 5, 5, 5, // 35 - 4, 4, 4, 4, 4, 4, 4, // 28 - 3, 3, 3, 3, 3, 3, 3, // 21 - 2, 2, 2, 2, 2, 2, 2, // 14 - 1, 1, 1, 1, 1, 1, 1, // 7 - 1 // 0 - }; - /** * Number of bytes needed to encode an integer in unsigned variable-length format. * @@ -418,7 +393,32 @@ public static void writeDouble(double value, ByteBuffer buffer) { */ public static int sizeOfUnsignedVarint(int value) { int leadingZeros = Integer.numberOfLeadingZeros(value); - return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; + + // magic sequence of numbers that produces a function equivalent to this lookup + // table, where the index in the lookup table is provided by the number of + // leading zeros of the value, and the result is the number of bytes used + // in the output + + // see the test cases as well to verify the implementation matches the prior + // for-loop logic + + // final static byte[] LEADING_ZEROS_TO_U_VARINT_SIZE = new byte[] { + // // 32 bits, and each 7-bits adds one byte to the output + // 5, 5, 5, 5, // 32 + // 4, 4, 4, 4, 4, 4, 4, // 28 + // 3, 3, 3, 3, 3, 3, 3, // 21 + // 2, 2, 2, 2, 2, 2, 2, // 14 + // 1, 1, 1, 1, 1, 1, 1, // 7 + // 1 // 0 + // }; + + // this is the core logic, but the Java encoding is suboptimal when we have a narrow + // range of integers, so we can do better here + + // return (38 - leadingZeros) / 7 + leadingZeros / 32; + + int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; + return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); } /** @@ -434,11 +434,36 @@ public static int sizeOfVarint(int value) { * Number of bytes needed to encode a long in variable-length format. * * @param value The signed value + * @see #sizeOfUnsignedVarint(int) */ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); int leadingZeros = Long.numberOfLeadingZeros(v); - return LEADING_ZEROS_TO_U_VARLONG_SIZE[leadingZeros]; + + // For implementation notes see sizeOfUnsignedVarint, assuming the below table + + // final static byte[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new byte[] { + // // 63 bits, and each 7-bits adds one byte to the output + // 9, // 64 + // 8, 9, 9, 9, 9, 9, 9, // 63 + // 7, 8, 8, 8, 8, 8, 8, // 56 + // 6, 7, 7, 7, 7, 7, 7, // 49 + // 5, 6, 6, 6, 6, 6, 6, // 42 + // 4, 5, 5, 5, 5, 5, 5, // 35 + // 3, 4, 4, 4, 4, 4, 4, // 28 + // 2, 3, 3, 3, 3, 3, 3, // 21 + // 1, 2, 2, 2, 2, 2, 2, // 14 + // 0, 1, 1, 1, 1, 1, 1, // 7 + // 0 // 0 + // }; + + // this is the core logic, but the Java encoding is suboptimal when we have a narrow + // range of integers, so we can do better here + + // return (70 - leadingZeros) / 7 + leadingZeros / 64; + + int leadingZerosBelow70DividedBy7 = ((70 - leadingZeros) * 0b10010010010010011) >>> 19; + return leadingZerosBelow70DividedBy7 + (leadingZeros >>> 6); } private static IllegalArgumentException illegalVarintException(int value) { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index 4f6fa1ce0218b..66a11d53b19db 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -239,6 +239,25 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L); } + private static int mathSizeOfUnsignedVarint(int value) { + int leadingZeros = Integer.numberOfLeadingZeros(value); + // return (38 - leadingZeros) / 7 + leadingZeros / 32; + int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; + return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); + } + + @Test + public void testSizeOfUnsignedVarintMath() { + for (int i = 0; i < Integer.MAX_VALUE; i++) { + final int actual = mathSizeOfUnsignedVarint(i); + final int expected = oldSizeOfUnsignedVarint(i); + assertEquals(expected, actual); + } + } + + /** + * The old well-known implementation for sizeOfUnsignedVarint + */ private static int oldSizeOfUnsignedVarint(int value) { int bytes = 1; // use highestOneBit or numberOfLeadingZeros @@ -258,6 +277,9 @@ public void testSizeOfUnsignedVarint() { } } + /** + * The old well-known implementation for sizeOfVarlong + */ private static int oldSizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); int bytes = 1; @@ -270,7 +292,7 @@ private static int oldSizeOfVarlong(long value) { @Test public void testSizeOfVarlong() { - for (long l = Integer.MIN_VALUE - 100; l <= Integer.MAX_VALUE + 100; l++) { + for (long l = Integer.MIN_VALUE - 10000000000L; l <= Integer.MAX_VALUE + 10000000000L; l += 10000) { final int expected = oldSizeOfVarlong(l); final int actual = ByteUtils.sizeOfVarlong(l); assertEquals(expected, actual); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index fc98dd1d1fea9..03cb234ef3efc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -49,7 +49,7 @@ public void setUp() { @Fork(3) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - public long testSizeOfUnsignedVarint() { + public int testSizeOfUnsignedVarint() { return ByteUtils.sizeOfUnsignedVarint(input); } @@ -57,16 +57,7 @@ public long testSizeOfUnsignedVarint() { @Fork(3) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - public long testSizeOfUnsignedVarintMath() { - int leadingZeros = Integer.numberOfLeadingZeros(input); - return (38 - leadingZeros) / 7 + leadingZeros / 32; - } - - @Benchmark - @Fork(3) - @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) - public long testSizeOfUnsignedVarintOriginal() { + public int testSizeOfUnsignedVarintOriginal() { int value = input; int bytes = 1; // use highestOneBit or numberOfLeadingZeros @@ -85,7 +76,4 @@ public static void main(String[] args) throws RunnerException { new Runner(opt).run(); } - - - } From e891ebfd46ed97ab1c6face21f3d7f6565734a77 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Fri, 4 Feb 2022 10:46:01 -0800 Subject: [PATCH 6/9] pr feedback, rename and tidy up code and comments --- .../apache/kafka/common/utils/ByteUtils.java | 56 ++++---------- .../kafka/common/utils/ByteUtilsTest.java | 77 ++++++++----------- .../kafka/jmh/util/ByteUtilsBenchmark.java | 25 +++++- 3 files changed, 69 insertions(+), 89 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 6da292b8c52c1..967d65be6da37 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -390,33 +390,23 @@ public static void writeDouble(double value, ByteBuffer buffer) { * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value + * + * @see #writeUnsignedVarint(int, DataOutput) */ public static int sizeOfUnsignedVarint(int value) { - int leadingZeros = Integer.numberOfLeadingZeros(value); - - // magic sequence of numbers that produces a function equivalent to this lookup - // table, where the index in the lookup table is provided by the number of - // leading zeros of the value, and the result is the number of bytes used - // in the output - - // see the test cases as well to verify the implementation matches the prior - // for-loop logic - - // final static byte[] LEADING_ZEROS_TO_U_VARINT_SIZE = new byte[] { - // // 32 bits, and each 7-bits adds one byte to the output - // 5, 5, 5, 5, // 32 - // 4, 4, 4, 4, 4, 4, 4, // 28 - // 3, 3, 3, 3, 3, 3, 3, // 21 - // 2, 2, 2, 2, 2, 2, 2, // 14 - // 1, 1, 1, 1, 1, 1, 1, // 7 - // 1 // 0 - // }; + // Protocol buffers varint encoding is variable length, with a minimum of 1 byte + // (for zero). The values themselves are not important. What's important here is + // any leading zero bits are dropped from output. We can use this leading zero + // count w/ fast intrinsic to calc the output length directly. - // this is the core logic, but the Java encoding is suboptimal when we have a narrow - // range of integers, so we can do better here + // Test cases verify this matches the output for loop logic exactly. // return (38 - leadingZeros) / 7 + leadingZeros / 32; + // The above formula provides the implementation, but the Java encoding is suboptimal + // when we have a narrow range of integers, so we can do better manually + + int leadingZeros = Integer.numberOfLeadingZeros(value); int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); } @@ -438,30 +428,14 @@ public static int sizeOfVarint(int value) { */ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); - int leadingZeros = Long.numberOfLeadingZeros(v); - // For implementation notes see sizeOfUnsignedVarint, assuming the below table - - // final static byte[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new byte[] { - // // 63 bits, and each 7-bits adds one byte to the output - // 9, // 64 - // 8, 9, 9, 9, 9, 9, 9, // 63 - // 7, 8, 8, 8, 8, 8, 8, // 56 - // 6, 7, 7, 7, 7, 7, 7, // 49 - // 5, 6, 6, 6, 6, 6, 6, // 42 - // 4, 5, 5, 5, 5, 5, 5, // 35 - // 3, 4, 4, 4, 4, 4, 4, // 28 - // 2, 3, 3, 3, 3, 3, 3, // 21 - // 1, 2, 2, 2, 2, 2, 2, // 14 - // 0, 1, 1, 1, 1, 1, 1, // 7 - // 0 // 0 - // }; - - // this is the core logic, but the Java encoding is suboptimal when we have a narrow - // range of integers, so we can do better here + // For implementation notes @see #sizeOfUnsignedVarint(int) + + // Similar logic is applied to allow for 64bit input -> 1-9byte output. // return (70 - leadingZeros) / 7 + leadingZeros / 64; + int leadingZeros = Long.numberOfLeadingZeros(v); int leadingZerosBelow70DividedBy7 = ((70 - leadingZeros) * 0b10010010010010011) >>> 19; return leadingZerosBelow70DividedBy7 + (leadingZeros >>> 6); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index 66a11d53b19db..5f855fa4a9c76 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -24,6 +24,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.function.IntFunction; +import java.util.function.LongFunction; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -239,64 +241,47 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L); } - private static int mathSizeOfUnsignedVarint(int value) { - int leadingZeros = Integer.numberOfLeadingZeros(value); - // return (38 - leadingZeros) / 7 + leadingZeros / 32; - int leadingZerosBelow38DividedBy7 = ((38 - leadingZeros) * 0b10010010010010011) >>> 19; - return leadingZerosBelow38DividedBy7 + (leadingZeros >>> 5); - } - - @Test - public void testSizeOfUnsignedVarintMath() { - for (int i = 0; i < Integer.MAX_VALUE; i++) { - final int actual = mathSizeOfUnsignedVarint(i); - final int expected = oldSizeOfUnsignedVarint(i); - assertEquals(expected, actual); - } - } - - /** - * The old well-known implementation for sizeOfUnsignedVarint - */ - private static int oldSizeOfUnsignedVarint(int value) { - int bytes = 1; - // use highestOneBit or numberOfLeadingZeros - while ((value & 0xffffff80) != 0L) { - bytes += 1; - value >>>= 7; - } - return bytes; - } - @Test public void testSizeOfUnsignedVarint() { - for (int i = 0; i < Integer.MAX_VALUE; i++) { - final int expected = oldSizeOfUnsignedVarint(i); + // The old well-known implementation for sizeOfUnsignedVarint + IntFunction simpleImplementation = (int value) -> { + int bytes = 1; + while ((value & 0xffffff80) != 0L) { + bytes += 1; + value >>>= 7; + } + return bytes; + }; + + // compare the full range of values + for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) { final int actual = ByteUtils.sizeOfUnsignedVarint(i); + final int expected = simpleImplementation.apply(i); assertEquals(expected, actual); } } - /** - * The old well-known implementation for sizeOfVarlong - */ - private static int oldSizeOfVarlong(long value) { - long v = (value << 1) ^ (value >> 63); - int bytes = 1; - while ((v & 0xffffffffffffff80L) != 0L) { - bytes += 1; - v >>>= 7; - } - return bytes; - } - @Test public void testSizeOfVarlong() { - for (long l = Integer.MIN_VALUE - 10000000000L; l <= Integer.MAX_VALUE + 10000000000L; l += 10000) { - final int expected = oldSizeOfVarlong(l); + // The old well-known implementation for sizeOfVarlong + LongFunction simpleImplementation = (long value) -> { + long v = (value << 1) ^ (value >> 63); + int bytes = 1; + while ((v & 0xffffffffffffff80L) != 0L) { + bytes += 1; + v >>>= 7; + } + return bytes; + }; + + for (long l = 1; l < Long.MAX_VALUE && l >= 0; l = l << 1) { + final int expected = simpleImplementation.apply(l); final int actual = ByteUtils.sizeOfVarlong(l); assertEquals(expected, actual); } + + // check zero as well + assertEquals(simpleImplementation.apply(0), ByteUtils.sizeOfVarlong(0)); } private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index 03cb234ef3efc..fb56bfa6d8fed 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -57,10 +57,9 @@ public int testSizeOfUnsignedVarint() { @Fork(3) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - public int testSizeOfUnsignedVarintOriginal() { + public int testSizeOfUnsignedVarintSimple() { int value = input; int bytes = 1; - // use highestOneBit or numberOfLeadingZeros while ((value & 0xffffff80) != 0L) { bytes += 1; value >>>= 7; @@ -68,6 +67,28 @@ public int testSizeOfUnsignedVarintOriginal() { return bytes; } + @Benchmark + @Fork(3) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + public int testSizeOfVarlong() { + return ByteUtils.sizeOfVarlong(input); + } + + @Benchmark + @Fork(3) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + public int testSizeOfVarlongSimple() { + long v = (input << 1) ^ (input >> 63); + int bytes = 1; + while ((v & 0xffffffffffffff80L) != 0L) { + bytes += 1; + v >>>= 7; + } + return bytes; + } + public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() .include(ByteUtilsBenchmark.class.getSimpleName()) From 9dbff976c047ec82097b0a6cd156f78e4c7bba45 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Fri, 4 Feb 2022 17:28:52 -0800 Subject: [PATCH 7/9] pr feedback, rename and tidy up comments --- .../org/apache/kafka/common/utils/ByteUtils.java | 2 -- .../apache/kafka/jmh/util/ByteUtilsBenchmark.java | 15 +++------------ 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 967d65be6da37..7bd1d9257f84c 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -430,9 +430,7 @@ public static int sizeOfVarlong(long value) { long v = (value << 1) ^ (value >> 63); // For implementation notes @see #sizeOfUnsignedVarint(int) - // Similar logic is applied to allow for 64bit input -> 1-9byte output. - // return (70 - leadingZeros) / 7 + leadingZeros / 64; int leadingZeros = Long.numberOfLeadingZeros(v); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index fb56bfa6d8fed..3742c12ee94ac 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -37,6 +37,9 @@ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(3) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) public class ByteUtilsBenchmark { private int input; @@ -46,17 +49,11 @@ public void setUp() { } @Benchmark - @Fork(3) - @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) public int testSizeOfUnsignedVarint() { return ByteUtils.sizeOfUnsignedVarint(input); } @Benchmark - @Fork(3) - @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) public int testSizeOfUnsignedVarintSimple() { int value = input; int bytes = 1; @@ -68,17 +65,11 @@ public int testSizeOfUnsignedVarintSimple() { } @Benchmark - @Fork(3) - @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) public int testSizeOfVarlong() { return ByteUtils.sizeOfVarlong(input); } @Benchmark - @Fork(3) - @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) public int testSizeOfVarlongSimple() { long v = (input << 1) ^ (input >> 63); int bytes = 1; From dc88a0fcbaeeef9df584dabccd8427dfb0588fd2 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Sat, 5 Feb 2022 14:50:47 -0800 Subject: [PATCH 8/9] fix spotbugs issue in benchmark --- .../kafka/jmh/util/ByteUtilsBenchmark.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index 3742c12ee94ac..1f4c81a6d75ab 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -41,21 +41,22 @@ @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) public class ByteUtilsBenchmark { - private int input; - + private int inputInt; + private long inputLong; @Setup(Level.Iteration) public void setUp() { - input = ThreadLocalRandom.current().nextInt(2 * 1024 * 1024); + inputInt = ThreadLocalRandom.current().nextInt(); + inputLong = ThreadLocalRandom.current().nextLong(); } @Benchmark public int testSizeOfUnsignedVarint() { - return ByteUtils.sizeOfUnsignedVarint(input); + return ByteUtils.sizeOfUnsignedVarint(inputInt); } @Benchmark public int testSizeOfUnsignedVarintSimple() { - int value = input; + int value = inputInt; int bytes = 1; while ((value & 0xffffff80) != 0L) { bytes += 1; @@ -66,12 +67,14 @@ public int testSizeOfUnsignedVarintSimple() { @Benchmark public int testSizeOfVarlong() { - return ByteUtils.sizeOfVarlong(input); + return ByteUtils.sizeOfVarlong(inputLong); } @Benchmark public int testSizeOfVarlongSimple() { - long v = (input << 1) ^ (input >> 63); + // spotbugs does not like the >> 63 rightshift if input is an int + // quite reasonable + long v = (inputLong << 1) ^ (inputLong >> 63); int bytes = 1; while ((v & 0xffffffffffffff80L) != 0L) { bytes += 1; From 329674ef41cf5804e2781b4290712d1117904c1a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 6 Feb 2022 13:14:40 -0800 Subject: [PATCH 9/9] Remove unnecessary comment --- .../main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index 1f4c81a6d75ab..bee8c16a260fa 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -72,8 +72,6 @@ public int testSizeOfVarlong() { @Benchmark public int testSizeOfVarlongSimple() { - // spotbugs does not like the >> 63 rightshift if input is an int - // quite reasonable long v = (inputLong << 1) ^ (inputLong >> 63); int bytes = 1; while ((v & 0xffffffffffffff80L) != 0L) {