From 490012c31f92b53ca9871b6844009faeb225a357 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 4 Mar 2025 16:35:14 +0000 Subject: [PATCH 01/10] Fix data race in MovingAvg, remove offset gap ratio --- build.gradle.kts | 1 + sdks/java/io/kafka/jmh/build.gradle | 31 +++++ .../sdk/io/kafka/KafkaIOUtilsBenchmark.java | 126 ++++++++++++++++++ .../beam/sdk/io/kafka/KafkaIOUtils.java | 74 +++++++++- .../sdk/io/kafka/KafkaUnboundedReader.java | 18 +-- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 94 ++++--------- settings.gradle.kts | 1 + 7 files changed, 255 insertions(+), 90 deletions(-) create mode 100644 sdks/java/io/kafka/jmh/build.gradle create mode 100644 sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java diff --git a/build.gradle.kts b/build.gradle.kts index 8dcdc14f04e7..664fb8a83d09 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -304,6 +304,7 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:contextualtextio:build") dependsOn(":sdks:java:io:expansion-service:build") dependsOn(":sdks:java:io:file-based-io-tests:build") + dependsOn(":sdks:java:io:kafka:jmh:build") dependsOn(":sdks:java:io:sparkreceiver:3:build") dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") diff --git a/sdks/java/io/kafka/jmh/build.gradle b/sdks/java/io/kafka/jmh/build.gradle new file mode 100644 index 000000000000..5f3d0d38f092 --- /dev/null +++ b/sdks/java/io/kafka/jmh/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.kafka.jmh', + enableJmh: true, + publish: false) + +description = "Apache Beam :: SDKs :: Java :: IO :: Kafka :: JMH" +ext.summary = "This contains JMH benchmarks for the Kafka IO connector for Beam Java" + +dependencies { + implementation project(path: ":sdks:java:io:kafka") +} diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java new file mode 100644 index 000000000000..cd216b5460f0 --- /dev/null +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.IterationParams; +import org.openjdk.jmh.infra.ThreadParams; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(batchSize = KafkaIOUtilsBenchmark.SIZE) +@Measurement(batchSize = KafkaIOUtilsBenchmark.SIZE) +public class KafkaIOUtilsBenchmark { + static final int READERS = 2; + static final int WRITERS = 1; + static final int SIZE = 1024; + + @State(Scope.Thread) + public static class ProducerState { + private int[] values; + private int idx; + + @Setup(Level.Iteration) + public void setup(final IterationParams ip, final ThreadParams tp) { + values = + new Random(299792458 + ip.getCount() + tp.getThreadIndex()) + .ints(KafkaIOUtilsBenchmark.SIZE, 0, 100) + .toArray(); + idx = 0; + } + + @TearDown(Level.Invocation) + public void tearDown(final IterationParams ip, final ThreadParams tp) { + idx = (idx + 1) % KafkaIOUtilsBenchmark.SIZE; + } + } + + @State(Scope.Group) + public static class PlainAccumulatorState { + // As implemented before 2.64.0. + // Note that numUpdates may overflow and count back from Long.MIN_VALUE. + static class MovingAvg { + private static final int MOVING_AVG_WINDOW = 1000; + private double avg = 0; + private long numUpdates = 0; + + void update(double quantity) { + numUpdates++; + avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + } + + double get() { + return avg; + } + } + + private MovingAvg accumulator; + + @Setup(Level.Iteration) + public void setup(final IterationParams ip, final ThreadParams tp) { + accumulator = new MovingAvg(); + } + } + + @State(Scope.Group) + public static class AtomicAccumulatorState { + private final KafkaIOUtils.MovingAvg accumulator = new KafkaIOUtils.MovingAvg(); + } + + @Benchmark + @Group("Plain") + @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + public void plainWrite(final PlainAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.values[ps.idx]); + } + + @Benchmark + @Group("Plain") + @GroupThreads(KafkaIOUtilsBenchmark.READERS) + public double plainRead(final PlainAccumulatorState as) { + return as.accumulator.get(); + } + + @Benchmark + @Group("Atomic") + @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + public void atomicWrite(final AtomicAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.values[ps.idx]); + } + + @Benchmark + @Group("Atomic") + @GroupThreads(KafkaIOUtilsBenchmark.READERS) + public double atomicRead(final AtomicAccumulatorState as) { + return as.accumulator.get(); + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 95f95000a58f..13c688408510 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; @@ -129,19 +130,78 @@ static Map getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. + * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; + byte p010, p011, p012, p013, p014, p015, p016, p017; + byte p020, p021, p022, p023, p024, p025, p026, p027; + byte p030, p031, p032, p033, p034, p035, p036, p037; + byte p040, p041, p042, p043, p044, p045, p046, p047; + byte p050, p051, p052, p053, p054, p055, p056, p057; + byte p060, p061, p062, p063, p064, p065, p066, p067; + } + + // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 + // cache lines). + private static class MovingAvgFields extends MovingAvgPadding { private static final int MOVING_AVG_WINDOW = 1000; - private double avg = 0; + + private static final AtomicLongFieldUpdater AVG = + AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg"); + + private volatile long avg = 0; private long numUpdates = 0; - void update(double quantity) { - numUpdates++; - avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + protected double getAvg() { + return Double.longBitsToDouble(avg); + } + + protected void setAvg(final double value) { + AVG.lazySet(this, Double.doubleToRawLongBits(value)); + } + + protected long incrementAndGetNumUpdates() { + final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1); + numUpdates = nextNumUpdates; + return nextNumUpdates; + } + } + + /* + * Maintains approximate average over last 1000 elements. + * Usage is only thread-safe for a single producer and multiple consumers. + * + * Attempt to prevent false sharing by padding to 64 bytes. + * avg: 8 bytes + * numUpdates: 8 bytes + * alignment: at least 8 bytes + * + * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. + * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. + */ + static class MovingAvg extends MovingAvgFields { + byte p100, p101, p102, p103, p104, p105, p106, p107; + byte p110, p111, p112, p113, p114, p115, p116, p117; + byte p120, p121, p122, p123, p124, p125, p126, p127; + byte p130, p131, p132, p133, p134, p135, p136, p137; + byte p140, p141, p142, p143, p144, p145, p146, p147; + byte p150, p151, p152, p153, p154, p155, p156, p157; + + void update(final double quantity) { + final double prevAvg = getAvg(); // volatile load (acquire) + + final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store + final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates; // normal load/store + + setAvg(nextAvg); // ordered store (release) } double get() { - return avg; + return getAvg(); // volatile load (acquire) } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 0f4f44999133..39461fba0a8b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -187,13 +187,6 @@ public boolean advance() throws IOException { continue; } - long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled. - - if (curRecord == null) { - LOG.info("{}: first record offset {}", name, offset); - offsetGap = 0; - } - // Apply user deserializers. User deserializers might throw, which will be propagated up // and 'curRecord' remains unchanged. The runner should close this reader. // TODO: write records that can't be deserialized to a "dead-letter" additional output. @@ -219,7 +212,7 @@ public boolean advance() throws IOException { int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); - pState.recordConsumed(offset, recordSize, offsetGap); + pState.recordConsumed(offset, recordSize); bytesRead.inc(recordSize); bytesReadBySplit.inc(recordSize); @@ -470,8 +463,6 @@ private static class PartitionState { private Iterator> recordIter = Collections.emptyIterator(); private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg(); - private KafkaIOUtils.MovingAvg avgOffsetGap = - new KafkaIOUtils.MovingAvg(); // > 0 only when log compaction is enabled. PartitionState( TopicPartition partition, long nextOffset, TimestampPolicy timestampPolicy) { @@ -487,13 +478,12 @@ public TopicPartition topicPartition() { return topicPartition; } - // Update consumedOffset, avgRecordSize, and avgOffsetGap - void recordConsumed(long offset, int size, long offsetGap) { + // Update consumedOffset and avgRecordSize + void recordConsumed(long offset, int size) { nextOffset = offset + 1; // This is always updated from single thread. Probably not worth making atomic. avgRecordSize.update(size); - avgOffsetGap.update(offsetGap); } synchronized void setLatestOffset(long latestOffset, Instant fetchTime) { @@ -521,7 +511,7 @@ synchronized long backlogMessageCount() { if (latestOffset < 0 || nextOffset < 0) { return UnboundedReader.BACKLOG_UNKNOWN; } - double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get()); + double remaining = latestOffset - nextOffset; return Math.max(0, (long) Math.ceil(remaining)); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index daf7e91b03bd..5e16447241e7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -30,8 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -218,7 +216,7 @@ private static final class SharedStateHolder { private static final Map> OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>(); - private static final Map> + private static final Map> AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>(); } @@ -248,8 +246,7 @@ private static final class SharedStateHolder { private transient @Nullable LoadingCache offsetEstimatorCache; - private transient @Nullable LoadingCache - avgRecordSizeCache; + private transient @Nullable LoadingCache avgRecordSizeCache; private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; @VisibleForTesting final long consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @@ -360,24 +357,22 @@ public WatermarkEstimator newWatermarkEstimator( public double getSize( @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange) throws ExecutionException { - // If present, estimates the record size to offset gap ratio. Compacted topics may hold less - // records than the estimated offset range due to record deletion within a partition. - final LoadingCache avgRecordSizeCache = + // If present, estimates the record size. + final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); - final @Nullable AverageRecordSize avgRecordSize = + final @Nullable MovingAvg avgRecordSize = avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor); // The tracker estimates the offset range by subtracting the last claimed position from the // currently observed end offset for the partition belonging to this split. - double estimatedOffsetRange = + final double estimatedOffsetRange = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining(); - // Before processing elements, we don't have a good estimated size of records and offset gap. - // Return the estimated offset range without scaling by a size to gap ratio. - if (avgRecordSize == null) { - return estimatedOffsetRange; - } - // When processing elements, a moving average estimates the size of records and offset gap. - // Return the estimated offset range scaled by the estimated size to gap ratio. - return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio(); + + // Before processing elements, we don't have a good estimated size of records. + // When processing elements, a moving average estimates the size of records. + // Return the estimated offset range scaled by the estimated size if present. + return avgRecordSize == null + ? estimatedOffsetRange + : estimatedOffsetRange * avgRecordSize.get(); } @NewTracker @@ -406,7 +401,7 @@ public ProcessContinuation processElement( WatermarkEstimator watermarkEstimator, MultiOutputReceiver receiver) throws Exception { - final LoadingCache avgRecordSizeCache = + final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); final LoadingCache offsetEstimatorCache = Preconditions.checkStateNotNull(this.offsetEstimatorCache); @@ -415,7 +410,7 @@ public ProcessContinuation processElement( final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); - final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor); + // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash? final Distribution rawSizes = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString()); @@ -453,6 +448,8 @@ public ProcessContinuation processElement( final Stopwatch sw = Stopwatch.createStarted(); while (true) { + // Fetch the record size accumulator. + final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor); rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition()); // When there are no records available for the current TopicPartition, self-checkpoint // and move to process the next element. @@ -515,9 +512,7 @@ public ProcessContinuation processElement( int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); - avgRecordSizeCache - .getUnchecked(kafkaSourceDescriptor) - .update(recordSize, rawRecord.offset() - expectedOffset); + avgRecordSize.update(recordSize); rawSizes.update(recordSize); expectedOffset = rawRecord.offset() + 1; Instant outputTimestamp; @@ -556,7 +551,7 @@ public ProcessContinuation processElement( offsetEstimatorCache.get(kafkaSourceDescriptor).estimate())) .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue() - * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio())); + * avgRecordSize.get())); KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics(); kafkaResults.updateBacklogBytes( kafkaSourceDescriptor.getTopic(), @@ -567,7 +562,7 @@ public ProcessContinuation processElement( offsetEstimatorCache.get(kafkaSourceDescriptor).estimate())) .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue() - * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio())); + * avgRecordSize.get())); kafkaResults.flushBufferedMetrics(); } } @@ -628,7 +623,7 @@ public Coder restrictionCoder() { @Setup public void setup() throws Exception { - // Start to track record size and offset gap per bundle. + // Start to track record size. avgRecordSizeCache = SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent( fnId, @@ -636,11 +631,11 @@ public void setup() throws Exception { return CacheBuilder.newBuilder() .maximumSize(1000L) .build( - new CacheLoader() { + new CacheLoader() { @Override - public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) + public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception { - return new AverageRecordSize(); + return new MovingAvg(); } }); }); @@ -687,7 +682,7 @@ public KafkaLatestOffsetEstimator load( @Teardown public void teardown() throws Exception { - final LoadingCache avgRecordSizeCache = + final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); final LoadingCache offsetEstimatorCache = Preconditions.checkStateNotNull(this.offsetEstimatorCache); @@ -726,45 +721,6 @@ private Map overrideBootstrapServersConfig( return config; } - // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava - // AtomicDouble. Note that this requires that a single thread will call update and that while get - // may be called by multiple threads the method must only load the accumulator itself. - @ThreadSafe - private static class AverageRecordSize { - @GuardedBy("this") - private MovingAvg avgRecordSize; - - @GuardedBy("this") - private MovingAvg avgRecordGap; - - public AverageRecordSize() { - this.avgRecordSize = new MovingAvg(); - this.avgRecordGap = new MovingAvg(); - } - - public synchronized void update(int recordSize, long gap) { - avgRecordSize.update(recordSize); - avgRecordGap.update(gap); - } - - public double estimateRecordByteSizeToOffsetCountRatio() { - double avgRecordSize; - double avgRecordGap; - - synchronized (this) { - avgRecordSize = this.avgRecordSize.get(); - avgRecordGap = this.avgRecordGap.get(); - } - - // The offset increases between records in a batch fetched from a compacted topic may be - // greater than 1. Compacted topics only store records with the greatest offset per key per - // partition, the records in between are deleted and will not be observed by a consumer. - // The observed gap between offsets is used to estimate the number of records that are likely - // to be observed for the provided number of records. - return avgRecordSize / (1 + avgRecordGap); - } - } - private static Instant ensureTimestampWithinBounds(Instant timestamp) { if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) { timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/settings.gradle.kts b/settings.gradle.kts index 9bf86c9bf2f1..821d60bd453a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -241,6 +241,7 @@ include(":sdks:java:io:jdbc") include(":sdks:java:io:jms") include(":sdks:java:io:json") include(":sdks:java:io:kafka") +include(":sdks:java:io:kafka:jmh") include(":sdks:java:io:kafka:upgrade") include(":sdks:java:io:kudu") include(":sdks:java:io:mongodb") From 92d3ec0ef583e75002c7f89b8514ed8d18df16b9 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:08:08 +0000 Subject: [PATCH 02/10] Suppress UUF_UNUSED_FIELD warning --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 13c688408510..eaa651bcfa96 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -135,6 +136,7 @@ static Map getOffsetConsumerConfig( * object header: 4, 8, 12 or 16 bytes * alignment: at least 8 bytes */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") private static class MovingAvgPadding { byte p000, p001, p002, p003, p004, p005, p006, p007; byte p010, p011, p012, p013, p014, p015, p016, p017; @@ -183,6 +185,7 @@ protected long incrementAndGetNumUpdates() { * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") static class MovingAvg extends MovingAvgFields { byte p100, p101, p102, p103, p104, p105, p106, p107; byte p110, p111, p112, p113, p114, p115, p116, p117; From 066dbeafa7d0974e344c3613cb7c071cc96ac1b6 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:23:51 +0000 Subject: [PATCH 03/10] Add package-info.java --- .../beam/sdk/io/kafka/package-info.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java new file mode 100644 index 000000000000..74e682c19117 --- /dev/null +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from Apache Kafka. */ +package org.apache.beam.sdk.io.kafka.jmh; From 168a1ab22d4dcd0722522a1cdb0ce8bf402749f3 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:25:48 +0000 Subject: [PATCH 04/10] Fix sloppy copy/paste --- .../main/java/org/apache/beam/sdk/io/kafka/package-info.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java index 74e682c19117..bfdefa2be4ec 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java @@ -16,5 +16,5 @@ * limitations under the License. */ -/** Transforms for reading and writing from Apache Kafka. */ +/** Benchmarks for KafkaIO. */ package org.apache.beam.sdk.io.kafka.jmh; From 519259e901d1f4e55c82e714ef763692cfe4cb9d Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 14:02:34 +0000 Subject: [PATCH 05/10] Move JMH content to jmh package --- .../beam/sdk/io/kafka/{ => jmh}/KafkaIOUtilsBenchmark.java | 2 +- .../org/apache/beam/sdk/io/kafka/{ => jmh}/package-info.java | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/{ => jmh}/KafkaIOUtilsBenchmark.java (98%) rename sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/{ => jmh}/package-info.java (100%) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java similarity index 98% rename from sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java rename to sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java index cd216b5460f0..55e499c77a59 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kafka; +package org.apache.beam.sdk.io.kafka.jmh; import java.util.Random; import java.util.concurrent.TimeUnit; diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java similarity index 100% rename from sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java rename to sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java From a3069b63a4ad9a2325ba911a80dbc5d148ab9315 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 14:24:11 +0000 Subject: [PATCH 06/10] Change MovingAvg's class and method visibility --- .../apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java | 1 + .../java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java index 55e499c77a59..26488a617762 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java @@ -19,6 +19,7 @@ import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Group; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index eaa651bcfa96..ca280766de14 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -186,7 +186,7 @@ protected long incrementAndGetNumUpdates() { * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. */ @SuppressFBWarnings("UUF_UNUSED_FIELD") - static class MovingAvg extends MovingAvgFields { + public static final class MovingAvg extends MovingAvgFields { byte p100, p101, p102, p103, p104, p105, p106, p107; byte p110, p111, p112, p113, p114, p115, p116, p117; byte p120, p121, p122, p123, p124, p125, p126, p127; @@ -194,7 +194,7 @@ static class MovingAvg extends MovingAvgFields { byte p140, p141, p142, p143, p144, p145, p146, p147; byte p150, p151, p152, p153, p154, p155, p156, p157; - void update(final double quantity) { + public void update(final double quantity) { final double prevAvg = getAvg(); // volatile load (acquire) final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store @@ -203,7 +203,7 @@ void update(final double quantity) { setAvg(nextAvg); // ordered store (release) } - double get() { + public double get() { return getAvg(); // volatile load (acquire) } } From acc07a892ebfd528780069cb3ae92eba89b579ea Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 6 Mar 2025 22:32:33 +0000 Subject: [PATCH 07/10] Remove comment --- .../java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 39461fba0a8b..074bba54ac21 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -481,8 +481,6 @@ public TopicPartition topicPartition() { // Update consumedOffset and avgRecordSize void recordConsumed(long offset, int size) { nextOffset = offset + 1; - - // This is always updated from single thread. Probably not worth making atomic. avgRecordSize.update(size); } From 713d496bd8b2a73cec0044d7c1f679b0e5653085 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 11 Mar 2025 08:37:39 +0000 Subject: [PATCH 08/10] Add synchronized plain and volatile benchmarks and improvements to experiment setup --- .../io/kafka/jmh/KafkaIOUtilsBenchmark.java | 168 ++++++++++++++---- 1 file changed, 138 insertions(+), 30 deletions(-) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java index 26488a617762..8523e2094895 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java @@ -23,27 +23,21 @@ import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Group; -import org.openjdk.jmh.annotations.GroupThreads; import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.IterationParams; import org.openjdk.jmh.infra.ThreadParams; @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@Warmup(batchSize = KafkaIOUtilsBenchmark.SIZE) -@Measurement(batchSize = KafkaIOUtilsBenchmark.SIZE) +@Threads(Threads.MAX) public class KafkaIOUtilsBenchmark { - static final int READERS = 2; - static final int WRITERS = 1; - static final int SIZE = 1024; + private static final int SIZE = 1024; @State(Scope.Thread) public static class ProducerState { @@ -52,16 +46,14 @@ public static class ProducerState { @Setup(Level.Iteration) public void setup(final IterationParams ip, final ThreadParams tp) { - values = - new Random(299792458 + ip.getCount() + tp.getThreadIndex()) - .ints(KafkaIOUtilsBenchmark.SIZE, 0, 100) - .toArray(); + values = new Random(299792458 + ip.getCount()).ints(SIZE, 0, 100).toArray(); idx = 0; } - @TearDown(Level.Invocation) - public void tearDown(final IterationParams ip, final ThreadParams tp) { - idx = (idx + 1) % KafkaIOUtilsBenchmark.SIZE; + int next() { + final int value = values[idx]; + idx = (idx + 1) % SIZE; + return value; } } @@ -84,44 +76,160 @@ void update(double quantity) { } } - private MovingAvg accumulator; + MovingAvg accumulator; - @Setup(Level.Iteration) - public void setup(final IterationParams ip, final ThreadParams tp) { + @Setup(Level.Trial) + public void setup() { accumulator = new MovingAvg(); } } @State(Scope.Group) public static class AtomicAccumulatorState { - private final KafkaIOUtils.MovingAvg accumulator = new KafkaIOUtils.MovingAvg(); + KafkaIOUtils.MovingAvg accumulator; + + @Setup(Level.Trial) + public void setup() { + accumulator = new KafkaIOUtils.MovingAvg(); + } + } + + @State(Scope.Group) + public static class VolatileAccumulatorState { + // Atomic accumulator using only volatile reads and writes. + static class MovingAvg { + private static final int MOVING_AVG_WINDOW = 1000; + + private volatile double avg = 0; + private long numUpdates = 0; + + void update(final double quantity) { + final double prevAvg = avg; + numUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1); + avg = prevAvg + (quantity - prevAvg) / numUpdates; + } + + double get() { + return avg; + } + } + + MovingAvg accumulator; + + @Setup(Level.Trial) + public void setup() { + accumulator = new MovingAvg(); + } } @Benchmark - @Group("Plain") - @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + @Group("WritePlain") public void plainWrite(final PlainAccumulatorState as, final ProducerState ps) { - as.accumulator.update(ps.values[ps.idx]); + as.accumulator.update(ps.next()); } @Benchmark - @Group("Plain") - @GroupThreads(KafkaIOUtilsBenchmark.READERS) + @Group("ReadPlain") public double plainRead(final PlainAccumulatorState as) { return as.accumulator.get(); } @Benchmark - @Group("Atomic") - @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + @Group("ReadAndWritePlain") + public void plainWriteWhileReading(final PlainAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.next()); + } + + @Benchmark + @Group("ReadAndWritePlain") + public double plainReadWhileWriting(final PlainAccumulatorState as) { + return as.accumulator.get(); + } + + @Benchmark + @Group("WriteSynchronizedPlain") + public void synchronizedPlainWrite(final PlainAccumulatorState as, final ProducerState ps) { + final PlainAccumulatorState.MovingAvg accumulator = as.accumulator; + final int value = ps.next(); + synchronized (accumulator) { + accumulator.update(value); + } + } + + @Benchmark + @Group("ReadSynchronizedPlain") + public double synchronizedPlainRead(final PlainAccumulatorState as) { + final PlainAccumulatorState.MovingAvg accumulator = as.accumulator; + synchronized (accumulator) { + return accumulator.get(); + } + } + + @Benchmark + @Group("ReadAndWriteSynchronizedPlain") + public void synchronizedPlainWriteWhileReading( + final PlainAccumulatorState as, final ProducerState ps) { + final PlainAccumulatorState.MovingAvg accumulator = as.accumulator; + final int value = ps.next(); + synchronized (accumulator) { + accumulator.update(value); + } + } + + @Benchmark + @Group("ReadAndWriteSynchronizedPlain") + public double synchronizedPlainReadWhileWriting(final PlainAccumulatorState as) { + final PlainAccumulatorState.MovingAvg accumulator = as.accumulator; + synchronized (accumulator) { + return accumulator.get(); + } + } + + @Benchmark + @Group("WriteAtomic") public void atomicWrite(final AtomicAccumulatorState as, final ProducerState ps) { - as.accumulator.update(ps.values[ps.idx]); + as.accumulator.update(ps.next()); } @Benchmark - @Group("Atomic") - @GroupThreads(KafkaIOUtilsBenchmark.READERS) + @Group("ReadAtomic") public double atomicRead(final AtomicAccumulatorState as) { return as.accumulator.get(); } + + @Benchmark + @Group("ReadAndWriteAtomic") + public void atomicWriteWhileReading(final AtomicAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.next()); + } + + @Benchmark + @Group("ReadAndWriteAtomic") + public double atomicReadWhileWriting(final AtomicAccumulatorState as) { + return as.accumulator.get(); + } + + @Benchmark + @Group("WriteVolatile") + public void volatileWrite(final VolatileAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.next()); + } + + @Benchmark + @Group("ReadVolatile") + public double volatileRead(final VolatileAccumulatorState as) { + return as.accumulator.get(); + } + + @Benchmark + @Group("ReadAndWriteVolatile") + public void volatileWriteWhileReading(final VolatileAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.next()); + } + + @Benchmark + @Group("ReadAndWriteVolatile") + public double volatileReadWhileWriting(final VolatileAccumulatorState as) { + return as.accumulator.get(); + } } From c5de3b2cee4595b3855b4f971e9bb507de2935c4 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 11 Mar 2025 08:39:36 +0000 Subject: [PATCH 09/10] Remove layout padding --- .../beam/sdk/io/kafka/KafkaIOUtils.java | 58 ++++--------------- 1 file changed, 10 insertions(+), 48 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index ca280766de14..8b778ce5481e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -132,67 +131,30 @@ static Map getOffsetConsumerConfig( } /* - * Attempt to prevent false sharing by padding to at least 64 bytes. - * object header: 4, 8, 12 or 16 bytes - * alignment: at least 8 bytes + * Maintains approximate average over last 1000 elements. + * Usage is only thread-safe for a single producer and multiple consumers. */ - @SuppressFBWarnings("UUF_UNUSED_FIELD") - private static class MovingAvgPadding { - byte p000, p001, p002, p003, p004, p005, p006, p007; - byte p010, p011, p012, p013, p014, p015, p016, p017; - byte p020, p021, p022, p023, p024, p025, p026, p027; - byte p030, p031, p032, p033, p034, p035, p036, p037; - byte p040, p041, p042, p043, p044, p045, p046, p047; - byte p050, p051, p052, p053, p054, p055, p056, p057; - byte p060, p061, p062, p063, p064, p065, p066, p067; - } - - // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 - // cache lines). - private static class MovingAvgFields extends MovingAvgPadding { + public static final class MovingAvg { + private static final AtomicLongFieldUpdater AVG = + AtomicLongFieldUpdater.newUpdater(MovingAvg.class, "avg"); private static final int MOVING_AVG_WINDOW = 1000; - private static final AtomicLongFieldUpdater AVG = - AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg"); - - private volatile long avg = 0; - private long numUpdates = 0; + private volatile long avg; + private long numUpdates; - protected double getAvg() { + private double getAvg() { return Double.longBitsToDouble(avg); } - protected void setAvg(final double value) { + private void setAvg(final double value) { AVG.lazySet(this, Double.doubleToRawLongBits(value)); } - protected long incrementAndGetNumUpdates() { + private long incrementAndGetNumUpdates() { final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1); numUpdates = nextNumUpdates; return nextNumUpdates; } - } - - /* - * Maintains approximate average over last 1000 elements. - * Usage is only thread-safe for a single producer and multiple consumers. - * - * Attempt to prevent false sharing by padding to 64 bytes. - * avg: 8 bytes - * numUpdates: 8 bytes - * alignment: at least 8 bytes - * - * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. - * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. - */ - @SuppressFBWarnings("UUF_UNUSED_FIELD") - public static final class MovingAvg extends MovingAvgFields { - byte p100, p101, p102, p103, p104, p105, p106, p107; - byte p110, p111, p112, p113, p114, p115, p116, p117; - byte p120, p121, p122, p123, p124, p125, p126, p127; - byte p130, p131, p132, p133, p134, p135, p136, p137; - byte p140, p141, p142, p143, p144, p145, p146, p147; - byte p150, p151, p152, p153, p154, p155, p156, p157; public void update(final double quantity) { final double prevAvg = getAvg(); // volatile load (acquire) From e90f8fc5759bd425f19c38dad39f3041738a8f55 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Fri, 14 Mar 2025 12:50:17 +0000 Subject: [PATCH 10/10] Rewrite build file using Kotlin DSL --- .../jmh/{build.gradle => build.gradle.kts} | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) rename sdks/java/io/kafka/jmh/{build.gradle => build.gradle.kts} (69%) diff --git a/sdks/java/io/kafka/jmh/build.gradle b/sdks/java/io/kafka/jmh/build.gradle.kts similarity index 69% rename from sdks/java/io/kafka/jmh/build.gradle rename to sdks/java/io/kafka/jmh/build.gradle.kts index 5f3d0d38f092..2f11dbff7ee0 100644 --- a/sdks/java/io/kafka/jmh/build.gradle +++ b/sdks/java/io/kafka/jmh/build.gradle.kts @@ -16,16 +16,20 @@ * limitations under the License. */ -plugins { id 'org.apache.beam.module' } +plugins { + groovy + id("org.apache.beam.module") +} -applyJavaNature( - automaticModuleName: 'org.apache.beam.sdk.io.kafka.jmh', - enableJmh: true, - publish: false) +val applyJavaNature: groovy.lang.Closure by extra +applyJavaNature(mapOf( + "automaticModuleName" to "org.apache.beam.sdk.io.kafka.jmh", + "enableJmh" to true, + "publish" to false)) description = "Apache Beam :: SDKs :: Java :: IO :: Kafka :: JMH" -ext.summary = "This contains JMH benchmarks for the Kafka IO connector for Beam Java" +val summary by extra("This contains JMH benchmarks for the Kafka IO connector for Beam Java") dependencies { - implementation project(path: ":sdks:java:io:kafka") + implementation(project(":sdks:java:io:kafka")) }