Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
45cfc64
Remove MemoryRecordsBuilder(ByteBuffer, byte, CompressionType, Timest…
dongjinleekr Jun 18, 2019
7333166
a builder to build MemoryRecordsBuilder with a fluent method call, al…
dongjinleekr Jun 20, 2019
a61e8ac
Remove MemoryRecords#builder(ByteBuffer, CompressionType, TimestampTy…
dongjinleekr Jun 21, 2019
929b38a
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
1ad88d6
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
e0a1fb9
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
83d9662
Remove MemoryRecords#builder(ByteBuffer, CompressionType, long, long,…
dongjinleekr Jun 21, 2019
7f927d5
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
fe694f0
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
cd21bb0
Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times…
dongjinleekr Jun 21, 2019
bd6377e
Refactor MemoryRecords#writeEndTransactionalMarker to use MemoryRecor…
dongjinleekr Jun 22, 2019
3f4b2f4
Refactor MemoryRecords#withRecords to use MemoryRecords#builder, inst…
dongjinleekr Jun 22, 2019
0cfcef0
Add BrokerCompressionTest#testGetTargetCompressionCodec: validates Br…
dongjinleekr Jun 22, 2019
a48fd96
BrokerCompressionCodec#getTargetCompressionCodec -> BrokerCompression…
dongjinleekr Jun 22, 2019
ebe2f1a
Refactor KafkaLZ4BlockOutputStream, KafkaLZ4Test + Add support to com…
dongjinleekr Jun 21, 2019
b2c90b8
Add GZipOutputStream to support compression level, block size for Gzip
dongjinleekr Jun 21, 2019
3c91af6
Add CompressionConfig + expand CompressionType#wrapForOutput
dongjinleekr Jun 22, 2019
b6c4074
Make broker to use CompressionConfig
dongjinleekr Jun 22, 2019
d116b47
Make RecordAccumulator to use CompressionConfig
dongjinleekr Jun 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
Expand Down Expand Up @@ -395,7 +396,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
CompressionConfig.of(this.compressionType),
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,8 +297,7 @@ private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batc
// Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
// for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent
// with how normal batches are handled).
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
TimestampType.CREATE_TIME, 0L);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer).magic(magic()).compressionType(recordsBuilder.compressionType()).build();
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
Expand All @@ -44,13 +45,12 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
Expand All @@ -71,7 +71,7 @@ public final class RecordAccumulator {
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final CompressionConfig compressionConfig;
private final int lingerMs;
private final long retryBackoffMs;
private final int deliveryTimeoutMs;
Expand All @@ -91,7 +91,7 @@ public final class RecordAccumulator {
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param compression The compression codec for the records
* @param compressionConfig The compression type/level/buffer size for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
Expand All @@ -105,7 +105,7 @@ public final class RecordAccumulator {
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
CompressionConfig compressionConfig,
int lingerMs,
long retryBackoffMs,
int deliveryTimeoutMs,
Expand All @@ -121,7 +121,7 @@ public RecordAccumulator(LogContext logContext,
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.compressionConfig = compressionConfig;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.deliveryTimeoutMs = deliveryTimeoutMs;
Expand Down Expand Up @@ -205,7 +205,7 @@ public RecordAppendResult append(TopicPartition tp,

// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compressionConfig.getType(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
Expand Down Expand Up @@ -242,7 +242,10 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
return MemoryRecords.builder(buffer)
.magic(maxUsableMagic)
.compressionConfig(compressionConfig)
.build();
}

/**
Expand Down Expand Up @@ -340,7 +343,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compressionConfig.getType(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.io.DataOutputStream;
import java.util.Objects;

/**
 * Holds all compression configuration for record batches: the compression type, the compression
 * level, and the size of the compression buffer.
 * <p>
 * Instances are immutable. A {@code null} level or buffer size means "use the codec's default";
 * codecs that do not support a given knob ignore it (e.g. Snappy has no level, Zstd has no
 * configurable buffer size — see {@link CompressionType#wrapForOutput}).
 */
public class CompressionConfig {
    private final CompressionType type;
    // null means: fall back to the codec's default compression level
    private final Integer level;
    // null means: fall back to the codec's default buffer size
    private final Integer bufferSize;

    private CompressionConfig(CompressionType type, Integer level, Integer bufferSize) {
        // Fail fast here rather than with an obscure NPE later in getType()/outputStream().
        this.type = Objects.requireNonNull(type, "type");
        this.level = level;
        this.bufferSize = bufferSize;
    }

    /**
     * Returns the compression type; never {@code null}.
     */
    public CompressionType getType() {
        return type;
    }

    /**
     * Returns the configured compression level, or {@code null} for the codec default.
     */
    public Integer getLevel() {
        return level;
    }

    /**
     * Returns the configured compression buffer size, or {@code null} for the codec default.
     */
    public Integer getBufferSize() {
        return bufferSize;
    }

    /**
     * Returns a {@link DataOutputStream} that compresses given bytes into <code>output</code>
     * {@link ByteBufferOutputStream} with the specified <code>magic</code>.
     */
    public DataOutputStream outputStream(ByteBufferOutputStream output, byte magic) {
        return new DataOutputStream(type.wrapForOutput(output, magic, this.level, this.bufferSize));
    }

    /**
     * Creates a not-compressing configuration.
     */
    public static CompressionConfig none() {
        return of(CompressionType.NONE);
    }

    /**
     * Creates a configuration of the specified {@link CompressionType} with default compression
     * level and compression buffer size.
     */
    public static CompressionConfig of(CompressionType type) {
        return of(type, null);
    }

    /**
     * Creates a configuration of the specified {@link CompressionType} and compression level,
     * with default compression buffer size.
     */
    public static CompressionConfig of(CompressionType type, Integer level) {
        return of(type, level, null);
    }

    /**
     * Creates a configuration of the specified {@link CompressionType}, compression level, and
     * compression buffer size. A {@code null} level or buffer size selects the codec default.
     */
    public static CompressionConfig of(CompressionType type, Integer level, Integer bufferSize) {
        return new CompressionConfig(type, level, bufferSize);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* The compression type to use
*/
public enum CompressionType {
NONE(0, "none", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
return buffer;
}

Expand All @@ -49,12 +48,12 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu

GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
try {
// Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
// 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
// number of bytes to write (potentially a single byte)
return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
return new BufferedOutputStream(GZipOutputStream.of(buffer, level, blockSize), 16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
Expand All @@ -76,9 +75,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu

SNAPPY(2, "snappy", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
try {
return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
// Snappy does not support compression level; so given parameter is ignored.
if (blockSize == null) {
return (OutputStream) SnappyConstructors.OUTPUT_WITHOUT_BLOCK_SIZE.invoke(buffer);
} else {
return (OutputStream) SnappyConstructors.OUTPUT_WITH_BLOCK_SIZE.invoke(buffer, blockSize);
}
} catch (Throwable e) {
throw new KafkaException(e);
}
Expand All @@ -96,9 +100,9 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu

LZ4(3, "lz4", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
try {
return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
return KafkaLZ4BlockOutputStream.of(buffer, messageVersion, level, blockSize);
} catch (Throwable e) {
throw new KafkaException(e);
}
Expand All @@ -108,7 +112,7 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
messageVersion == RecordBatch.MAGIC_VALUE_V0);
messageVersion == RecordBatch.MAGIC_VALUE_V0);
} catch (Throwable e) {
throw new KafkaException(e);
}
Expand All @@ -117,9 +121,14 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf

ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, Integer level, Integer blockSize) {
try {
return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
// Zstd does not support block size configuration; so given parameter is ignored.
if (level == null) {
return (OutputStream) ZstdConstructors.OUTPUT_WITHOUT_LEVEL.invoke(buffer);
} else {
return (OutputStream) ZstdConstructors.OUTPUT_WITH_LEVEL.invoke(buffer, level);
}
} catch (Throwable e) {
throw new KafkaException(e);
}
Expand All @@ -146,14 +155,17 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
}

/**
* Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
*
* Wrap bufferStream with an OutputStream that will compress data with this CompressionType with given buffer size and compression level.
* <p>
* Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
* Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
* write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
* In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
*
* @param level The compression level to use. If null, it falls back to the default level.
* @param blockSize The buffer size to use during compression. If null, it falls back to the default block size.
*/
public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, Integer level, Integer blockSize);

/**
* Wrap buffer with an InputStream that will decompress data with this CompressionType.
Expand Down Expand Up @@ -210,15 +222,19 @@ else if (ZSTD.name.equals(name))
private static class SnappyConstructors {
static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
static final MethodHandle OUTPUT_WITHOUT_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream",
MethodType.methodType(void.class, OutputStream.class));
static final MethodHandle OUTPUT_WITH_BLOCK_SIZE = findConstructor("org.xerial.snappy.SnappyOutputStream",
MethodType.methodType(void.class, OutputStream.class, int.class));
}

private static class ZstdConstructors {
static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
MethodType.methodType(void.class, OutputStream.class));
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT_WITHOUT_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream",
MethodType.methodType(void.class, OutputStream.class));
static final MethodHandle OUTPUT_WITH_LEVEL = findConstructor("com.github.luben.zstd.ZstdOutputStream",
MethodType.methodType(void.class, OutputStream.class, int.class));
}

private static MethodHandle findConstructor(String className, MethodType methodType) {
Expand Down
Loading