diff --git a/build.gradle.kts b/build.gradle.kts
index 8dcdc14f04e7..664fb8a83d09 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -304,6 +304,7 @@ tasks.register("javaPreCommit") {
   dependsOn(":sdks:java:io:contextualtextio:build")
   dependsOn(":sdks:java:io:expansion-service:build")
   dependsOn(":sdks:java:io:file-based-io-tests:build")
+  dependsOn(":sdks:java:io:kafka:jmh:build")
   dependsOn(":sdks:java:io:sparkreceiver:3:build")
   dependsOn(":sdks:java:io:synthetic:build")
   dependsOn(":sdks:java:io:xml:build")
diff --git a/sdks/java/io/kafka/jmh/build.gradle.kts b/sdks/java/io/kafka/jmh/build.gradle.kts
new file mode 100644
index 000000000000..2f11dbff7ee0
--- /dev/null
+++ b/sdks/java/io/kafka/jmh/build.gradle.kts
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  groovy
+  id("org.apache.beam.module")
+}
+
+val applyJavaNature: groovy.lang.Closure<*> by extra
+applyJavaNature(mapOf(
+    "automaticModuleName" to "org.apache.beam.sdk.io.kafka.jmh",
+    "enableJmh" to true,
+    "publish" to false))
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Kafka :: JMH"
+val summary by extra("This contains JMH benchmarks for the Kafka IO connector for Beam Java")
+
+dependencies {
+  implementation(project(":sdks:java:io:kafka"))
+}
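Since the module is built with enableJmh, the generated benchmarks can also be launched programmatically through the standard JMH runner API. A minimal sketch, assuming only the JMH dependency that enableJmh already provides (the launcher class itself is hypothetical and not part of this change):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Hypothetical launcher, not part of this change: runs every benchmark whose
// name matches the regex and prints JMH's report to stdout.
public class KafkaIOUtilsBenchmarkMain {
  public static void main(String[] args) throws RunnerException {
    Options options =
        new OptionsBuilder()
            .include("KafkaIOUtilsBenchmark.*") // regex over benchmark names
            .forks(1) // one fork keeps the run short; more forks reduce noise
            .build();
    new Runner(options).run();
  }
}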
diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java
new file mode 100644
index 000000000000..8523e2094895
--- /dev/null
+++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka.jmh;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.infra.ThreadParams;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Threads(Threads.MAX)
+public class KafkaIOUtilsBenchmark {
+  private static final int SIZE = 1024;
+
+  @State(Scope.Thread)
+  public static class ProducerState {
+    private int[] values;
+    private int idx;
+
+    @Setup(Level.Iteration)
+    public void setup(final IterationParams ip, final ThreadParams tp) {
+      values = new Random(299792458 + ip.getCount()).ints(SIZE, 0, 100).toArray();
+      idx = 0;
+    }
+
+    int next() {
+      final int value = values[idx];
+      idx = (idx + 1) % SIZE;
+      return value;
+    }
+  }
+
+  @State(Scope.Group)
+  public static class PlainAccumulatorState {
+    // As implemented before 2.64.0.
+    // Note that numUpdates may overflow and count back from Long.MIN_VALUE.
+    static class MovingAvg {
+      private static final int MOVING_AVG_WINDOW = 1000;
+      private double avg = 0;
+      private long numUpdates = 0;
+
+      void update(double quantity) {
+        numUpdates++;
+        avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+      }
+
+      double get() {
+        return avg;
+      }
+    }
+
+    MovingAvg accumulator;
+
+    @Setup(Level.Trial)
+    public void setup() {
+      accumulator = new MovingAvg();
+    }
+  }
+
+  @State(Scope.Group)
+  public static class AtomicAccumulatorState {
+    KafkaIOUtils.MovingAvg accumulator;
+
+    @Setup(Level.Trial)
+    public void setup() {
+      accumulator = new KafkaIOUtils.MovingAvg();
+    }
+  }
+
+  @State(Scope.Group)
+  public static class VolatileAccumulatorState {
+    // Atomic accumulator using only volatile reads and writes.
+    static class MovingAvg {
+      private static final int MOVING_AVG_WINDOW = 1000;
+
+      private volatile double avg = 0;
+      private long numUpdates = 0;
+
+      void update(final double quantity) {
+        final double prevAvg = avg;
+        numUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
+        avg = prevAvg + (quantity - prevAvg) / numUpdates;
+      }
+
+      double get() {
+        return avg;
+      }
+    }
+
+    MovingAvg accumulator;
+
+    @Setup(Level.Trial)
+    public void setup() {
+      accumulator = new MovingAvg();
+    }
+  }
+
+  @Benchmark
+  @Group("WritePlain")
+  public void plainWrite(final PlainAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadPlain")
+  public double plainRead(final PlainAccumulatorState as) {
+    return as.accumulator.get();
+  }
+
+  @Benchmark
+  @Group("ReadAndWritePlain")
+  public void plainWriteWhileReading(final PlainAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadAndWritePlain")
+  public double plainReadWhileWriting(final PlainAccumulatorState as) {
+    return as.accumulator.get();
+  }
+
+  @Benchmark
+  @Group("WriteSynchronizedPlain")
+  public void synchronizedPlainWrite(final PlainAccumulatorState as, final ProducerState ps) {
+    final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
+    final int value = ps.next();
+    synchronized (accumulator) {
+      accumulator.update(value);
+    }
+  }
+
+  @Benchmark
+  @Group("ReadSynchronizedPlain")
+  public double synchronizedPlainRead(final PlainAccumulatorState as) {
+    final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
+    synchronized (accumulator) {
+      return accumulator.get();
+    }
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteSynchronizedPlain")
+  public void synchronizedPlainWriteWhileReading(
+      final PlainAccumulatorState as, final ProducerState ps) {
+    final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
+    final int value = ps.next();
+    synchronized (accumulator) {
+      accumulator.update(value);
+    }
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteSynchronizedPlain")
+  public double synchronizedPlainReadWhileWriting(final PlainAccumulatorState as) {
+    final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
+    synchronized (accumulator) {
+      return accumulator.get();
+    }
+  }
+
+  @Benchmark
+  @Group("WriteAtomic")
+  public void atomicWrite(final AtomicAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadAtomic")
+  public double atomicRead(final AtomicAccumulatorState as) {
+    return as.accumulator.get();
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteAtomic")
+  public void atomicWriteWhileReading(final AtomicAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteAtomic")
+  public double atomicReadWhileWriting(final AtomicAccumulatorState as) {
+    return as.accumulator.get();
+  }
+
+  @Benchmark
+  @Group("WriteVolatile")
+  public void volatileWrite(final VolatileAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadVolatile")
+  public double volatileRead(final VolatileAccumulatorState as) {
+    return as.accumulator.get();
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteVolatile")
+  public void volatileWriteWhileReading(final VolatileAccumulatorState as, final ProducerState ps) {
+    as.accumulator.update(ps.next());
+  }
+
+  @Benchmark
+  @Group("ReadAndWriteVolatile")
+  public double volatileReadWhileWriting(final VolatileAccumulatorState as) {
+    return as.accumulator.get();
+  }
+}
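For background on why the benchmark compares plain, synchronized, atomic, and volatile accumulators: the pre-2.64.0 variant above has no happens-before edge between update() and get(), so a reader thread may observe arbitrarily stale values, and the JMM even permits torn reads of the non-volatile double on 32-bit JVMs (JLS 17.7). A minimal sketch of the hazard, assuming nothing beyond the JDK (the class is hypothetical and for illustration only):

// Illustrative only: demonstrates the missing happens-before edge in the
// pre-2.64.0 accumulator, not code from this change.
public class PlainMovingAvgRace {
  static class MovingAvg {
    private static final int MOVING_AVG_WINDOW = 1000;
    private double avg = 0;
    private long numUpdates = 0;

    void update(double quantity) {
      numUpdates++;
      avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
    }

    double get() {
      return avg; // plain read: may be stale or (on some JVMs) torn
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MovingAvg avg = new MovingAvg();
    Thread writer =
        new Thread(
            () -> {
              for (int i = 0; i < 10_000_000; i++) {
                avg.update(i % 100);
              }
            });
    writer.start();
    // Nothing orders these reads against the writer's stores; the JIT may even
    // hoist the read out of the loop so the printed value never changes.
    for (int i = 0; i < 5; i++) {
      System.out.println(avg.get());
      Thread.sleep(10);
    }
    writer.join();
  }
}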
diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java
new file mode 100644
index 000000000000..bfdefa2be4ec
--- /dev/null
+++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Benchmarks for KafkaIO. */
+package org.apache.beam.sdk.io.kafka.jmh;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 95f95000a58f..8b778ce5481e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
@@ -129,19 +130,43 @@ static Map<String, Object> getOffsetConsumerConfig(
     return offsetConsumerConfig;
   }
 
-  // Maintains approximate average over last 1000 elements
-  static class MovingAvg {
+  /*
+   * Maintains approximate average over last 1000 elements.
+   * Usage is only thread-safe for a single producer and multiple consumers.
+   */
+  public static final class MovingAvg {
+    private static final AtomicLongFieldUpdater<MovingAvg> AVG =
+        AtomicLongFieldUpdater.newUpdater(MovingAvg.class, "avg");
     private static final int MOVING_AVG_WINDOW = 1000;
-    private double avg = 0;
-    private long numUpdates = 0;
 
-    void update(double quantity) {
-      numUpdates++;
-      avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+    private volatile long avg;
+    private long numUpdates;
+
+    private double getAvg() {
+      return Double.longBitsToDouble(avg);
+    }
+
+    private void setAvg(final double value) {
+      AVG.lazySet(this, Double.doubleToRawLongBits(value));
+    }
+
+    private long incrementAndGetNumUpdates() {
+      final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
+      numUpdates = nextNumUpdates;
+      return nextNumUpdates;
+    }
+
+    public void update(final double quantity) {
+      final double prevAvg = getAvg(); // volatile load (acquire)
+
+      final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store
+      final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates; // normal load/store
+
+      setAvg(nextAvg); // ordered store (release)
     }
 
-    double get() {
-      return avg;
+    public double get() {
+      return getAvg(); // volatile load (acquire)
     }
   }
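The rewritten accumulator publishes the average by storing its raw bit pattern in a volatile long: the single writer keeps numUpdates as a plain field, folds the new sample into the average, and releases the result with lazySet, while readers take a volatile (acquire) load. The same single-writer pattern in isolation, as a sketch (the cell class is illustrative, not part of the change):

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative single-writer/multi-reader cell for a double, mirroring the
// MovingAvg approach above: store the bit pattern in a volatile long.
final class ReleaseDoubleCell {
  private static final AtomicLongFieldUpdater<ReleaseDoubleCell> BITS =
      AtomicLongFieldUpdater.newUpdater(ReleaseDoubleCell.class, "bits");

  private volatile long bits = Double.doubleToRawLongBits(0.0);

  // Writer-only. lazySet is a release store: it cannot be reordered with this
  // thread's earlier writes, but it skips the full StoreLoad fence that a
  // plain volatile write would pay on x86.
  void set(double value) {
    BITS.lazySet(this, Double.doubleToRawLongBits(value));
  }

  // Safe from any thread: a volatile (acquire) load of the full 64-bit
  // pattern, so readers never observe a torn double.
  double get() {
    return Double.longBitsToDouble(bits);
  }
}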
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 0f4f44999133..074bba54ac21 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -187,13 +187,6 @@ public boolean advance() throws IOException {
           continue;
         }
 
-        long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
-
-        if (curRecord == null) {
-          LOG.info("{}: first record offset {}", name, offset);
-          offsetGap = 0;
-        }
-
         // Apply user deserializers. User deserializers might throw, which will be propagated up
         // and 'curRecord' remains unchanged. The runner should close this reader.
         // TODO: write records that can't be deserialized to a "dead-letter" additional output.
@@ -219,7 +212,7 @@ public boolean advance() throws IOException {
         int recordSize =
             (rawRecord.key() == null ? 0 : rawRecord.key().length)
                 + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-        pState.recordConsumed(offset, recordSize, offsetGap);
+        pState.recordConsumed(offset, recordSize);
         bytesRead.inc(recordSize);
         bytesReadBySplit.inc(recordSize);
@@ -470,8 +463,6 @@ private static class PartitionState<K, V> {
     private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
     private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
-    private KafkaIOUtils.MovingAvg avgOffsetGap =
-        new KafkaIOUtils.MovingAvg(); // > 0 only when log compaction is enabled.
 
     PartitionState(
         TopicPartition partition, long nextOffset, TimestampPolicy<K, V> timestampPolicy) {
@@ -487,13 +478,10 @@ public TopicPartition topicPartition() {
       return topicPartition;
     }
 
-    // Update consumedOffset, avgRecordSize, and avgOffsetGap
-    void recordConsumed(long offset, int size, long offsetGap) {
+    // Update consumedOffset and avgRecordSize
+    void recordConsumed(long offset, int size) {
       nextOffset = offset + 1;
-
-      // This is always updated from single thread. Probably not worth making atomic.
       avgRecordSize.update(size);
-      avgOffsetGap.update(offsetGap);
     }
 
     synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
@@ -521,7 +509,7 @@ synchronized long backlogMessageCount() {
       if (latestOffset < 0 || nextOffset < 0) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
+      double remaining = latestOffset - nextOffset;
       return Math.max(0, (long) Math.ceil(remaining));
     }
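With avgOffsetGap gone, backlogMessageCount() reports the raw offset distance. A worked example with made-up numbers (illustrative only, not code from this change):

// Illustrative arithmetic, not code from this change.
public class BacklogEstimateExample {
  public static void main(String[] args) {
    long latestOffset = 5_000; // latest offset observed for the partition
    long nextOffset = 3_200;   // next offset this reader will consume

    // The old estimate discounted offsets removed by log compaction, e.g. with
    // avgOffsetGap = 0.5: (5_000 - 3_200) / (1 + 0.5) = 1_200 messages.
    // The new estimate is the raw offset distance, which can overestimate the
    // message count on compacted topics:
    long backlog = Math.max(0, latestOffset - nextOffset); // 1_800 messages
    System.out.println(backlog);
  }
}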
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index daf7e91b03bd..5e16447241e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -30,8 +30,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -218,7 +216,7 @@ private static final class SharedStateHolder {
   private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
       OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
-  private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
+  private static final Map<Long, LoadingCache<KafkaSourceDescriptor, MovingAvg>>
       AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
 }
@@ -248,8 +246,7 @@ private static final class SharedStateHolder {
   private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
       offsetEstimatorCache;
-  private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
-      avgRecordSizeCache;
+  private transient @Nullable LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache;
   private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@@ -360,24 +357,22 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
   public double getSize(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
       throws ExecutionException {
-    // If present, estimates the record size to offset gap ratio. Compacted topics may hold less
-    // records than the estimated offset range due to record deletion within a partition.
-    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+    // If present, estimates the record size.
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
-    final @Nullable AverageRecordSize avgRecordSize =
+    final @Nullable MovingAvg avgRecordSize =
         avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
     // The tracker estimates the offset range by subtracting the last claimed position from the
     // currently observed end offset for the partition belonging to this split.
-    double estimatedOffsetRange =
+    final double estimatedOffsetRange =
         restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
-    // Before processing elements, we don't have a good estimated size of records and offset gap.
-    // Return the estimated offset range without scaling by a size to gap ratio.
-    if (avgRecordSize == null) {
-      return estimatedOffsetRange;
-    }
-    // When processing elements, a moving average estimates the size of records and offset gap.
-    // Return the estimated offset range scaled by the estimated size to gap ratio.
-    return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio();
+
+    // Before processing elements, we don't have a good estimated size of records.
+    // When processing elements, a moving average estimates the size of records.
+    // Return the estimated offset range scaled by the estimated size if present.
+    return avgRecordSize == null
+        ? estimatedOffsetRange
+        : estimatedOffsetRange * avgRecordSize.get();
   }
 
   @NewTracker
@@ -406,7 +401,7 @@ public ProcessContinuation processElement(
       WatermarkEstimator<Instant> watermarkEstimator,
       MultiOutputReceiver receiver)
       throws Exception {
-    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
         Preconditions.checkStateNotNull(this.offsetEstimatorCache);
@@ -415,7 +410,7 @@ public ProcessContinuation processElement(
     final Deserializer<V> valueDeserializerInstance =
         Preconditions.checkStateNotNull(this.valueDeserializerInstance);
     final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
+
     // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
     final Distribution rawSizes =
         Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
@@ -453,6 +448,8 @@
       final Stopwatch sw = Stopwatch.createStarted();
 
       while (true) {
+        // Fetch the record size accumulator.
+        final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor);
         rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
         // When there are no records available for the current TopicPartition, self-checkpoint
         // and move to process the next element.
@@ -515,9 +512,7 @@
           int recordSize =
               (rawRecord.key() == null ? 0 : rawRecord.key().length)
                   + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          avgRecordSizeCache
-              .getUnchecked(kafkaSourceDescriptor)
-              .update(recordSize, rawRecord.offset() - expectedOffset);
+          avgRecordSize.update(recordSize);
           rawSizes.update(recordSize);
           expectedOffset = rawRecord.offset() + 1;
           Instant outputTimestamp;
@@ -556,7 +551,7 @@
                           offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
                   .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                   .doubleValue()
-              * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
+              * avgRecordSize.get()));
       KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
       kafkaResults.updateBacklogBytes(
           kafkaSourceDescriptor.getTopic(),
@@ -567,7 +562,7 @@
                           offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
                   .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                   .doubleValue()
-              * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
+              * avgRecordSize.get()));
       kafkaResults.flushBufferedMetrics();
     }
   }
@@ -628,7 +623,7 @@ public Coder<OffsetRange> restrictionCoder() {
 
   @Setup
   public void setup() throws Exception {
-    // Start to track record size and offset gap per bundle.
+    // Start to track record size.
     avgRecordSizeCache =
         SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent(
             fnId,
@@ -636,11 +631,11 @@ public void setup() throws Exception {
               return CacheBuilder.newBuilder()
                   .maximumSize(1000L)
                   .build(
-                      new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
+                      new CacheLoader<KafkaSourceDescriptor, MovingAvg>() {
                         @Override
-                        public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
+                        public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
                             throws Exception {
-                          return new AverageRecordSize();
+                          return new MovingAvg();
                         }
                       });
             });
@@ -687,7 +682,7 @@ public KafkaLatestOffsetEstimator load(
 
   @Teardown
   public void teardown() throws Exception {
-    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
         Preconditions.checkStateNotNull(this.offsetEstimatorCache);
@@ -726,45 +721,6 @@ private Map<String, Object> overrideBootstrapServersConfig(
     return config;
   }
 
-  // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava
-  // AtomicDouble. Note that this requires that a single thread will call update and that while get
-  // may be called by multiple threads the method must only load the accumulator itself.
-  @ThreadSafe
-  private static class AverageRecordSize {
-    @GuardedBy("this")
-    private MovingAvg avgRecordSize;
-
-    @GuardedBy("this")
-    private MovingAvg avgRecordGap;
-
-    public AverageRecordSize() {
-      this.avgRecordSize = new MovingAvg();
-      this.avgRecordGap = new MovingAvg();
-    }
-
-    public synchronized void update(int recordSize, long gap) {
-      avgRecordSize.update(recordSize);
-      avgRecordGap.update(gap);
-    }
-
-    public double estimateRecordByteSizeToOffsetCountRatio() {
-      double avgRecordSize;
-      double avgRecordGap;
-
-      synchronized (this) {
-        avgRecordSize = this.avgRecordSize.get();
-        avgRecordGap = this.avgRecordGap.get();
-      }
-
-      // The offset increases between records in a batch fetched from a compacted topic may be
-      // greater than 1. Compacted topics only store records with the greatest offset per key per
-      // partition, the records in between are deleted and will not be observed by a consumer.
-      // The observed gap between offsets is used to estimate the number of records that are likely
-      // to be observed for the provided number of records.
-      return avgRecordSize / (1 + avgRecordGap);
-    }
-  }
-
   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 9bf86c9bf2f1..821d60bd453a 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -241,6 +241,7 @@ include(":sdks:java:io:jdbc")
 include(":sdks:java:io:jms")
 include(":sdks:java:io:json")
 include(":sdks:java:io:kafka")
+include(":sdks:java:io:kafka:jmh")
 include(":sdks:java:io:kafka:upgrade")
 include(":sdks:java:io:kudu")
 include(":sdks:java:io:mongodb")
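For reference, the effect of the simplified getSize() estimate in ReadFromKafkaDoFn, again with made-up numbers (illustrative only, not code from this change):

// Illustrative arithmetic, not code from this change.
public class RestrictionSizeExample {
  public static void main(String[] args) {
    double estimatedOffsetRange = 10_000; // offsets left in the restriction
    double avgRecordBytes = 512.0;        // MovingAvg.get() after some records

    // Old: offsets * avgRecordSize / (1 + avgRecordGap), scaled down to
    // account for compacted topics. New: offsets * average record size.
    double estimatedBytes = estimatedOffsetRange * avgRecordBytes; // 5,120,000 bytes
    System.out.println(estimatedBytes);
  }
}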