diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 445bed1a99551..29c82845f9ee9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -60,6 +60,7 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionConfig; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; @@ -395,7 +396,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.apiVersions = new ApiVersions(); this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), - this.compressionType, + CompressionConfig.of(this.compressionType), lingerMs(config), retryBackoffMs, deliveryTimeoutMs, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 80372cbc7cfe1..285f74c987ec9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -298,8 +297,7 @@ private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batc // Note 
that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional) // for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent // with how normal batches are handled). - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(), - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic()).compressionType(recordsBuilder.compressionType()).build(); return new ProducerBatch(topicPartition, builder, this.createdMs, true); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 91fb8c92c2e0a..c9331f0df7549 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; @@ -44,13 +45,12 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionConfig; import org.apache.kafka.common.record.CompressionRatioEstimator; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.CopyOnWriteMap; import 
org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -71,7 +71,7 @@ public final class RecordAccumulator { private final AtomicInteger flushesInProgress; private final AtomicInteger appendsInProgress; private final int batchSize; - private final CompressionType compression; + private final CompressionConfig compressionConfig; private final int lingerMs; private final long retryBackoffMs; private final int deliveryTimeoutMs; @@ -91,7 +91,7 @@ public final class RecordAccumulator { * * @param logContext The log context used for logging * @param batchSize The size to use when allocating {@link MemoryRecords} instances - * @param compression The compression codec for the records + * @param compressionConfig The compression type/level/buffer size for the records * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some * latency for potentially better throughput due to more batching (and hence fewer, larger requests). 
@@ -105,7 +105,7 @@ public final class RecordAccumulator { */ public RecordAccumulator(LogContext logContext, int batchSize, - CompressionType compression, + CompressionConfig compressionConfig, int lingerMs, long retryBackoffMs, int deliveryTimeoutMs, @@ -121,7 +121,7 @@ public RecordAccumulator(LogContext logContext, this.flushesInProgress = new AtomicInteger(0); this.appendsInProgress = new AtomicInteger(0); this.batchSize = batchSize; - this.compression = compression; + this.compressionConfig = compressionConfig; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.deliveryTimeoutMs = deliveryTimeoutMs; @@ -205,7 +205,7 @@ public RecordAppendResult append(TopicPartition tp, // we don't have an in-progress record batch try to allocate a new batch byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compressionConfig.getType(), key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { @@ -242,7 +242,10 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " + "support the required message format (v2). 
The broker must be version 0.11 or later."); } - return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L); + return MemoryRecords.builder(buffer) + .magic(maxUsableMagic) + .compressionConfig(compressionConfig) + .build(); } /** @@ -340,7 +343,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) { // Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever // is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure // the split doesn't happen too often. - CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression, + CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compressionConfig.getType(), Math.max(1.0f, (float) bigBatch.compressionRatio())); Deque dq = bigBatch.split(this.batchSize); int numSplitBatches = dq.size(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java new file mode 100644 index 0000000000000..9f877eec4039e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.utils.ByteBufferOutputStream; + +import java.io.DataOutputStream; + +/** + * This class holds all compression configurations: the compression type, the compression level, and the size of the compression buffer. + */ +public class CompressionConfig { + private final CompressionType type; + private final Integer level; + private final Integer bufferSize; + + private CompressionConfig(CompressionType type, Integer level, Integer bufferSize) { + this.type = type; + this.level = level; + this.bufferSize = bufferSize; + } + + public CompressionType getType() { + return type; + } + + public Integer getLevel() { + return level; + } + + public Integer getBufferSize() { + return bufferSize; + } + + /** + * Returns a {@link DataOutputStream} that compresses the given bytes into the output {@link ByteBufferOutputStream} + * with the specified magic value. + */ + public DataOutputStream outputStream(ByteBufferOutputStream output, byte magic) { + return new DataOutputStream(type.wrapForOutput(output, magic, this.level, this.bufferSize)); + } + + /** + * Creates a not-compressing configuration. + */ + public static CompressionConfig none() { + return of(CompressionType.NONE); + } + + /** + * Creates a configuration with the specified {@link CompressionType}, using the default compression level and buffer size. + */ + public static CompressionConfig of(CompressionType type) { + return of(type, null); + } + + /** + * Creates a configuration with the specified {@link CompressionType} and compression level, using the default buffer size. + */ + public static CompressionConfig of(CompressionType type, Integer level) { + return of(type, level, null); + } + + /** + * Creates a configuration with the specified {@link CompressionType}, compression level, and buffer size. 
+ */ + public static CompressionConfig of(CompressionType type, Integer level, Integer bufferSize) { + return new CompressionConfig(type, level, bufferSize); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 352d12d834977..bf520d2e1c8c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -29,7 +29,6 @@ import java.lang.invoke.MethodType; import java.nio.ByteBuffer; import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; /** * The compression type to use @@ -37,7 +36,7 @@ public enum CompressionType { NONE(0, "none", 1.0f) { @Override - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) { return buffer; } @@ -49,12 +48,12 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu GZIP(1, "gzip", 1.0f) { @Override - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) { try { // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small // number of bytes to write (potentially a single byte) - return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024); + return new BufferedOutputStream(GZipOutputStream.of(buffer, level, blockSize), 16 * 1024); } catch (Exception e) { throw new KafkaException(e); } @@ -76,9 +75,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu SNAPPY(2, "snappy", 
1.0f) { @Override - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) { try { - return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer); + // Snappy does not support compression levels, so the given parameter is ignored. + if (blockSize == null) { + return (OutputStream) SnappyConstructors.OUTPUT_WITHOUT_BLOCK_SIZE.invoke(buffer); + } else { + return (OutputStream) SnappyConstructors.OUTPUT_WITH_BLOCK_SIZE.invoke(buffer, blockSize); + } } catch (Throwable e) { throw new KafkaException(e); } @@ -96,9 +100,9 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu LZ4(3, "lz4", 1.0f) { @Override - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { + public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) { try { - return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0); + return KafkaLZ4BlockOutputStream.of(buffer, messageVersion, level, blockSize); } catch (Throwable e) { throw new KafkaException(e); } @@ -108,7 +112,7 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { try { return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, - messageVersion == RecordBatch.MAGIC_VALUE_V0); + messageVersion == RecordBatch.MAGIC_VALUE_V0); } catch (Throwable e) { throw new KafkaException(e); } @@ -117,9 +121,14 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf ZSTD(4, "zstd", 1.0f) { @Override - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion, Integer level, Integer blockSize) { try { - return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer); + // Zstd does not support block size configuration, so the given parameter is ignored. + if (level == null) { + return (OutputStream) ZstdConstructors.OUTPUT_WITHOUT_LEVEL.invoke(buffer); + } else { + return (OutputStream) ZstdConstructors.OUTPUT_WITH_LEVEL.invoke(buffer, level); + } } catch (Throwable e) { throw new KafkaException(e); } @@ -146,14 +155,17 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu } /** - * Wrap bufferStream with an OutputStream that will compress data with this CompressionType. - * + * Wrap bufferStream with an OutputStream that will compress data with this CompressionType, using the given buffer size and compression level. + *

* Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly. * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()} * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written. * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved. + * + * @param level The compression level to use. If null, it falls back to the default level. + * @param blockSize The buffer size to use during compression. If null, it falls back to the default block size. */ - public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion); + public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, Integer level, Integer blockSize); /** * Wrap buffer with an InputStream that will decompress data with this CompressionType. 
@@ -210,15 +222,19 @@ else if (ZSTD.name.equals(name)) private static class SnappyConstructors { static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream", MethodType.methodType(void.class, InputStream.class)); - static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream", + static final MethodHandle OUTPUT_WITHOUT_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream", MethodType.methodType(void.class, OutputStream.class)); + static final MethodHandle OUTPUT_WITH_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream", + MethodType.methodType(void.class, OutputStream.class, int.class)); } private static class ZstdConstructors { static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream", - MethodType.methodType(void.class, InputStream.class)); - static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream", - MethodType.methodType(void.class, OutputStream.class)); + MethodType.methodType(void.class, InputStream.class)); + static final MethodHandle OUTPUT_WITHOUT_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream", + MethodType.methodType(void.class, OutputStream.class)); + static final MethodHandle OUTPUT_WITH_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream", + MethodType.methodType(void.class, OutputStream.class, int.class)); } private static MethodHandle findConstructor(String className, MethodType methodType) { diff --git a/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java new file mode 100644 index 0000000000000..430571854809d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/GZipOutputStream.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.Deflater; +import java.util.zip.GZIPOutputStream; + +/** + * An extension of {@link GZIPOutputStream} that supports configuring the compression level and the block (buffer) size. + */ +public class GZipOutputStream extends GZIPOutputStream { + /** + * Creates a new {@link OutputStream} with the specified buffer size and compression level. + * + * @param out the output stream + * @param size the output buffer size + * @param level the compression level + * @throws IOException If an I/O error has occurred. + */ + private GZipOutputStream(OutputStream out, int size, int level) throws IOException { + super(out, size); + setLevel(level); + } + + /** + * Sets the compression level. + * + * @param level the compression level + * @throws IllegalArgumentException If the given level is not valid (e.g., not between {@link Deflater#BEST_SPEED} and {@link Deflater#BEST_COMPRESSION}) + */ + private void setLevel(int level) { + // Reject a compression level that is neither within the valid range nor the default sentinel value. 
+ if ((level < Deflater.BEST_SPEED || Deflater.BEST_COMPRESSION < level) && level != Deflater.DEFAULT_COMPRESSION) { + throw new IllegalArgumentException("Gzip doesn't support given compression level: " + level); + } + + def.setLevel(level); + } + + /** + * Create a new {@link OutputStream} that will compress data using the GZip algorithm with the specified buffer size and compression level. + *

+ * The default buffer size is 8192 (8 KB) for historical reasons. For details, see {@link CompressionType#GZIP}. + * + * @param out The output stream to compress. + * @param compressionLevel The compression level to use. If null, it falls back to the default level. + * @param blockSize The buffer size to use during compression. If null, it falls back to the default block size. + * @throws IllegalArgumentException If the given level is not valid (e.g., not between {@link Deflater#BEST_SPEED} and {@link Deflater#BEST_COMPRESSION}) + * @throws IOException If an I/O error has occurred. + */ + public static GZipOutputStream of(OutputStream out, Integer compressionLevel, Integer blockSize) throws IOException { + if (null == blockSize) { + return new GZipOutputStream(out, 8 * 1024, compressionLevel == null ? Deflater.DEFAULT_COMPRESSION : compressionLevel); + } else { + return new GZipOutputStream(out, blockSize, compressionLevel == null ? Deflater.DEFAULT_COMPRESSION : compressionLevel); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java index 591ab1693646c..1d7e3e9dd2d6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java @@ -16,35 +16,39 @@ */ package org.apache.kafka.common.record; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.kafka.common.utils.ByteUtils; - import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; /** * A partial implementation of the v1.5.1 LZ4 Frame format. 
* * @see LZ4 Frame Format - * + *

* This class is not thread-safe. */ public final class KafkaLZ4BlockOutputStream extends OutputStream { public static final int MAGIC = 0x184D2204; - public static final int LZ4_MAX_HEADER_LENGTH = 19; public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000; public static final String CLOSED_STREAM = "The stream is already closed"; - public static final int BLOCKSIZE_64KB = 4; - public static final int BLOCKSIZE_256KB = 5; - public static final int BLOCKSIZE_1MB = 6; - public static final int BLOCKSIZE_4MB = 7; + private static final int BLOCKSIZE_64KB = 4; + private static final int BLOCKSIZE_256KB = 5; + private static final int BLOCKSIZE_1MB = 6; + private static final int BLOCKSIZE_4MB = 7; + private static final Set AVAILABLE_BLOCKSIZES = new HashSet<>( + Arrays.asList(BLOCKSIZE_64KB, BLOCKSIZE_256KB, BLOCKSIZE_1MB, BLOCKSIZE_4MB) + ); private final LZ4Compressor compressor; private final XXHash32 checksum; @@ -61,22 +65,20 @@ public final class KafkaLZ4BlockOutputStream extends OutputStream { /** * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. * - * @param out The output stream to compress - * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other - * values will generate an exception - * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for - * every block of data + * @param out The output stream to compress. + * @param compressor The compressor to use. + * @param bd The block descriptor to use. * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect FrameDescriptor checksum - * compatible with older kafka clients. + * compatible with older kafka clients. 
* @throws IOException */ - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException { + private KafkaLZ4BlockOutputStream(OutputStream out, LZ4Compressor compressor, BD bd, boolean useBrokenFlagDescriptorChecksum) throws IOException { this.out = out; - compressor = LZ4Factory.fastestInstance().fastCompressor(); + this.compressor = compressor; checksum = XXHashFactory.fastestInstance().hash32(); this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum; - bd = new BD(blockSize); - flg = new FLG(blockChecksum); + this.bd = bd; + flg = new FLG(); bufferOffset = 0; maxBlockSize = bd.getBlockMaximumSize(); buffer = new byte[maxBlockSize]; @@ -86,43 +88,26 @@ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockC } /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. - * - * @param out The output stream to compress - * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other - * values will generate an exception - * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for - * every block of data - * @throws IOException - */ - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException { - this(out, blockSize, blockChecksum, false); - } - - /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. + * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm with the specified buffer size and compression level. * - * @param out The stream to compress - * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other - * values will generate an exception + * @param out The output stream to compress. + * @param magic The message version. 
+ * @param compressionLevel The compression level to use. If null, it falls back to the default level. + * @param blockSize The buffer size to use. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will raise an exception. + * If null, it falls back to the default block size, 4. + * @throws IllegalArgumentException * @throws IOException */ - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException { - this(out, blockSize, false, false); - } - - /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. - * - * @param out The output stream to compress - * @throws IOException - */ - public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException { - this(out, BLOCKSIZE_64KB); - } - - public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException { - this(out, BLOCKSIZE_64KB, false, useBrokenHC); + public static KafkaLZ4BlockOutputStream of(OutputStream out, int magic, Integer compressionLevel, Integer blockSize) throws IOException { + BD bd = blockSize == null ? new BD() : new BD(blockSize); + + if (magic < RecordBatch.MAGIC_VALUE_V2) { + return new KafkaLZ4BlockOutputStream(out, LZ4Factory.fastestInstance().fastCompressor(), bd, magic == RecordBatch.MAGIC_VALUE_V0); + } else { + LZ4Compressor compressor = + compressionLevel == null ? LZ4Factory.fastestInstance().highCompressor() : LZ4Factory.fastestInstance().highCompressor(compressionLevel); + return new KafkaLZ4BlockOutputStream(out, compressor, bd, false); + } } /** @@ -369,6 +354,9 @@ public int getVersion() { } } + /** + * Block Descriptor: specifies the block(buffer) size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will raise an exception. 
+ */ public static class BD { private final int reserved2; @@ -402,8 +390,8 @@ private void validate() { if (reserved2 != 0) { throw new RuntimeException("Reserved2 field must be 0"); } - if (blockSizeValue < 4 || blockSizeValue > 7) { - throw new RuntimeException("Block size value must be between 4 and 7"); + if (!AVAILABLE_BLOCKSIZES.contains(blockSizeValue)) { + throw new RuntimeException("Unsupported block size: " + blockSizeValue); } if (reserved3 != 0) { throw new RuntimeException("Reserved3 field must be 0"); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index c6db18d419162..7f3545709a50a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -39,7 +39,7 @@ /** * A {@link Records} implementation backed by a ByteBuffer. This is used only for reading or * modifying in-place an existing buffer of record batches. To create a new buffer see {@link MemoryRecordsBuilder}, - * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants. + * or {@link #builder(ByteBuffer)}. 
*/ public class MemoryRecords extends AbstractRecords { private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class); @@ -249,9 +249,10 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina originalBatch.baseOffset() : retainedRecords.get(0).offset(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic, - originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), - originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), - originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); + CompressionConfig.of(originalBatch.compressionType()), timestampType, baseOffset, logAppendTime, + originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(), + originalBatch.isTransactional(), originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), + bufferOutputStream.limit()); for (Record record : retainedRecords) builder.append(record); @@ -425,11 +426,122 @@ public static MemoryRecords readableRecords(ByteBuffer buffer) { return new MemoryRecords(buffer); } - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset) { - return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset); + public static class Builder { + // attributes without default values + private final ByteBuffer buffer; + + // attributes with default values + private byte magic = RecordBatch.CURRENT_MAGIC_VALUE; + private CompressionType compressionType = CompressionType.NONE; + private Integer compressionLevel = null; + private Integer compressionBufferSize = null; + private TimestampType timestampType = TimestampType.CREATE_TIME; + private long baseOffset = 0L; + private long logAppendTime = RecordBatch.NO_TIMESTAMP; + private long producerId = 
RecordBatch.NO_PRODUCER_ID; + private short producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + private int baseSequence = RecordBatch.NO_SEQUENCE; + private boolean isTransactional = false; + private boolean isControlBatch = false; + private int partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + + // attribute without default value, but decided dynamically + private int writeLimit = -1; + + public Builder(ByteBuffer buffer) { + this.buffer = buffer; + } + + public Builder magic(byte magic) { + this.magic = magic; + return this; + } + + public Builder compressionType(CompressionType compressionType) { + this.compressionType = compressionType; + return this; + } + + public Builder compressionLevel(Integer compressionLevel) { + this.compressionLevel = compressionLevel; + return this; + } + + public Builder compressionBufferSize(Integer compressionBufferSize) { + this.compressionBufferSize = compressionBufferSize; + return this; + } + + public Builder compressionConfig(CompressionConfig compressionConfig) { + this.compressionType = compressionConfig.getType(); + this.compressionLevel = compressionConfig.getLevel(); + this.compressionBufferSize = compressionConfig.getBufferSize(); + return this; + } + + public Builder timestampType(TimestampType timestampType) { + this.timestampType = timestampType; + + if (timestampType == TimestampType.LOG_APPEND_TIME) { + logAppendTime = System.currentTimeMillis(); + } else { + logAppendTime = RecordBatch.NO_TIMESTAMP; + } + + return this; + } + + public Builder baseOffset(long baseOffset) { + this.baseOffset = baseOffset; + return this; + } + + public Builder logAppendTime(long logAppendTime) { + this.logAppendTime = logAppendTime; + return this; + } + + public Builder producerState(long producerId, short producerEpoch, int baseSequence) { + this.producerId = producerId; + this.producerEpoch = producerEpoch; + this.baseSequence = baseSequence; + return this; + } + + public Builder transaction(boolean isTransactional) { + 
this.isTransactional = isTransactional; + return this; + } + + public Builder controlBatch(boolean isControlBatch) { + this.isControlBatch = isControlBatch; + return this; + } + + public Builder partitionLeaderEpoch(int partitionLeaderEpoch) { + this.partitionLeaderEpoch = partitionLeaderEpoch; + return this; + } + + public Builder writeLimit(int writeLimit) { + this.writeLimit = writeLimit; + return this; + } + + public MemoryRecordsBuilder build() { + if (writeLimit < 0) { + writeLimit = buffer.remaining(); + } + + return new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, CompressionConfig.of(compressionType, compressionLevel, compressionBufferSize), + timestampType, baseOffset, logAppendTime, producerId, producerEpoch, baseSequence, + isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit); + } + } + + public static Builder builder(ByteBuffer buffer) { + return new Builder(buffer); } public static MemoryRecordsBuilder idempotentBuilder(ByteBuffer buffer, @@ -438,100 +550,12 @@ public static MemoryRecordsBuilder idempotentBuilder(ByteBuffer buffer, long producerId, short producerEpoch, int baseSequence) { - return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - baseOffset, System.currentTimeMillis(), producerId, producerEpoch, baseSequence); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset, - long logAppendTime) { - return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset) { - long logAppendTime = RecordBatch.NO_TIMESTAMP; - if (timestampType 
== TimestampType.LOG_APPEND_TIME) - logAppendTime = System.currentTimeMillis(); - return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset, - long logAppendTime, - int partitionLeaderEpoch) { - return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, partitionLeaderEpoch); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - CompressionType compressionType, - long baseOffset, - long producerId, - short producerEpoch, - int baseSequence, - boolean isTransactional) { - return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, baseOffset, - RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, - RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset, - long logAppendTime, - long producerId, - short producerEpoch, - int baseSequence) { - return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - producerId, producerEpoch, baseSequence, false, RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset, - long logAppendTime, - long producerId, - short producerEpoch, - int baseSequence, - boolean isTransactional, - int partitionLeaderEpoch) { - return builder(buffer, magic, compressionType, 
timestampType, baseOffset, - logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, partitionLeaderEpoch); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, - long baseOffset, - long logAppendTime, - long producerId, - short producerEpoch, - int baseSequence, - boolean isTransactional, - boolean isControlBatch, - int partitionLeaderEpoch) { - return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, - logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, - buffer.remaining()); + return builder(buffer) + .compressionType(compressionType) + .baseOffset(baseOffset) + .logAppendTime(System.currentTimeMillis()) + .producerState(producerId, producerEpoch, baseSequence) + .build(); } public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) { @@ -616,13 +640,18 @@ public static MemoryRecords withRecords(byte magic, long initialOffset, Compress if (records.length == 0) return MemoryRecords.EMPTY; int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records)); - ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(sizeEstimate); - long logAppendTime = RecordBatch.NO_TIMESTAMP; - if (timestampType == TimestampType.LOG_APPEND_TIME) - logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType, - initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, - partitionLeaderEpoch, sizeEstimate); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(sizeEstimate)) + .magic(magic) + .compressionType(compressionType) + .timestampType(timestampType) + .baseOffset(initialOffset) + .logAppendTime(timestampType == 
TimestampType.LOG_APPEND_TIME ? System.currentTimeMillis() : RecordBatch.NO_TIMESTAMP) + .producerState(producerId, producerEpoch, baseSequence) + .transaction(isTransactional) + .controlBatch(false) + .partitionLeaderEpoch(partitionLeaderEpoch) + .writeLimit(sizeEstimate) + .build(); for (SimpleRecord record : records) builder.append(record); return builder.build(); @@ -654,12 +683,15 @@ public static MemoryRecords withEndTransactionMarker(long initialOffset, long ti public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long timestamp, int partitionLeaderEpoch, long producerId, short producerEpoch, EndTransactionMarker marker) { - boolean isTransactional = true; - boolean isControlBatch = true; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.CREATE_TIME, initialOffset, timestamp, producerId, producerEpoch, - RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, partitionLeaderEpoch, - buffer.capacity()); + MemoryRecordsBuilder builder = builder(buffer) + .baseOffset(initialOffset) + .logAppendTime(timestamp) + .producerState(producerId, producerEpoch, RecordBatch.NO_SEQUENCE) + .transaction(true) + .controlBatch(true) + .partitionLeaderEpoch(partitionLeaderEpoch) + .writeLimit(buffer.capacity()) + .build(); builder.appendEndTxnMarker(timestamp, marker); builder.close(); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 054fb86199884..c57463fe82fd1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -47,7 +47,7 @@ public void write(int b) { }); private final TimestampType timestampType; - private final CompressionType compressionType; + private final CompressionConfig compressionConfig; 
// Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough, // so it's not safe to hold a direct reference to the underlying ByteBuffer. @@ -82,9 +82,29 @@ public void write(int b) { private MemoryRecords builtRecords; private boolean aborted = false; + /** + * Construct a new builder. + * + * @param bufferStream The output stream to the underlying buffer to use (note that this class will allocate a new buffer + * if necessary to fit the records appended) + * @param magic The magic value to use + * @param compressionConfig The compression codec, level, and buffer size to use + * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. + * @param baseOffset The initial offset to use for the first record in the batch + * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. + * @param producerId The producer ID associated with the producer writing this record set + * @param producerEpoch The epoch of the producer + * @param baseSequence The sequence number of the first record in this set + * @param isTransactional Whether or not the records are part of a transaction + * @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers) + * @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log + * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded + * when compression is used since size estimates are rough, and in the case that the first + * record added exceeds the size). 
+ */ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, - CompressionType compressionType, + CompressionConfig compressionConfig, TimestampType timestampType, long baseOffset, long logAppendTime, @@ -102,13 +122,13 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, throw new IllegalArgumentException("Transactional records are not supported for magic " + magic); if (isControlBatch) throw new IllegalArgumentException("Control records are not supported for magic " + magic); - if (compressionType == CompressionType.ZSTD) + if (compressionConfig.getType() == CompressionType.ZSTD) throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic); } this.magic = magic; this.timestampType = timestampType; - this.compressionType = compressionType; + this.compressionConfig = compressionConfig; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.numRecords = 0; @@ -123,34 +143,14 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); - this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType); + this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionConfig.getType()); bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; - this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); + this.appendStream = compressionConfig.outputStream(this.bufferStream, magic); } - /** - * Construct a new builder. - * - * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary - * to fit the records appended) - * @param magic The magic value to use - * @param compressionType The compression codec to use - * @param timestampType The desired timestamp type. 
For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. - * @param baseOffset The initial offset to use for - * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. - * @param producerId The producer ID associated with the producer writing this record set - * @param producerEpoch The epoch of the producer - * @param baseSequence The sequence number of the first record in this set - * @param isTransactional Whether or not the records are part of a transaction - * @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers) - * @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log - * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded - * when compression is used since size estimates are rough, and in the case that the first - * record added exceeds the size). - */ - public MemoryRecordsBuilder(ByteBuffer buffer, + public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, @@ -163,9 +163,8 @@ public MemoryRecordsBuilder(ByteBuffer buffer, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { - this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime, - producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, - writeLimit); + this(bufferStream, magic, CompressionConfig.of(compressionType), timestampType, baseOffset, logAppendTime, + producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit); } public ByteBuffer buffer() { @@ -181,7 +180,7 @@ public double compressionRatio() { } public CompressionType compressionType() { - return compressionType; + return compressionConfig.getType(); } public boolean isControlBatch() { @@ -219,7 +218,7 @@ 
public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) { long shallowOffsetOfMaxTimestamp; // Use the last offset when dealing with record batches - if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) + if (compressionConfig.getType() != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) shallowOffsetOfMaxTimestamp = lastOffset; else shallowOffsetOfMaxTimestamp = baseOffset; @@ -229,7 +228,7 @@ public RecordsInfo info() { } else { long shallowOffsetOfMaxTimestamp; // Use the last offset when dealing with record batches - if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) + if (compressionConfig.getType() != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) shallowOffsetOfMaxTimestamp = lastOffset; else shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; @@ -318,7 +317,7 @@ public void close() { } else { if (magic > RecordBatch.MAGIC_VALUE_V1) this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes; - else if (compressionType != CompressionType.NONE) + else if (compressionConfig.getType() != CompressionType.NONE) this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes; ByteBuffer buffer = buffer().duplicate(); @@ -363,7 +362,7 @@ private int writeDefaultBatchHeader() { else maxTimestamp = this.maxTimestamp; - DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, + DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionConfig.getType(), timestampType, firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, numRecords); @@ -386,7 +385,7 @@ private int writeLegacyCompressedWrapperHeader() { AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize); long timestamp = timestampType == 
TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp; - LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType); + LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionConfig.getType(), timestampType); buffer.position(pos); return writtenCompressed; @@ -634,7 +633,7 @@ private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, By private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException { ensureOpenForRecordAppend(); - if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) + if (compressionConfig.getType() == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) timestamp = logAppendTime; int size = LegacyRecord.recordSize(magic, key, value); @@ -649,7 +648,7 @@ private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, Byt private long toInnerOffset(long offset) { // use relative offsets for compressed messages with magic v1 - if (magic > 0 && compressionType != CompressionType.NONE) + if (magic > 0 && compressionConfig.getType() != CompressionType.NONE) return offset - baseOffset; return offset; } @@ -688,7 +687,7 @@ private void ensureOpenForRecordBatchWrite() { * @return The estimated number of bytes written */ private int estimatedBytesWritten() { - if (compressionType == CompressionType.NONE) { + if (compressionConfig.getType() == CompressionType.NONE) { return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes; } else { // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java index 423d1e1656f86..63d2d158ed315 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java @@ -110,8 +110,13 @@ private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer bu final TimestampType timestampType = batch.timestampType(); long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP; - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), - timestampType, recordBatchAndRecords.baseOffset, logAppendTime); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(batch.compressionType()) + .timestampType(timestampType) + .baseOffset(recordBatchAndRecords.baseOffset) + .logAppendTime(logAppendTime) + .build(); for (Record record : recordBatchAndRecords.records) { // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported if (magic > RecordBatch.MAGIC_VALUE_V1) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 0f9b956663f16..292174dcdf24e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -51,10 +51,8 @@ import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -1826,8 +1824,7 @@ private FetchResponse fetchResponse(Map data() { List payloads = new 
ArrayList<>(); @@ -90,21 +91,24 @@ public static Collection data() { List values = new ArrayList<>(); for (Payload payload : payloads) - for (boolean broken : Arrays.asList(false, true)) - for (boolean ignore : Arrays.asList(false, true)) - for (boolean blockChecksum : Arrays.asList(false, true)) - for (boolean close : Arrays.asList(false, true)) - values.add(new Object[]{broken, ignore, blockChecksum, close, payload}); + // TODO: Add a way to get available magic values, compression levels, and block sizes. + for (int magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) + for (Integer compressionLevel = 1; compressionLevel < 17 + 1; ++compressionLevel) + for (Integer blockSize = 4; blockSize < 7 + 1; ++blockSize) + for (boolean ignore : Arrays.asList(false, true)) + for (boolean close : Arrays.asList(false, true)) + values.add(new Object[]{magic, compressionLevel, blockSize, ignore, close, payload}); return values; } - public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, - boolean blockChecksum, boolean close, Payload payload) { - this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum; + public KafkaLZ4Test(int magic, Integer compressionLevel, Integer blockSize, boolean ignoreFlagDescriptorChecksum, + boolean close, Payload payload) { + this.magic = magic; + this.compressionLevel = compressionLevel; + this.blockSize = blockSize; this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum; - this.payload = payload.payload; this.close = close; - this.blockChecksum = blockChecksum; + this.payload = payload.payload; } @Test @@ -143,7 +147,7 @@ public void testBadFrameChecksum() throws Exception { @Test public void testBadBlockSize() throws Exception { - if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) + if (!close || (useBrokenFlagDescriptorChecksum() && !ignoreFlagDescriptorChecksum)) return; byte[] compressed = 
compressedBytes(); @@ -157,8 +161,6 @@ public void testBadBlockSize() throws Exception { assertThat(e.getMessage(), CoreMatchers.containsString("exceeded max")); } - - @Test public void testCompression() throws Exception { byte[] compressed = compressedBytes(); @@ -209,7 +211,7 @@ public void testCompression() throws Exception { // Initial implementation of checksum incorrectly applied to full header // including magic bytes - if (this.useBrokenFlagDescriptorChecksum) { + if (useBrokenFlagDescriptorChecksum()) { off = 0; len = offset; } @@ -279,7 +281,7 @@ public void testDirectBuffer() throws IOException { @Test public void testSkip() throws Exception { - if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return; + if (!close || (useBrokenFlagDescriptorChecksum() && !ignoreFlagDescriptorChecksum)) return; final KafkaLZ4BlockInputStream in = makeInputStream(ByteBuffer.wrap(compressedBytes())); @@ -318,7 +320,7 @@ private void testDecompression(ByteBuffer buffer) throws IOException { assertEquals(this.payload.length, pos); assertArrayEquals(this.payload, testPayload); } catch (IOException e) { - if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) { + if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum()) { assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage()); error = e; } else if (!close) { @@ -328,18 +330,13 @@ private void testDecompression(ByteBuffer buffer) throws IOException { throw e; } } - if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) assertNotNull(error); + if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum()) assertNotNull(error); if (!close) assertNotNull(error); } private byte[] compressedBytes() throws IOException { ByteArrayOutputStream output = new ByteArrayOutputStream(); - KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream( - output, - KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, - blockChecksum, - 
useBrokenFlagDescriptorChecksum - ); + KafkaLZ4BlockOutputStream lz4 = KafkaLZ4BlockOutputStream.of(output, this.magic, this.compressionLevel, this.blockSize); lz4.write(this.payload, 0, this.payload.length); if (this.close) { lz4.close(); @@ -348,4 +345,8 @@ private byte[] compressedBytes() throws IOException { } return output.toByteArray(); } + + private boolean useBrokenFlagDescriptorChecksum() { + return magic == RecordBatch.MAGIC_VALUE_V0; + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java index b233793be009c..f759ab7dca7f8 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java @@ -122,20 +122,17 @@ private void doTestConversion(boolean testConversionOverflow) throws IOException assertEquals("incorrect test setup", offsets.size(), records.size()); ByteBuffer buffer = ByteBuffer.allocate(1024); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).compressionType(compressionType).build(); for (int i = 0; i < 3; i++) builder.appendWithOffset(offsets.get(i), records.get(i)); builder.close(); - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L); + builder = MemoryRecords.builder(buffer).compressionType(compressionType).build(); for (int i = 3; i < 6; i++) builder.appendWithOffset(offsets.get(i), records.get(i)); builder.close(); - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L); + builder = MemoryRecords.builder(buffer).compressionType(compressionType).build(); for (int i = 6; i < 10; 
i++) builder.appendWithOffset(offsets.get(i), records.get(i)); builder.close(); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 522915f064034..32ebb60e13cc8 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; @@ -62,10 +63,10 @@ public void testWriteEmptyRecordSet() { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - Supplier builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic, - compressionType, TimestampType.CREATE_TIME, 0L, 0L, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + Supplier builderSupplier = () -> new MemoryRecordsBuilder( + new ByteBufferOutputStream(buffer), magic, compressionType, TimestampType.CREATE_TIME, + 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); if (compressionType != CompressionType.ZSTD) { MemoryRecords records = builderSupplier.get().build(); @@ -86,9 +87,9 @@ public void testWriteTransactionalRecordSet() { short epoch = 15; int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, 
buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes()); MemoryRecords records = builder.build(); @@ -106,8 +107,9 @@ public void testWriteTransactionalNotAllowedMagicV0() { short epoch = 15; int sequence = 2342; - new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, + compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -119,8 +121,9 @@ public void testWriteTransactionalNotAllowedMagicV1() { short epoch = 15; int sequence = 2342; - new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, + compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -132,8 +135,9 @@ public void testWriteControlBatchNotAllowedMagicV0() { short epoch = 15; int sequence = 2342; - new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + new 
MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, + compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, false, true, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -145,8 +149,9 @@ public void testWriteControlBatchNotAllowedMagicV1() { short epoch = 15; int sequence = 2342; - new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, + compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, false, true, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); } @Test(expected = IllegalArgumentException.class) @@ -158,8 +163,9 @@ public void testWriteTransactionalWithInvalidPID() { short epoch = 15; int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } @@ -172,8 +178,9 @@ public void testWriteIdempotentWithInvalidEpoch() { short epoch = RecordBatch.NO_PRODUCER_EPOCH; int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new 
MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } @@ -186,8 +193,9 @@ public void testWriteIdempotentWithInvalidBaseSequence() { short epoch = 15; int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.close(); } @@ -200,8 +208,9 @@ public void testWriteEndTxnMarkerNonTransactionalBatch() { short epoch = 15; int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)); } @@ -214,8 +223,9 @@ public void testWriteEndTxnMarkerNonControlBatch() { short epoch = 15; int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, 
epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)); } @@ -233,8 +243,9 @@ public void testCompressionRateV0() { LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()), }; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); int uncompressedSize = 0; @@ -258,8 +269,9 @@ public void testEstimatedSizeInBytes() { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); int previousEstimate = 0; @@ -291,8 +303,9 @@ public void testCompressionRateV1() { LegacyRecord.create(magic, 2L, "c".getBytes(), 
"3".getBytes()), }; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); int uncompressedSize = 0; @@ -320,9 +333,10 @@ public void buildUsingLogAppendTime() { buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(0L, "b".getBytes(), "2".getBytes()); builder.append(0L, "c".getBytes(), "3".getBytes()); @@ -352,8 +366,9 @@ public void buildUsingCreateTime() { buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, 
TimestampType.CREATE_TIME, 0L, logAppendTime, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(2L, "b".getBytes(), "2".getBytes()); @@ -384,10 +399,10 @@ public void testAppendedChecksumConsistency() { ByteBuffer buffer = ByteBuffer.allocate(512); for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes()); MemoryRecords memoryRecords = builder.build(); List records = TestUtils.toList(memoryRecords.records()); @@ -404,9 +419,11 @@ public void testSmallWriteLimit() { byte[] value = "bar".getBytes(); int writeLimit = 0; ByteBuffer buffer = ByteBuffer.allocate(512); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.CURRENT_MAGIC_VALUE, 
compressionType, TimestampType.CREATE_TIME, 0L, + LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, + writeLimit); assertFalse(builder.isFull()); assertTrue(builder.hasRoomFor(0L, key, value, Record.EMPTY_HEADERS)); @@ -433,8 +450,9 @@ public void writePastLimit() { buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.setEstimatedCompressionRatio(0.5f); builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -462,9 +480,11 @@ public void testAppendAtInvalidOffset() { buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, - TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, + logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, + buffer.capacity()); builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null); @@ 
-475,8 +495,8 @@ public void testAppendAtInvalidOffset() { @Test public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { ByteBuffer buffer = ByteBuffer.allocate(512); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, - compressionType, TimestampType.LOG_APPEND_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2).compressionType(compressionType).timestampType(TimestampType.LOG_APPEND_TIME).build(); builder.append(10L, "1".getBytes(), "a".getBytes()); builder.close(); @@ -487,8 +507,7 @@ public void convertV2ToV1UsingMixedCreateAndLogAppendTime() { int position = buffer.position(); - builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, - TimestampType.CREATE_TIME, 1L); + builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V2).compressionType(compressionType).baseOffset(1L).build(); builder.append(12L, "2".getBytes(), "b".getBytes()); builder.append(13L, "3".getBytes(), "c".getBytes()); builder.close(); @@ -540,13 +559,12 @@ public void convertToV1WithMixedV0AndV2Data() { assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1); ByteBuffer buffer = ByteBuffer.allocate(512); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, - compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V0).compressionType(compressionType).timestampType(TimestampType.NO_TIMESTAMP_TYPE).build(); builder.append(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, - TimestampType.CREATE_TIME, 1L); + builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V2).compressionType(compressionType).baseOffset(1L).build(); builder.append(11L, "2".getBytes(), "b".getBytes()); builder.append(12L, 
"3".getBytes(), "c".getBytes()); builder.close(); @@ -619,9 +637,10 @@ public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); assertThrows(IllegalStateException.class, builder::build); } @@ -634,9 +653,10 @@ public void shouldResetBufferToInitialPositionOnAbort() { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.abort(); assertEquals(bufferOffset, builder.buffer().position()); @@ -650,9 +670,10 @@ public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new 
MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); try { builder.close(); @@ -670,9 +691,10 @@ public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); try { builder.append(0L, "a".getBytes(), "1".getBytes()); @@ -705,10 +727,11 @@ public void testBuffersDereferencedOnClose() { int iterations = 0; while (iterations++ < 100) { buffer.rewind(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, 0); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), + RecordBatch.MAGIC_VALUE_V2, 
compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, + 0); builder.append(1L, new byte[0], value); builder.build(); builders.add(builder); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 3d5a4f1a498b3..f13b26283f407 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; +import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -76,7 +77,7 @@ public void testIterator() { ByteBuffer buffer = ByteBuffer.allocate(1024); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression, + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(new ByteBufferOutputStream(buffer), magic, compression, TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false, partitionLeaderEpoch, buffer.limit()); @@ -158,8 +159,7 @@ public void testIterator() { @Test public void testHasRoomForMethod() { assumeAtLeastV2OrNotZstd(); - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024)).magic(magic).compressionType(compression).build(); builder.append(0L, "a".getBytes(), "1".getBytes()); assertTrue(builder.hasRoomFor(1L, "b".getBytes(), "2".getBytes(), 
Record.EMPTY_HEADERS)); builder.close(); @@ -169,8 +169,7 @@ public void testHasRoomForMethod() { @Test public void testHasRoomForMethodWithHeaders() { if (magic >= RecordBatch.MAGIC_VALUE_V2) { - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(100), magic, compression, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(100)).magic(magic).compressionType(compression).build(); RecordHeaders headers = new RecordHeaders(); headers.add("hello", "world.world".getBytes()); headers.add("hello", "world.world".getBytes()); @@ -227,8 +226,12 @@ public void testFilterToPreservesPartitionLeaderEpoch() { int partitionLeaderEpoch = 67; ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, - 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .logAppendTime(RecordBatch.NO_TIMESTAMP) + .partitionLeaderEpoch(partitionLeaderEpoch) + .build(); builder.append(10L, null, "a".getBytes()); builder.append(11L, "1".getBytes(), "b".getBytes()); builder.append(12L, null, "c".getBytes()); @@ -260,9 +263,14 @@ public void testFilterToEmptyBatchRetention() { int partitionLeaderEpoch = 293; int numRecords = 2; - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, - baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, - partitionLeaderEpoch); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .baseOffset(baseOffset) + .producerState(producerId, producerEpoch, baseSequence) + .transaction(isTransactional) + .partitionLeaderEpoch(partitionLeaderEpoch) + .build(); builder.append(11L, "2".getBytes(), "b".getBytes()); builder.append(12L, 
"3".getBytes(), "c".getBytes()); builder.close(); @@ -448,22 +456,22 @@ public void testFilterToBatchDiscard() { assumeTrue(compression != CompressionType.NONE || magic >= MAGIC_VALUE_V2); ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).build(); builder.append(10L, "1".getBytes(), "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(1L).build(); builder.append(11L, "2".getBytes(), "b".getBytes()); builder.append(12L, "3".getBytes(), "c".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(3L).build(); builder.append(13L, "4".getBytes(), "d".getBytes()); builder.append(20L, "5".getBytes(), "e".getBytes()); builder.append(15L, "6".getBytes(), "f".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(6L).build(); builder.append(16L, "7".getBytes(), "g".getBytes()); builder.close(); @@ -501,8 +509,7 @@ public void testFilterToAlreadyCompactedLog() { ByteBuffer buffer = ByteBuffer.allocate(2048); // create a batch with some offset gaps to simulate a compacted batch - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).build(); builder.appendWithOffset(5L, 10L, null, "a".getBytes()); 
builder.appendWithOffset(8L, 11L, "1".getBytes(), "b".getBytes()); builder.appendWithOffset(10L, 12L, null, "c".getBytes()); @@ -547,7 +554,7 @@ public void testFilterToPreservesProducerInfo() { ByteBuffer buffer = ByteBuffer.allocate(2048); // non-idempotent, non-transactional - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).build(); builder.append(10L, null, "a".getBytes()); builder.append(11L, "1".getBytes(), "b".getBytes()); builder.append(12L, null, "c".getBytes()); @@ -558,8 +565,13 @@ public void testFilterToPreservesProducerInfo() { long pid1 = 23L; short epoch1 = 5; int baseSequence1 = 10; - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, - RecordBatch.NO_TIMESTAMP, pid1, epoch1, baseSequence1); + builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .baseOffset(3L) + .logAppendTime(RecordBatch.NO_TIMESTAMP) + .producerState(pid1, epoch1, baseSequence1) + .build(); builder.append(13L, null, "d".getBytes()); builder.append(14L, "4".getBytes(), "e".getBytes()); builder.append(15L, "5".getBytes(), "f".getBytes()); @@ -569,8 +581,13 @@ public void testFilterToPreservesProducerInfo() { long pid2 = 99384L; short epoch2 = 234; int baseSequence2 = 15; - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L, - RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH); + builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .baseOffset(3L) + .producerState(pid2, epoch2, baseSequence2) + .transaction(true) + .build(); builder.append(16L, "6".getBytes(), "g".getBytes()); builder.append(17L, "7".getBytes(), "h".getBytes()); builder.append(18L, null, "i".getBytes()); @@ -641,23 +658,23 @@ public void 
testFilterToWithUndersizedBuffer() { assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(1024); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).build(); builder.append(10L, null, "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(1L).build(); builder.append(11L, "1".getBytes(), new byte[128]); builder.append(12L, "2".getBytes(), "c".getBytes()); builder.append(13L, null, "d".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(4L).build(); builder.append(14L, null, "e".getBytes()); builder.append(15L, "5".getBytes(), "f".getBytes()); builder.append(16L, "6".getBytes(), "g".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(7L).build(); builder.append(17L, "7".getBytes(), new byte[128]); builder.close(); @@ -725,22 +742,22 @@ public void testFilterTo() { assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).build(); builder.append(10L, null, "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); + builder = 
MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(1L).build(); builder.append(11L, "1".getBytes(), "b".getBytes()); builder.append(12L, null, "c".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(3L).build(); builder.append(13L, null, "d".getBytes()); builder.append(20L, "4".getBytes(), "e".getBytes()); builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); + builder = MemoryRecords.builder(buffer).magic(magic).compressionType(compression).baseOffset(6L).build(); builder.append(16L, "6".getBytes(), "g".getBytes()); builder.close(); @@ -842,19 +859,36 @@ public void testFilterToPreservesLogAppendTime() { long logAppendTime = System.currentTimeMillis(); ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, - TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .timestampType(TimestampType.LOG_APPEND_TIME) + .logAppendTime(logAppendTime) + .producerState(pid, epoch, firstSequence) + .build(); builder.append(10L, null, "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime, - pid, epoch, firstSequence); + builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(1L) + .logAppendTime(logAppendTime) + .producerState(pid, epoch, firstSequence) + .build(); builder.append(11L, "1".getBytes(), "b".getBytes()); builder.append(12L, null, "c".getBytes()); builder.close(); - builder = 
MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime, - pid, epoch, firstSequence); + builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(3L) + .logAppendTime(logAppendTime) + .producerState(pid, epoch, firstSequence) + .build(); builder.append(13L, null, "d".getBytes()); builder.append(14L, "4".getBytes(), "e".getBytes()); builder.append(15L, "5".getBytes(), "f".getBytes()); @@ -886,8 +920,13 @@ public void testNextBatchSize() { assumeAtLeastV2OrNotZstd(); ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, - TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer) + .magic(magic) + .compressionType(compression) + .timestampType(TimestampType.LOG_APPEND_TIME) + .logAppendTime(logAppendTime) + .producerState(pid, epoch, firstSequence) + .build(); builder.append(10L, null, "abc".getBytes()); builder.close(); diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java index 5f578a873d0dc..e48a7f421a049 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java @@ -47,8 +47,8 @@ public void testCompressedIterationWithNullValue() throws Exception { @Test(expected = InvalidRecordException.class) public void testCompressedIterationWithEmptyRecords() throws Exception { ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64); - OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue), - RecordBatch.MAGIC_VALUE_V1); + OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput( + new 
ByteBufferOutputStream(emptyCompressedValue), RecordBatch.MAGIC_VALUE_V1, null, null); gzipOutput.close(); emptyCompressedValue.flip(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 809d64f1e327b..7c791b367cf71 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -79,8 +79,7 @@ public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception @Test public void testBuildWithOldMessageFormat() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V1).build(); builder.append(10L, null, "a".getBytes()); Map produceData = new HashMap<>(); produceData.put(new TopicPartition("test", 0), builder.build()); @@ -94,8 +93,7 @@ public void testBuildWithOldMessageFormat() { @Test public void testBuildWithCurrentMessageFormat() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, - CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).build(); builder.append(10L, null, "a".getBytes()); Map produceData = new HashMap<>(); produceData.put(new TopicPartition("test", 0), builder.build()); @@ -109,11 +107,11 @@ public void testBuildWithCurrentMessageFormat() { @Test public void testV3AndAboveShouldContainOnlyOneRecordBatch() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = 
MemoryRecords.builder(buffer).build(); builder.append(10L, null, "a".getBytes()); builder.close(); - builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L); + builder = MemoryRecords.builder(buffer).baseOffset(1L).build(); builder.append(11L, "1".getBytes(), "b".getBytes()); builder.append(12L, null, "c".getBytes()); builder.close(); @@ -137,8 +135,7 @@ public void testV3AndAboveCannotHaveNoRecordBatches() { @Test public void testV3AndAboveCannotUseMagicV0() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, - TimestampType.NO_TIMESTAMP_TYPE, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V0).timestampType(TimestampType.NO_TIMESTAMP_TYPE).build(); builder.append(10L, null, "a".getBytes()); Map produceData = new HashMap<>(); @@ -150,8 +147,7 @@ public void testV3AndAboveCannotUseMagicV0() { @Test public void testV3AndAboveCannotUseMagicV1() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V1).build(); builder.append(10L, null, "a".getBytes()); Map produceData = new HashMap<>(); @@ -163,8 +159,7 @@ public void testV3AndAboveCannotUseMagicV1() { @Test public void testV6AndBelowCannotUseZStdCompression() { ByteBuffer buffer = ByteBuffer.allocate(256); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.ZSTD, - TimestampType.CREATE_TIME, 0L); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V2).compressionType(CompressionType.ZSTD).build(); builder.append(10L, null, "a".getBytes()); Map produceData = new HashMap<>(); diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 40643a4fdc1df..29ee4ad6392bf 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -207,7 +207,7 @@ class GroupMetadataManager(brokerId: Int, val records = { val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, Seq(new SimpleRecord(timestamp, key, value)).asJava)) - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L) + val builder = MemoryRecords.builder(buffer).magic(magicValue).compressionType(compressionType).timestampType(timestampType).build(); builder.append(timestamp, key, value) builder.build() } @@ -333,8 +333,14 @@ class GroupMetadataManager(brokerId: Int, if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), - producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) + val builder = MemoryRecords.builder(buffer) + .magic(magicValue) + .compressionType(compressionType) + .timestampType(timestampType) + .logAppendTime(time.milliseconds()) + .producerState(producerId, producerEpoch, 0) + .transaction(isTxnOffsetCommit) + .build() records.foreach(builder.append) val entries = Map(offsetTopicPartition -> builder.build()) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7ad43b4c92c6b..d7cf31807033e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -30,7 +30,7 @@ import java.util.regex.Pattern import 
com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException} -import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} +import kafka.message.BrokerCompressionCodec import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.LeaderEpochFileCache @@ -50,11 +50,11 @@ import scala.collection.{Seq, Set, mutable} object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) + RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1, offsetsMonotonic = false, -1L) def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) + RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1, offsetsMonotonic = false, -1L) } /** @@ -68,8 +68,8 @@ object LogAppendInfo { * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param logStartOffset The start offset of the log at the time of this append. 
* @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCodec The source codec used in the message set (send by the producer) - * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any) + * @param sourceType The source compression type used in the message set (sent by the producer) + * @param targetType The target compression type of the message set (after applying the broker compression configuration if any) * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes * @param offsetsMonotonic Are the offsets in this message set monotonically increasing @@ -82,8 +82,8 @@ case class LogAppendInfo(var firstOffset: Option[Long], var logAppendTime: Long, var logStartOffset: Long, var recordConversionStats: RecordConversionStats, - sourceCodec: CompressionCodec, - targetCodec: CompressionCodec, + sourceType: CompressionType, + targetType: CompressionType, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean, @@ -932,8 +932,8 @@ class Log(@volatile var dir: File, offset, time, now, - appendInfo.sourceCodec, - appendInfo.targetCodec, + appendInfo.sourceType, + CompressionConfig.of(appendInfo.targetType), config.compact, config.messageFormatVersion.recordVersion.value, config.messageTimestampType, @@ -1183,7 +1183,7 @@ class Log(@volatile var dir: File, var validBytesCount = 0 var firstOffset: Option[Long] = None var lastOffset = -1L - var sourceCodec: CompressionCodec = NoCompressionCodec + var sourceType = CompressionType.NONE var monotonic = true var maxTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L @@ -1236,15 +1236,15 @@ class Log(@volatile var dir: File, shallowMessageCount += 1 validBytesCount += batchSize - val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id) - if (messageCodec != NoCompressionCodec) - sourceCodec = messageCodec + val 
messageType = batch.compressionType + if (messageType != CompressionType.NONE) + sourceType = messageType } // Apply broker-side compression if any - val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) + val targetType = BrokerCompressionCodec.getTargetCompressionType(config.compressionType, sourceType) LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) + RecordConversionStats.EMPTY, sourceType, targetType, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) } private def updateProducers(batch: RecordBatch, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index d7088a675e957..d01065fabadb0 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -20,10 +20,9 @@ import java.nio.ByteBuffer import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.LongRef -import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.utils.Logging import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionConfig, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.utils.Time import scala.collection.mutable @@ -50,8 +49,8 @@ private[kafka] object LogValidator extends Logging { offsetCounter: LongRef, time: Time, now: 
Long, - sourceCodec: CompressionCodec, - targetCodec: CompressionCodec, + sourceType: CompressionType, + targetConfig: CompressionConfig, compactedTopic: Boolean, magic: Byte, timestampType: TimestampType, @@ -59,7 +58,7 @@ private[kafka] object LogValidator extends Logging { partitionLeaderEpoch: Int, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { - if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { + if (sourceType == CompressionType.NONE && targetConfig.getType == CompressionType.NONE) { // check the magic value if (!records.hasMatchingMagic(magic)) convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, time, now, timestampType, @@ -69,12 +68,12 @@ private[kafka] object LogValidator extends Logging { assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, magic) } else { - validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic, + validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceType, targetConfig, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion) } } - private[kafka] def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceCodec: CompressionCodec): RecordBatch = { + private[kafka] def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceType: CompressionType): RecordBatch = { val batchIterator = records.batches.iterator if (!batchIterator.hasNext) { @@ -84,7 +83,7 @@ private[kafka] object LogValidator extends Logging { val batch = batchIterator.next() // if the format is v2 and beyond, or if the messages are compressed, we should check there's only one batch. 
- if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || sourceCodec != NoCompressionCodec) { + if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || sourceType != CompressionType.NONE) { if (batchIterator.hasNext) { throw new InvalidRecordException("Compressed outer record has more than one batch") } @@ -165,10 +164,17 @@ private[kafka] object LogValidator extends Logging { } val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) - val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType, - offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch) - - val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) + val builder = MemoryRecords.builder(newBuffer) + .magic(toMagicValue) + .timestampType(timestampType) + .baseOffset(offsetCounter.value) + .logAppendTime(now) + .producerState(producerId, producerEpoch, sequence) + .transaction(isTransactional) + .partitionLeaderEpoch(partitionLeaderEpoch) + .build() + + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE) for (batch <- records.batches.asScala) { validateBatch(firstBatch, batch, isFromClient, toMagicValue) @@ -205,7 +211,7 @@ private[kafka] object LogValidator extends Logging { var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value - val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE) for (batch <- records.batches.asScala) { validateBatch(firstBatch, batch, isFromClient, magic) @@ -258,7 +264,7 @@ private[kafka] object LogValidator extends Logging { /** * We cannot do in place assignment in one of the following situations: - * 1. Source and target compression codec are different + * 1. Source and target compression type are different * 2. 
When the target magic is not equal to batches' magic, meaning format conversion is needed. * 3. When the target magic is equal to V0, meaning absolute offsets need to be re-assigned. */ @@ -266,8 +272,8 @@ private[kafka] object LogValidator extends Logging { offsetCounter: LongRef, time: Time, now: Long, - sourceCodec: CompressionCodec, - targetCodec: CompressionCodec, + sourceType: CompressionType, + targetConfig: CompressionConfig, compactedTopic: Boolean, toMagic: Byte, timestampType: TimestampType, @@ -276,12 +282,12 @@ private[kafka] object LogValidator extends Logging { isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { - if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0) + if (targetConfig.getType == CompressionType.ZSTD && interBrokerProtocolVersion < KAFKA_2_1_IV0) throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression") // No in place assignment situation 1 - var inPlaceAssignment = sourceCodec == targetCodec + var inPlaceAssignment = sourceType == targetConfig.getType var maxTimestamp = RecordBatch.NO_TIMESTAMP val expectedInnerOffset = new LongRef(0) @@ -292,7 +298,7 @@ private[kafka] object LogValidator extends Logging { // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually // a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records - val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec) + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceType) // No in place assignment situation 2 and 3: we only need to check for the first batch because: // 1. 
For most cases (compressed records, v2, for example), there's only one batch anyways. @@ -301,7 +307,7 @@ private[kafka] object LogValidator extends Logging { inPlaceAssignment = false // Do not compress control records unless they are written compressed - if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch) + if (sourceType == CompressionType.NONE && firstBatch.isControlBatch) inPlaceAssignment = true val batches = records.batches.asScala @@ -318,7 +324,7 @@ private[kafka] object LogValidator extends Logging { try { for (record <- batch.asScala) { - if (sourceCodec != NoCompressionCodec && record.isCompressed) + if (sourceType != CompressionType.NONE && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + s"compression attribute set: $record") validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) @@ -348,7 +354,7 @@ private[kafka] object LogValidator extends Logging { val first = records.batches.asScala.head (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional) } - buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec), now, + buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, targetConfig.getType, now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch, isFromClient, uncompressedSizeInBytes) } else { @@ -395,9 +401,16 @@ private[kafka] object LogValidator extends Logging { val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava) val buffer = ByteBuffer.allocate(estimatedSize) - val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, - logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) - + val builder = MemoryRecords.builder(buffer) + 
.magic(magic) + .compressionType(compressionType) + .timestampType(timestampType) + .baseOffset(offsetCounter.value) + .logAppendTime(logAppendTime) + .producerState(producerId, producerEpoch, baseSequence) + .transaction(isTransactional) + .partitionLeaderEpoch(partitionLeaderEpoch) + .build() validatedRecords.foreach { record => builder.appendWithOffset(offsetCounter.getAndIncrement(), record) } diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index abe3694b0359c..387f8be5a830b 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -20,6 +20,7 @@ package kafka.message import java.util.Locale import kafka.common.UnknownCodecException +import org.apache.kafka.common.record.CompressionType object CompressionCodec { def getCompressionCodec(codec: Int): CompressionCodec = { @@ -58,11 +59,14 @@ object BrokerCompressionCodec { } } - def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = { + def getTargetCompressionType(compressionType: String, producerCompression: CompressionType): CompressionType = { if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression else - getCompressionCodec(compressionType) + if (UncompressedCodec.name.equals(compressionType)) + CompressionType.NONE + else + CompressionType.forName(compressionType) } } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index bb1270d24af60..dd89d87880c36 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -223,8 +223,10 @@ class PartitionTest { // Write to the future replica as if the log had been compacted, and do not roll the segment val buffer = ByteBuffer.allocate(1024) - val builder = 
MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0) + val builder = MemoryRecords.builder(buffer) + .logAppendTime(RecordBatch.NO_TIMESTAMP) + .partitionLeaderEpoch(0) + .build() builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes)) builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes)) builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes)) @@ -916,9 +918,11 @@ class PartitionTest { new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k2".getBytes, "v2".getBytes)) val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) - val builder = MemoryRecords.builder( - buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, - baseOffset, time.milliseconds, 0) + val builder = MemoryRecords.builder(buf) + .baseOffset(baseOffset) + .logAppendTime(time.milliseconds) + .partitionLeaderEpoch(0) + .build() records.foreach(builder.append) builder.build() } @@ -952,9 +956,12 @@ class PartitionTest { def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) - val builder = MemoryRecords.builder( - buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, - baseOffset, time.milliseconds, partitionLeaderEpoch) + val builder = MemoryRecords.builder(buf) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(baseOffset) + .logAppendTime(time.milliseconds) + .partitionLeaderEpoch(partitionLeaderEpoch) + .build() records.foreach(builder.append) builder.build() } @@ -967,8 +974,11 @@ class PartitionTest { val baseSequence = 0 val isTransactional = true val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) - val builder = MemoryRecords.builder(buf, CompressionType.NONE, 
baseOffset, producerId, - producerEpoch, baseSequence, isTransactional) + val builder = MemoryRecords.builder(buf) + .baseOffset(baseOffset) + .producerState(producerId, producerEpoch, baseSequence) + .transaction(isTransactional) + .build() records.foreach(builder.append) builder.build() } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index f9b571e68c2a8..99801fa07af49 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -524,7 +524,7 @@ class GroupMetadataManagerTest { } private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = { - val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset) + val builder = MemoryRecords.builder(buffer).timestampType(TimestampType.LOG_APPEND_TIME).baseOffset(baseOffset).build(); val commitRecords = createCommittedOffsetRecords(offsets) commitRecords.foreach(builder.append) builder.build() @@ -533,7 +533,11 @@ class GroupMetadataManagerTest { private def appendTransactionalOffsetCommits(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, baseOffset: Long, offsets: Map[TopicPartition, Long]): Int = { - val builder = MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true) + val builder = MemoryRecords.builder(buffer) + .baseOffset(baseOffset) + .producerState(producerId, producerEpoch, 0) + .transaction(true) + .build() val commitRecords = createCommittedOffsetRecords(offsets) commitRecords.foreach(builder.append) builder.build() @@ -542,9 +546,15 @@ class GroupMetadataManagerTest { private def completeTransactionalOffsetCommit(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, baseOffset: Long, isCommit: Boolean): 
Int = { - val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), producerId, producerEpoch, 0, true, true, - RecordBatch.NO_PARTITION_LEADER_EPOCH) + val builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(baseOffset) + .logAppendTime(time.milliseconds()) + .producerState(producerId, producerEpoch, 0) + .transaction(true) + .controlBatch(true) + .build() val controlRecordType = if (isCommit) ControlRecordType.COMMIT else ControlRecordType.ABORT builder.appendEndTxnMarker(time.milliseconds(), new EndTransactionMarker(controlRecordType, 0)) builder.build() } diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 557cef3955514..515b3d94765db 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -77,6 +77,25 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readBatch(0).compressionType.id) } + @Test + def testGetTargetCompressionCodec() { + if (brokerCompression.equals(ProducerCompressionCodec.name)) { + // if broker-side compression is 'producer', returns the used compression type. + for (compressionType <- CompressionType.values) { + assertEquals(BrokerCompressionCodec.getTargetCompressionType(brokerCompression, compressionType), compressionType) + } + } else { + for (compressionType <- CompressionType.values) { + if (brokerCompression.equals(UncompressedCodec.name)) { + // if broker-side compression is 'uncompressed', returns CompressionType#NONE. 
+ assertEquals(BrokerCompressionCodec.getTargetCompressionType(brokerCompression, compressionType), CompressionType.NONE) + } else { + // anything else, returns broker-side compression type. + assertEquals(BrokerCompressionCodec.getTargetCompressionType(brokerCompression, compressionType), CompressionType.forId(CompressionCodec.getCompressionCodec(brokerCompression).codec)) + } + } + } + } } object BrokerCompressionTest { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 42ad8a08d1481..9a790feb5756a 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1573,7 +1573,7 @@ class LogCleanerTest { kv._2.toString.getBytes)) val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) - val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME, initialOffset) + val builder = MemoryRecords.builder(buffer).magic(RecordBatch.MAGIC_VALUE_V1).compressionType(codec).baseOffset(initialOffset).build(); var offset = initialOffset records.foreach { record => diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 9cd53e529ba93..16514456e6112 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1137,23 +1137,46 @@ class LogTest { val epoch: Short = 0 val buffer = ByteBuffer.allocate(512) - var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L, epoch, 0, false, 0) + var builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .logAppendTime(mockTime.milliseconds()) + .producerState(1L, epoch, 0) + .partitionLeaderEpoch(0) + .build() 
builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() - builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L, epoch, 0, false, 0) + builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(1L) + .logAppendTime(mockTime.milliseconds()) + .producerState(2L, epoch, 0) + .partitionLeaderEpoch(0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() - builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L, epoch, 0, false, 0) + builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(2L) + .logAppendTime(mockTime.milliseconds()) + .producerState(3L, epoch, 0) + .partitionLeaderEpoch(0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() - builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L, epoch, 0, false, 0) + builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(3L) + .logAppendTime(mockTime.milliseconds()) + .producerState(4L, epoch, 0) + .partitionLeaderEpoch(0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() @@ -1208,32 +1231,51 @@ class LogTest { val buffer = ByteBuffer.allocate(512) // pid1 seq = 0 - var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1, epoch, 0) + var builder = MemoryRecords.builder(buffer) + 
.timestampType(TimestampType.LOG_APPEND_TIME) + .logAppendTime(mockTime.milliseconds()) + .producerState(pid1, epoch, 0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid2 seq = 0 - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2, epoch, 0) + builder = MemoryRecords.builder(buffer) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(1L) + .logAppendTime(mockTime.milliseconds()) + .producerState(pid2, epoch, 0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid1 seq = 1 - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1, epoch, 1) + builder = MemoryRecords.builder(buffer) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(2L) + .logAppendTime(mockTime.milliseconds()) + .producerState(pid1, epoch, 1) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid2 seq = 1 - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2, epoch, 1) + builder = MemoryRecords.builder(buffer) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(3L) + .logAppendTime(mockTime.milliseconds()) + .producerState(pid2, epoch, 1) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // // pid1 seq = 1 (duplicate) - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1, epoch, 1) + builder = MemoryRecords.builder(buffer) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(4L) + .logAppendTime(mockTime.milliseconds()) + .producerState(pid1, 
epoch, 1) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() @@ -2088,9 +2130,14 @@ class LogTest { val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L) val buffer = ByteBuffer.allocate(512) for (offset <- appendOffsets) { - val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), - 1L, 0, 0, false, 0) + val builder = MemoryRecords.builder(buffer) + .magic(RecordBatch.MAGIC_VALUE_V2) + .timestampType(TimestampType.LOG_APPEND_TIME) + .baseOffset(offset) + .logAppendTime(mockTime.milliseconds()) + .producerState(1L, 0, 0) + .partitionLeaderEpoch(0) + .build() builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() } @@ -3213,8 +3260,13 @@ class LogTest { val records = Seq(new SimpleRecord(timestamp, key, value)) val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset, - System.currentTimeMillis, leaderEpoch) + val builder = MemoryRecords.builder(buf) + .magic(magicValue) + .compressionType(codec) + .baseOffset(offset) + .logAppendTime(System.currentTimeMillis) + .partitionLeaderEpoch(leaderEpoch) + .build() records.foreach(builder.append) builder.build() } @@ -3790,8 +3842,13 @@ class LogTest { leaderEpoch: Int = 0): (Long, Int) => Unit = { var sequence = 0 (offset: Long, numRecords: Int) => { - val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, - offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch) + val builder = MemoryRecords.builder(buffer) + .baseOffset(offset) + .logAppendTime(System.currentTimeMillis()) + .producerState(producerId, producerEpoch, sequence) + .transaction(true) + .partitionLeaderEpoch(leaderEpoch) + .build() for (seq <- sequence until sequence + 
numRecords) { val record = new SimpleRecord(s"$seq".getBytes) builder.append(record) @@ -3814,7 +3871,7 @@ class LogTest { } private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = { - val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, offset) + val builder = MemoryRecords.builder(buffer).baseOffset(offset).build(); (0 until numRecords).foreach { seq => builder.append(new SimpleRecord(s"$seq".getBytes)) } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 26c1e5ff0263e..ecd385f615ca1 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1} import kafka.common.LongRef -import kafka.message._ import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time @@ -83,8 +82,8 @@ class LogValidatorTest { new LongRef(0L), time, now = 0L, - CompressionCodec.getCompressionCodec(sourceCompressionType.name), - CompressionCodec.getCompressionCodec(targetCompressionType.name), + sourceCompressionType, + CompressionConfig.of(targetCompressionType), compactedTopic = false, magic, TimestampType.CREATE_TIME, @@ -108,8 +107,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time= time, now = now, - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = magic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -146,8 +145,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = now, - sourceCodec = 
DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = targetMagic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -188,8 +187,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = now, - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = magic, timestampType = TimestampType.LOG_APPEND_TIME, @@ -246,8 +245,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = time.milliseconds(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -288,8 +287,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = magic, timestampType = TimestampType.CREATE_TIME, @@ -355,8 +354,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = GZIPCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = magic, timestampType = TimestampType.CREATE_TIME, @@ -406,8 +405,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = 
CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = toMagic, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -448,8 +447,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = timestamp, - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V2, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -503,8 +502,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = magic, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -554,8 +553,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -575,8 +574,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -596,8 +595,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = 
CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V1, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -617,8 +616,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), magic = RecordBatch.MAGIC_VALUE_V1, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -637,8 +636,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), magic = RecordBatch.MAGIC_VALUE_V0, compactedTopic = false, timestampType = TimestampType.CREATE_TIME, @@ -657,8 +656,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -678,8 +677,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -700,8 +699,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = 
CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -723,8 +722,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -746,8 +745,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.CREATE_TIME, @@ -767,8 +766,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.LOG_APPEND_TIME, @@ -790,8 +789,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -813,8 +812,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = 
CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.LOG_APPEND_TIME, @@ -836,8 +835,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -859,8 +858,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME, @@ -879,8 +878,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = SnappyCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.of(CompressionType.SNAPPY), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME, @@ -904,8 +903,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -925,8 +924,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = 
CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -945,8 +944,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -965,8 +964,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -986,8 +985,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1007,8 +1006,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1030,8 +1029,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = 
DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1053,8 +1052,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V1, timestampType = TimestampType.CREATE_TIME, @@ -1074,8 +1073,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.none(), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -1095,8 +1094,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, + sourceType = CompressionType.GZIP, + targetConfig = CompressionConfig.of(CompressionType.GZIP), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V0, timestampType = TimestampType.CREATE_TIME, @@ -1108,7 +1107,7 @@ class LogValidatorTest { @Test(expected = classOf[InvalidRecordException]) def testCompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec) + testBatchWithoutRecordsNotAllowed(CompressionType.GZIP, CompressionType.GZIP) } @Test(expected = classOf[UnsupportedCompressionTypeException]) @@ -1120,8 +1119,8 @@ class LogValidatorTest { offsetCounter = new LongRef(0), time= time, now = now, - sourceCodec = NoCompressionCodec, - targetCodec = 
ZStdCompressionCodec, + sourceType = CompressionType.NONE, + targetConfig = CompressionConfig.of(CompressionType.ZSTD), compactedTopic = false, magic = RecordBatch.MAGIC_VALUE_V2, timestampType = TimestampType.LOG_APPEND_TIME, @@ -1133,15 +1132,15 @@ class LogValidatorTest { @Test(expected = classOf[InvalidRecordException]) def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(NoCompressionCodec, NoCompressionCodec) + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, CompressionType.NONE) } @Test(expected = classOf[InvalidRecordException]) def testRecompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(NoCompressionCodec, DefaultCompressionCodec) + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, CompressionType.GZIP) } - private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = { + private def testBatchWithoutRecordsNotAllowed(sourceType: CompressionType, targetType: CompressionType): Unit = { val offset = 1234567 val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = (1324L, 10.toShort, 984, true, 40) @@ -1155,8 +1154,8 @@ class LogValidatorTest { offsetCounter = new LongRef(offset), time = time, now = System.currentTimeMillis(), - sourceCodec = sourceCodec, - targetCodec = targetCodec, + sourceType = sourceType, + targetConfig = CompressionConfig.of(targetType), compactedTopic = false, magic = RecordBatch.CURRENT_MAGIC_VALUE, timestampType = TimestampType.CREATE_TIME, @@ -1170,7 +1169,7 @@ class LogValidatorTest { timestamp: Long = RecordBatch.NO_TIMESTAMP, codec: CompressionType): MemoryRecords = { val buf = ByteBuffer.allocate(512) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) + val builder = MemoryRecords.builder(buf).magic(magicValue).compressionType(codec).build(); builder.appendWithOffset(0, timestamp, null, "hello".getBytes) 
builder.appendWithOffset(1, timestamp, null, "there".getBytes) builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes) @@ -1181,10 +1180,10 @@ class LogValidatorTest { timestamp: Long = RecordBatch.NO_TIMESTAMP, codec: CompressionType): MemoryRecords = { val buf = ByteBuffer.allocate(2048) - var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) + var builder = MemoryRecords.builder(buf).magic(magicValue).compressionType(codec).build(); builder.append(10L, "1".getBytes(), "a".getBytes()) builder.close() - builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L) + builder = MemoryRecords.builder(buf).magic(magicValue).compressionType(codec).baseOffset(1L).build(); builder.append(11L, "2".getBytes(), "b".getBytes()) builder.append(12L, "3".getBytes(), "c".getBytes()) builder.close() @@ -1196,7 +1195,7 @@ class LogValidatorTest { private def createDiscontinuousOffsetRecords(magicValue: Byte, codec: CompressionType): MemoryRecords = { val buf = ByteBuffer.allocate(512) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) + val builder = MemoryRecords.builder(buf).magic(magicValue).compressionType(codec).build() builder.appendWithOffset(0, RecordBatch.NO_TIMESTAMP, null, "hello".getBytes) builder.appendWithOffset(2, RecordBatch.NO_TIMESTAMP, null, "there".getBytes) builder.appendWithOffset(3, RecordBatch.NO_TIMESTAMP, null, "beautiful".getBytes) @@ -1223,8 +1222,7 @@ class LogValidatorTest { id.toString.getBytes)) val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) - val builder = MemoryRecords.builder(buffer, batchMagicValue, codec, - TimestampType.CREATE_TIME, 0L) + val builder = MemoryRecords.builder(buffer).magic(batchMagicValue).compressionType(codec).build() var offset = 1234567 records.foreach { record => diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 55c38a1deab80..6b1f8a046c0e5 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo -import kafka.message.NoCompressionCodec import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.utils.TestUtils import org.apache.kafka.common.KafkaException @@ -876,8 +875,8 @@ class AbstractFetcherThreadTest { logAppendTime = Time.SYSTEM.milliseconds(), logStartOffset = state.logStartOffset, recordConversionStats = RecordConversionStats.EMPTY, - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, + sourceType = CompressionType.NONE, + targetType = CompressionType.NONE, shallowCount = batches.size, validBytes = partitionData.records.sizeInBytes, offsetsMonotonic = true, diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 98fbc4e4dc25f..5c2bbe33a53be 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -412,8 +412,14 @@ object TestUtils extends Logging { baseOffset: Long = 0L, partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset, - System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch) + val builder = MemoryRecords.builder(buf) + .magic(magicValue) + .compressionType(codec) + .baseOffset(baseOffset) + .logAppendTime(System.currentTimeMillis) + .producerState(producerId, producerEpoch, sequence) + 
.partitionLeaderEpoch(partitionLeaderEpoch) + .build() records.foreach(builder.append) builder.build() } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java index 73552c2131ef0..136428c93e26a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.CloseableIterator; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -105,7 +104,7 @@ private ByteBuffer createBatch(int batchSize) { ); final MemoryRecordsBuilder builder = - MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset); + MemoryRecords.builder(buf).magic(messageVersion).compressionType(compressionType).baseOffset(startingOffset).build(); for (int i = 0; i < batchSize; ++i) { switch (bytes) {