1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -51,6 +51,7 @@
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
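This new allow entry lets group-coordinator sources depend on the compress package directly. A minimal sketch of what the rule unblocks, using only the builder API visible later in this diff (the helper class itself is hypothetical):

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.CompressionType;

// Hypothetical coordinator-side helper: without the allow rule above,
// checkstyle's ImportControl check would fail the build on the import alone.
public final class CoordinatorCompressionExample {
    static Compression defaultCompression() {
        return Compression.of(CompressionType.NONE).build();
    }
}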
10 changes: 7 additions & 3 deletions checkstyle/import-control-metadata.xml
@@ -173,18 +173,22 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
<subpackage name="fault">
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="util">
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
</subpackage>
</subpackage>

<subpackage name="metalog">
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
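Note the exact-match="true" form used for the util and metalog entries: unlike a pkg-wide allow, it admits only the single named class. Illustration (the commented import shows what checkstyle would still reject):

// Inside org.apache.kafka.metadata.util:
import org.apache.kafka.common.compress.Compression;        // allowed: exact-match on this class
// import org.apache.kafka.common.compress.GzipCompression; // rejected: no pkg-wide allow exists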
10 changes: 8 additions & 2 deletions checkstyle/import-control.xml
@@ -76,8 +76,10 @@
<allow pkg="com.github.luben.zstd" />
<allow pkg="net.jpountz.lz4" />
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.xerial.snappy" />
<allow pkg="org.apache.kafka.common.compress" />
<allow class="org.apache.kafka.common.record.CompressionType" exact-match="true" />
<allow class="org.apache.kafka.common.record.RecordBatch" exact-match="true" />
</subpackage>

<subpackage name="message">
@@ -138,6 +140,7 @@

<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
@@ -166,6 +169,7 @@

<subpackage name="requests">
<allow pkg="org.apache.kafka.common.acl" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.message" />
@@ -426,6 +430,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.message" />
@@ -444,8 +449,9 @@
</subpackage>

<subpackage name="snapshot">
<allow pkg="org.apache.kafka.common.record" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test"/>
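The same single-class grant recurs across the protocol, requests, raft, and snapshot subpackages: those layers treat compression as an opaque handle and never name a concrete codec. A sketch of what this permits, reusing the MemoryRecords.builder signature that appears later in this diff (Compression.NONE is assumed to be the no-op instance the new API exposes; buffer size and content are arbitrary):

import java.nio.ByteBuffer;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public final class SnapshotBatchSketch {
    static MemoryRecords buildBatch() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Only the Compression interface is named; GzipCompression and friends stay hidden.
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                Compression.NONE, TimestampType.CREATE_TIME, 0L);
        builder.append(0L, "key".getBytes(), "value".getBytes());
        return builder.build();
    }
}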
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -90,7 +90,7 @@
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

<suppress checks="BooleanExpressionComplexity"
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
files="(Utils|Topic|Lz4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
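The updated suppression tracks a class rename: KafkaLZ4BlockOutputStream becomes Lz4BlockOutputStream, evidently as the LZ4 stream implementation moves under the new org.apache.kafka.common.compress package in this change.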
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -41,6 +41,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -250,7 +251,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Compression compression;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
@@ -413,7 +414,7 @@ private void warnIfPartitionerDeprecated() {
Arrays.asList(this.keySerializer, this.valueSerializer));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.compression = configureCompression(config);

this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
@@ -432,7 +433,7 @@ private void warnIfPartitionerDeprecated() {
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,
batchSize,
this.compressionType,
compression,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
@@ -501,7 +502,7 @@ private void warnIfPartitionerDeprecated() {
this.interceptors = interceptors;
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.compression = configureCompression(config);
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
this.apiVersions = new ApiVersions();
@@ -548,6 +549,29 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
apiVersions);
}

private static Compression configureCompression(ProducerConfig config) {
CompressionType type = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
switch (type) {
case GZIP: {
return Compression.gzip()
.level(config.getInt(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG))
.build();
}
case LZ4: {
return Compression.lz4()
.level(config.getInt(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG))
.build();
}
case ZSTD: {
return Compression.zstd()
.level(config.getInt(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG))
.build();
}
default:
return Compression.of(type).build();
}
}

private static int lingerMs(ProducerConfig config) {
return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE);
}
@@ -1033,7 +1057,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

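The new configureCompression helper is where the per-codec level settings land. A minimal usage sketch, with placeholder bootstrap server and topic names; gzip level 6 is just an example value:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class GzipLevelProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // New in this patch; consumed by configureCompression above.
        props.put(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG, "6");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}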
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -18,6 +18,9 @@

import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -225,6 +228,18 @@ public class ProducerConfig extends AbstractConfig {
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";

/** <code>compression.gzip.level</code> */
public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
private static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>gzip</code>.";

/** <code>compression.lz4.level</code> */
public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
private static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>lz4</code>.";

/** <code>compression.zstd.level</code> */
public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
private static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>zstd</code>.";

/** <code>metrics.sample.window.ms</code> */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;

@@ -364,6 +379,9 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(COMPRESSION_GZIP_LEVEL_CONFIG, Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
.define(COMPRESSION_LZ4_LEVEL_CONFIG, Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
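Because each level is defined with a validator (a custom GzipCompression.LevelValidator for gzip, ConfigDef's between() bounds for lz4 and zstd), a bad level fails fast when the config is parsed, before any producer I/O. A sketch of the expected behavior under that assumption:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;

public final class LevelValidationSketch {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        config.put(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, 1_000_000); // far outside zstd's bounds
        try {
            new ProducerConfig(config);
        } catch (ConfigException expected) {
            // between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL) rejects the value
            System.out.println(expected.getMessage());
        }
    }
}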
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -99,7 +99,7 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
this.retry = false;
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
recordsBuilder.compression().type());
this.currentLeaderEpoch = OptionalInt.empty();
this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
@@ -146,7 +146,7 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
} else {
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
recordsBuilder.compression().type(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
@@ -172,7 +172,7 @@ private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer val
// No need to get the CRC.
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
recordsBuilder.compression().type(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.remaining(),
@@ -377,19 +377,19 @@ public Deque<ProducerBatch> split(int splitBatchSize) {

private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
int initialSize = Math.max(AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), record.key(), record.value(), record.headers()), batchSize);
recordsBuilder.compression().type(), record.key(), record.value(), record.headers()), batchSize);
ByteBuffer buffer = ByteBuffer.allocate(initialSize);

// Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional)
// for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent
// with how normal batches are handled).
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compression(),
TimestampType.CREATE_TIME, 0L);
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}

public boolean isCompressed() {
return recordsBuilder.compressionType() != CompressionType.NONE;
return recordsBuilder.compression().type() != CompressionType.NONE;
}

/**
@@ -491,7 +491,7 @@ public void close() {
recordsBuilder.close();
if (!recordsBuilder.isControlBatch()) {
CompressionRatioEstimator.updateEstimation(topicPartition.topic(),
recordsBuilder.compressionType(),
recordsBuilder.compression().type(),
(float) recordsBuilder.compressionRatio());
}
reopened = false;
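The recurring edit in this file is mechanical: recordsBuilder.compressionType() becomes recordsBuilder.compression().type(), so the codec enum is now derived from the richer Compression handle rather than stored separately. Condensed, the pattern is:

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecordsBuilder;

public final class AccessorMigrationSketch {
    // Before this patch: recordsBuilder.compressionType()
    // After this patch:  recordsBuilder.compression().type()
    static boolean isCompressed(MemoryRecordsBuilder recordsBuilder) {
        return recordsBuilder.compression().type() != CompressionType.NONE;
    }
}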
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -36,6 +36,7 @@
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
@@ -48,7 +49,6 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
@@ -74,7 +74,7 @@ public class RecordAccumulator {
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final Compression compression;
private final int lingerMs;
private final ExponentialBackoff retryBackoff;
private final int deliveryTimeoutMs;
@@ -116,7 +116,7 @@ public class RecordAccumulator {
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
Compression compression,
int lingerMs,
long retryBackoffMs,
long retryBackoffMaxMs,
@@ -176,7 +176,7 @@ public RecordAccumulator(LogContext logContext,
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
Compression compression,
int lingerMs,
long retryBackoffMs,
long retryBackoffMaxMs,
@@ -344,7 +344,7 @@ public RecordAppendResult append(String topic,

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
@@ -533,7 +533,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
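One detail worth noting in splitAndReenqueue: CompressionRatioEstimator is still keyed by CompressionType, so ratio statistics are shared per codec and topic regardless of the configured level (an inference from compression.type() at these call sites). A small sketch of that interaction:

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.CompressionRatioEstimator;

public final class EstimatorSketch {
    public static void main(String[] args) {
        // Two producers using zstd at different levels would update the same estimate.
        Compression zstd = Compression.zstd().level(10).build();
        CompressionRatioEstimator.updateEstimation("demo-topic", zstd.type(), 0.42f);
        float estimate = CompressionRatioEstimator.estimation("demo-topic", zstd.type());
        System.out.println(estimate);
    }
}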