Skip to content
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:io:contextualtextio:build")
dependsOn(":sdks:java:io:expansion-service:build")
dependsOn(":sdks:java:io:file-based-io-tests:build")
dependsOn(":sdks:java:io:kafka:jmh:build")
dependsOn(":sdks:java:io:sparkreceiver:3:build")
dependsOn(":sdks:java:io:synthetic:build")
dependsOn(":sdks:java:io:xml:build")
Expand Down
35 changes: 35 additions & 0 deletions sdks/java/io/kafka/jmh/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Build script for the JMH benchmark module of the Kafka IO connector.
plugins {
groovy
// Beam's shared module plugin; NOTE(review): presumably this is what registers the
// `applyJavaNature` extra property used below — confirm against the plugin source.
id("org.apache.beam.module")
}

// Look up the `applyJavaNature` closure from the project's extra properties and invoke it
// with this module's options. The meaning of each option is defined by that closure, which
// is not visible in this file; judging by the keys, JMH support is switched on and
// publishing is switched off for this module.
val applyJavaNature: groovy.lang.Closure<Any?> by extra
applyJavaNature(mapOf(
"automaticModuleName" to "org.apache.beam.sdk.io.kafka.jmh",
"enableJmh" to true,
"publish" to false))

// Human-readable project description and a `summary` extra property describing the module.
description = "Apache Beam :: SDKs :: Java :: IO :: Kafka :: JMH"
val summary by extra("This contains JMH benchmarks for the Kafka IO connector for Beam Java")

dependencies {
// The benchmarks exercise classes from the Kafka IO module itself.
implementation(project(":sdks:java:io:kafka"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka.jmh;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.IterationParams;
import org.openjdk.jmh.infra.ThreadParams;

/**
 * JMH benchmarks comparing several moving-average accumulator implementations.
 *
 * <p>Four variants are measured: a plain accumulator without any synchronization, the same plain
 * accumulator guarded by {@code synchronized}, {@link KafkaIOUtils.MovingAvg}, and a variant that
 * keeps the average in a {@code volatile double}. For each variant there is a write-only group, a
 * read-only group and a group that contains both a writer and a reader method.
 *
 * <p>Results are reported as average time per operation in nanoseconds, and the benchmarks are
 * configured with {@code Threads.MAX}.
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(Threads.MAX)
public class KafkaIOUtilsBenchmark {
// Number of pre-generated input values held by each ProducerState.
private static final int SIZE = 1024;

/**
 * Per-thread source of input values for the writer benchmarks.
 *
 * <p>Values are generated up front so that random number generation is not part of the measured
 * code path.
 */
@State(Scope.Thread)
public static class ProducerState {
private int[] values;
private int idx;

/**
 * Regenerates the input values and rewinds the cursor before every iteration.
 *
 * <p>The generator is seeded with a fixed constant plus {@code ip.getCount()}, so the sequence
 * is reproducible. The {@code tp} parameter is currently unused.
 */
@Setup(Level.Iteration)
public void setup(final IterationParams ip, final ThreadParams tp) {
// SIZE values, each in the range [0, 100).
values = new Random(299792458 + ip.getCount()).ints(SIZE, 0, 100).toArray();
idx = 0;
}

/** Returns the value at the cursor and advances the cursor, wrapping around after SIZE values. */
int next() {
final int value = values[idx];
idx = (idx + 1) % SIZE;
return value;
}
}

/** Group-scoped state holding the unsynchronized reference accumulator. */
@State(Scope.Group)
public static class PlainAccumulatorState {
// As implemented before 2.64.0.
// Note that numUpdates may overflow and count back from Long.MIN_VALUE.
static class MovingAvg {
private static final int MOVING_AVG_WINDOW = 1000;
private double avg = 0;
private long numUpdates = 0;

// Moves the average towards quantity by 1 / min(window, number of updates so far).
// Neither field is volatile and no lock is taken here.
void update(double quantity) {
numUpdates++;
avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
}

double get() {
return avg;
}
}

MovingAvg accumulator;

// A fresh accumulator is created once per trial.
@Setup(Level.Trial)
public void setup() {
accumulator = new MovingAvg();
}
}

/** Group-scoped state holding the accumulator under test, {@link KafkaIOUtils.MovingAvg}. */
@State(Scope.Group)
public static class AtomicAccumulatorState {
KafkaIOUtils.MovingAvg accumulator;

// A fresh accumulator is created once per trial.
@Setup(Level.Trial)
public void setup() {
accumulator = new KafkaIOUtils.MovingAvg();
}
}

/** Group-scoped state holding an accumulator that stores its average in a volatile field. */
@State(Scope.Group)
public static class VolatileAccumulatorState {
// Atomic accumulator using only volatile reads and writes.
static class MovingAvg {
private static final int MOVING_AVG_WINDOW = 1000;

private volatile double avg = 0;
// Plain (non-volatile) field; it is only accessed from update().
private long numUpdates = 0;

// Performs exactly one volatile read and one volatile write of avg per call.
// numUpdates is capped at the window size, so unlike the plain variant it cannot overflow.
void update(final double quantity) {
final double prevAvg = avg;
numUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
avg = prevAvg + (quantity - prevAvg) / numUpdates;
}

double get() {
return avg;
}
}

MovingAvg accumulator;

// A fresh accumulator is created once per trial.
@Setup(Level.Trial)
public void setup() {
accumulator = new MovingAvg();
}
}

// ---- Plain accumulator, no synchronization ----

/** Write-only: updates the plain accumulator with the next pre-generated value. */
@Benchmark
@Group("WritePlain")
public void plainWrite(final PlainAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Read-only: reads the plain accumulator; the value is returned so JMH consumes it. */
@Benchmark
@Group("ReadPlain")
public double plainRead(final PlainAccumulatorState as) {
return as.accumulator.get();
}

/** Writer half of the "ReadAndWritePlain" group. */
@Benchmark
@Group("ReadAndWritePlain")
public void plainWriteWhileReading(final PlainAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Reader half of the "ReadAndWritePlain" group. */
@Benchmark
@Group("ReadAndWritePlain")
public double plainReadWhileWriting(final PlainAccumulatorState as) {
return as.accumulator.get();
}

// ---- Plain accumulator guarded by a monitor on the accumulator instance ----

/** Write-only: updates the plain accumulator while holding its monitor. */
@Benchmark
@Group("WriteSynchronizedPlain")
public void synchronizedPlainWrite(final PlainAccumulatorState as, final ProducerState ps) {
final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
// The input value is fetched before the lock is taken, so only update() runs under the lock.
final int value = ps.next();
synchronized (accumulator) {
accumulator.update(value);
}
}

/** Read-only: reads the plain accumulator while holding its monitor. */
@Benchmark
@Group("ReadSynchronizedPlain")
public double synchronizedPlainRead(final PlainAccumulatorState as) {
final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
synchronized (accumulator) {
return accumulator.get();
}
}

/** Writer half of the "ReadAndWriteSynchronizedPlain" group. */
@Benchmark
@Group("ReadAndWriteSynchronizedPlain")
public void synchronizedPlainWriteWhileReading(
final PlainAccumulatorState as, final ProducerState ps) {
final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
// The input value is fetched before the lock is taken, so only update() runs under the lock.
final int value = ps.next();
synchronized (accumulator) {
accumulator.update(value);
}
}

/** Reader half of the "ReadAndWriteSynchronizedPlain" group. */
@Benchmark
@Group("ReadAndWriteSynchronizedPlain")
public double synchronizedPlainReadWhileWriting(final PlainAccumulatorState as) {
final PlainAccumulatorState.MovingAvg accumulator = as.accumulator;
synchronized (accumulator) {
return accumulator.get();
}
}

// ---- KafkaIOUtils.MovingAvg ----

/** Write-only: updates {@link KafkaIOUtils.MovingAvg} with the next pre-generated value. */
@Benchmark
@Group("WriteAtomic")
public void atomicWrite(final AtomicAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Read-only: reads {@link KafkaIOUtils.MovingAvg}. */
@Benchmark
@Group("ReadAtomic")
public double atomicRead(final AtomicAccumulatorState as) {
return as.accumulator.get();
}

/** Writer half of the "ReadAndWriteAtomic" group. */
@Benchmark
@Group("ReadAndWriteAtomic")
public void atomicWriteWhileReading(final AtomicAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Reader half of the "ReadAndWriteAtomic" group. */
@Benchmark
@Group("ReadAndWriteAtomic")
public double atomicReadWhileWriting(final AtomicAccumulatorState as) {
return as.accumulator.get();
}

// ---- Volatile-field accumulator ----

/** Write-only: updates the volatile accumulator with the next pre-generated value. */
@Benchmark
@Group("WriteVolatile")
public void volatileWrite(final VolatileAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Read-only: reads the volatile accumulator. */
@Benchmark
@Group("ReadVolatile")
public double volatileRead(final VolatileAccumulatorState as) {
return as.accumulator.get();
}

/** Writer half of the "ReadAndWriteVolatile" group. */
@Benchmark
@Group("ReadAndWriteVolatile")
public void volatileWriteWhileReading(final VolatileAccumulatorState as, final ProducerState ps) {
as.accumulator.update(ps.next());
}

/** Reader half of the "ReadAndWriteVolatile" group. */
@Benchmark
@Group("ReadAndWriteVolatile")
public double volatileReadWhileWriting(final VolatileAccumulatorState as) {
return as.accumulator.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Benchmarks for KafkaIO. */
package org.apache.beam.sdk.io.kafka.jmh;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
Expand Down Expand Up @@ -129,19 +130,43 @@ static Map<String, Object> getOffsetConsumerConfig(
return offsetConsumerConfig;
}

// Maintains approximate average over last 1000 elements
static class MovingAvg {
/*
* Maintains approximate average over last 1000 elements.
* Usage is only thread-safe for a single producer and multiple consumers.
*/
public static final class MovingAvg {
private static final AtomicLongFieldUpdater<MovingAvg> AVG =
AtomicLongFieldUpdater.newUpdater(MovingAvg.class, "avg");
private static final int MOVING_AVG_WINDOW = 1000;
private double avg = 0;
private long numUpdates = 0;

void update(double quantity) {
numUpdates++;
avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
private volatile long avg;
private long numUpdates;

private double getAvg() {
return Double.longBitsToDouble(avg);
}

private void setAvg(final double value) {
AVG.lazySet(this, Double.doubleToRawLongBits(value));
}

private long incrementAndGetNumUpdates() {
final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
numUpdates = nextNumUpdates;
return nextNumUpdates;
}

public void update(final double quantity) {
final double prevAvg = getAvg(); // volatile load (acquire)

final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store
final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates; // normal load/store

setAvg(nextAvg); // ordered store (release)
}

double get() {
return avg;
public double get() {
return getAvg(); // volatile load (acquire)
}
}

Expand Down
Loading
Loading