From 5fbb44acde27045be49cddba30bd098c0b359759 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 18 Apr 2023 10:26:27 -0400 Subject: [PATCH 01/14] Modify batch IT to use count instead of hash --- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 43 +++++++------------ 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 89b7b42e1b43..881e42b900dd 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOITHelper; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource; @@ -67,7 +66,6 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader; import org.apache.beam.sdk.testutils.metrics.TimeMonitor; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -238,27 +236,19 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { @Test public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { // Map of hashes of set size collections with 100b records - 10b key, 90b values. - Map expectedHashes = - ImmutableMap.of( - 1000L, "4507649971ee7c51abbb446e65a5c660", - 100_000_000L, "0f12c27c9a7672e14775594be66cad9a"); - expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes); writePipeline .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions))) .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME))) .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic())); - PCollection hashcode = - readPipeline - .apply( - "Read from bounded Kafka", - readFromBoundedKafka().withTopic(options.getKafkaTopic())) - .apply( - "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) - .apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults()); - - PAssert.thatSingleton(hashcode).isEqualTo(expectedHashcode); + readPipeline + .apply( + "Read from bounded Kafka", + readFromBoundedKafka().withTopic(options.getKafkaTopic())) + .apply( + "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) + .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) + .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); @@ -274,6 +264,14 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { // Fail the test if pipeline failed. 
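// (Aside on the counter readback introduced below: readElementMetric() is a helper defined elsewhere in this test class and not shown in this patch; a minimal sketch of it, assuming Beam's MetricsReader test utility imported by this test, would be:
//   private long readElementMetric(PipelineResult result, String namespace, String name) {
//     MetricsReader metricsReader = new MetricsReader(result, namespace);
//     return metricsReader.getCounterMetric(name);
//   }
// The new assertion checks numRecords <= actualRecords rather than strict equality because KafkaIO reads are at-least-once: retries may re-emit records, so duplicates are tolerated while missing records still fail.)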
assertEquals(PipelineResult.State.DONE, readState); + long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME); + + assertTrue( + String.format( + "actual number of records %d smaller than expected: %d.", + actualRecords, sourceOptions.numRecords), + sourceOptions.numRecords <= actualRecords); + if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings); @@ -897,15 +895,6 @@ public String apply(KafkaRecord input) { } } - public static String getHashForRecordCount(long recordCount, Map hashes) { - String hash = hashes.get(recordCount); - if (hash == null) { - throw new UnsupportedOperationException( - String.format("No hash for that record count: %s", recordCount)); - } - return hash; - } - private static void setupKafkaContainer() { kafkaContainer = new KafkaContainer( From bad87d7bfab60d2d52f2d79ba41b1e3527170ee8 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 18 Apr 2023 11:21:21 -0400 Subject: [PATCH 02/14] remove unused variable --- .../src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 881e42b900dd..083ca432ec41 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -137,8 +137,6 @@ public class KafkaIOIT { private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class); - private static String expectedHashcode; - private static SyntheticSourceOptions sourceOptions; private static Options options; From 8285795f45e24635631e45cfab97af54f4fd64b6 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 18 Apr 2023 16:38:13 -0400 Subject: [PATCH 03/14] run spotless, update pipeline state checking --- .../java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 083ca432ec41..74eaa812d27c 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -240,11 +240,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic())); readPipeline - .apply( - "Read from bounded Kafka", - readFromBoundedKafka().withTopic(options.getKafkaTopic())) - .apply( - "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) + .apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(options.getKafkaTopic())) + .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); @@ -259,9 +256,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - // Fail the test if pipeline failed. 
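// (Why the DONE assertion is dropped here: cancelIfTimeouted() above cancels the read pipeline when waitUntilFinish() times out, and a cancelled pipeline never reaches State.DONE, so requiring DONE fails runs that read every record but hit the timeout. The series ultimately settles on assertNotEquals(PipelineResult.State.FAILED, readState), which accepts both DONE and CANCELLED.)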
- assertEquals(PipelineResult.State.DONE, readState); - long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME); assertTrue( @@ -270,6 +264,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { actualRecords, sourceOptions.numRecords), sourceOptions.numRecords <= actualRecords); + assertNotEquals(PipelineResult.State.FAILED, readState); + if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings); From dd29c435f2a8d6eb5356f64092d416ea7850026c Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 19 Apr 2023 13:10:19 -0400 Subject: [PATCH 04/14] Update timeout to 45 minutes --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 2 +- .../src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index d513dd96a7e2..f19fc0f51ce6 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -87,7 +87,7 @@ job(jobName) { kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services kafkaTopic : 'beam-batch', - readTimeout : '1800', + readTimeout : '2700', numWorkers : '5', autoscalingAlgorithm : 'NONE' ] diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 74eaa812d27c..0c3c0d7e6011 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -233,7 +233,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { @Test public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { - // Map of hashes of set size collections with 100b records - 10b key, 90b values. 
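// (For context on the synthetic input generated below: sourceOptions is parsed elsewhere in this test from a JSON pipeline option. A sketch of the 100-byte-record shape the removed comment described, assuming SyntheticOptions.fromJsonString and purely illustrative values:
//   SyntheticSourceOptions sourceOptions =
//       SyntheticOptions.fromJsonString(
//           "{\"numRecords\": 100000000, \"keySizeBytes\": 10, \"valueSizeBytes\": 90}",
//           SyntheticSourceOptions.class);
// With the hash map gone, the comment describing its record shape goes too.)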
writePipeline .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions))) .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME))) @@ -264,7 +263,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { actualRecords, sourceOptions.numRecords), sourceOptions.numRecords <= actualRecords); - assertNotEquals(PipelineResult.State.FAILED, readState); + assertEquals(PipelineResult.State.DONE, readState); if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); From 9e877b05c80c13a72f3954696b2117302303993b Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 20 Apr 2023 14:09:35 -0400 Subject: [PATCH 05/14] revert timeout, add additional counter to try and pinpoint missing records --- .../job_PerformanceTests_KafkaIO_IT.groovy | 2 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 21 +++++++++++++++++-- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 17 +-------------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index f19fc0f51ce6..d513dd96a7e2 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -87,7 +87,7 @@ job(jobName) { kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services kafkaTopic : 'beam-batch', - readTimeout : '2700', + readTimeout : '1800', numWorkers : '5', autoscalingAlgorithm : 'NONE' ] diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 9bfd4723f6c8..ca477822820f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -60,6 +60,8 @@ import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation; import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityException; import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -1354,9 +1356,24 @@ public PCollection> expand(PBegin input) { || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY) || (compatibility.supports(KafkaIOReadImplementation.LEGACY) && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) { - return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder)); + return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder)).apply(ParDo.of(new RecordKafkaRecords<>())); + } + return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder)).apply(ParDo.of(new RecordKafkaRecords<>())); + } + + private class RecordKafkaRecords extends DoFn { + + private final Counter elementCounter; + + RecordKafkaRecords(){ + this.elementCounter = Metrics.counter(KafkaIO.class.getName(), "elements_read"); + } + + @ProcessElement + public void 
processElement(ProcessContext c){ + elementCounter.inc(); + c.output(c.element()); } - return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder)); } private void warnAboutUnsafeConfigurations(PBegin input) { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 0c3c0d7e6011..2e463b0d978e 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -25,7 +25,6 @@ import com.google.cloud.Timestamp; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -74,7 +73,6 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -201,7 +199,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { readPipeline .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); @@ -241,7 +238,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { readPipeline .apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(options.getKafkaTopic())) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); @@ -679,7 +675,6 @@ private PipelineResult runWithStopReadingFn( .withTopic(options.getKafkaTopic() + "-" + topicSuffix) .withCheckStopReadingFn(function)) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())) .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); @@ -825,7 +820,7 @@ private KafkaIO.Read readFromKafka() { .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")); } - private static class CountingFn extends DoFn { + private static class CountingFn extends DoFn, Void> { private final Counter elementCounter; @@ -878,16 +873,6 @@ public interface Options extends IOTestPipelineOptions, StreamingOptions { void setKafkaContainerVersion(String kafkaContainerVersion); } - private static class MapKafkaRecordsToStrings - extends SimpleFunction, String> { - @Override - public String apply(KafkaRecord input) { - String key = Arrays.toString(input.getKV().getKey()); - String value = Arrays.toString(input.getKV().getValue()); - return String.format("%s %s", key, value); - } - } - private static void setupKafkaContainer() { kafkaContainer = new KafkaContainer( From 
cb9ead19de5f929d741d025cc1ff2bba05afe9f2 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 20 Apr 2023 15:44:57 -0400 Subject: [PATCH 06/14] add a log to report ranges used when workers restart --- .../java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index edb0a7972bb9..6e9a292f9aed 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -335,6 +335,8 @@ public ProcessContinuation processElement( Preconditions.checkStateNotNull(this.keyDeserializerInstance); final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); + LOG.error("RFKDF PE: KSD=" + kafkaSourceDescriptor.getTopicPartition().topic() + "-" +kafkaSourceDescriptor.getTopicPartition().partition() + " offset=[" +tracker.currentRestriction().getFrom() + "," + tracker.currentRestriction().getTo() + "]"); + // Stop processing current TopicPartition when it's time to stop. if (checkStopReadingFn != null && checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) { From 29f1b8391e1af76bbb72232bdee94169f75d28b6 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 21 Apr 2023 10:49:30 -0400 Subject: [PATCH 07/14] change counts from metrics to combiners --- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 43 ++++--------------- 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 2e463b0d978e..e7c8a8a7f45f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import com.google.cloud.Timestamp; import java.io.IOException; @@ -46,8 +45,6 @@ import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource; import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.ExperimentalOptions; @@ -65,6 +62,7 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader; import org.apache.beam.sdk.testutils.metrics.TimeMonitor; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -196,10 +194,10 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { // Use streaming pipeline to read Kafka records. 
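// (Note on switching from metric counters to combiners in this patch: Count.globally() expands to a GroupByKey, and Beam rejects a GroupByKey over an unbounded PCollection in the global window without a trigger, so the streaming count cannot fire as written here; the next patch adds Window.into(CalendarWindows.years(1)) ahead of the count for exactly that reason.)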
readPipeline.getOptions().as(Options.class).setStreaming(true); - readPipeline + PCollection count = readPipeline .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); + .apply("Counting element", Count.globally()); PipelineResult writeResult = writePipeline.run(); PipelineResult.State writeState = writeResult.waitUntilFinish(); @@ -214,12 +212,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME); - assertTrue( - String.format( - "actual number of records %d smaller than expected: %d.", - actualRecords, sourceOptions.numRecords), - sourceOptions.numRecords <= actualRecords); + PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); @@ -235,10 +228,10 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME))) .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic())); - readPipeline + PCollection count =readPipeline .apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(options.getKafkaTopic())) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); + .apply("Counting element", Count.globally()); PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); @@ -251,13 +244,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME); - - assertTrue( - String.format( - "actual number of records %d smaller than expected: %d.", - actualRecords, sourceOptions.numRecords), - sourceOptions.numRecords <= actualRecords); + PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); assertEquals(PipelineResult.State.DONE, readState); @@ -674,8 +661,7 @@ private PipelineResult runWithStopReadingFn( readFromKafka() .withTopic(options.getKafkaTopic() + "-" + topicSuffix) .withCheckStopReadingFn(function)) - .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); + .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))); PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); @@ -820,19 +806,6 @@ private KafkaIO.Read readFromKafka() { .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")); } - private static class CountingFn extends DoFn, Void> { - - private final Counter elementCounter; - - CountingFn(String namespace, String name) { - elementCounter = Metrics.counter(namespace, name); - } - - @ProcessElement - public void processElement() { - elementCounter.inc(1L); - } - } /** Pipeline options specific for this test. 
*/ public interface Options extends IOTestPipelineOptions, StreamingOptions { From 1c18382c27f141e057ba8a3c9fed7fd29d8fc337 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 21 Apr 2023 13:13:30 -0400 Subject: [PATCH 08/14] add a window to streaming test --- .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index e7c8a8a7f45f..fbbaaf90444d 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -62,6 +62,7 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader; import org.apache.beam.sdk.testutils.metrics.TimeMonitor; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -72,6 +73,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.windowing.CalendarWindows; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -197,7 +199,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { PCollection count = readPipeline .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())) .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Counting element", Count.globally()); + .apply("Window", Window.into(CalendarWindows.years(1))) + .apply("Counting element", Combine.globally(Count.>combineFn()).withoutDefaults()); PipelineResult writeResult = writePipeline.run(); PipelineResult.State writeState = writeResult.waitUntilFinish(); From e72ff08427048861250fe8929ddfc4734bd0b914 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 21 Apr 2023 14:15:59 -0400 Subject: [PATCH 09/14] move the passert to the correct place --- .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index fbbaaf90444d..85b017c699f2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -207,6 +207,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { // Fail the test if pipeline failed. 
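// (Why the PAssert moves in this patch: PAssert.thatSingleton() works by adding assertion transforms to the pipeline under construction, so it must be applied before readPipeline.run() is called; applied after run(), as in the previous patch, it never becomes part of the executed pipeline and the check silently passes.)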
assertNotEquals(PipelineResult.State.FAILED, writeState); + PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); + PipelineResult readResult = readPipeline.run(); PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout())); @@ -215,8 +217,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); - if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings); @@ -239,6 +239,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); + PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); + PipelineResult readResult = readPipeline.run(); PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout())); @@ -247,8 +249,6 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords); - assertEquals(PipelineResult.State.DONE, readState); if (!options.isWithTestcontainers()) { From 6580c5f9e650d682e0a9f0fbb545357760be11b6 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 25 Apr 2023 12:40:43 -0400 Subject: [PATCH 10/14] Remove extra counter, apply spotless --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 21 ++------------- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 11 +++++++- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 26 ++++++++++++------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ca477822820f..9bfd4723f6c8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -60,8 +60,6 @@ import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation; import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityException; import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -1356,24 +1354,9 @@ public PCollection> expand(PBegin input) { || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY) || (compatibility.supports(KafkaIOReadImplementation.LEGACY) && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) { - return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder)).apply(ParDo.of(new RecordKafkaRecords<>())); - } - return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder)).apply(ParDo.of(new RecordKafkaRecords<>())); - } - - private class RecordKafkaRecords extends DoFn { - - private final Counter elementCounter; - - RecordKafkaRecords(){ - this.elementCounter = 
Metrics.counter(KafkaIO.class.getName(), "elements_read"); - } - - @ProcessElement - public void processElement(ProcessContext c){ - elementCounter.inc(); - c.output(c.element()); + return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder)); } + return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder)); } private void warnAboutUnsafeConfigurations(PBegin input) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 6e9a292f9aed..0d4338882f4b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -335,7 +335,16 @@ public ProcessContinuation processElement( Preconditions.checkStateNotNull(this.keyDeserializerInstance); final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); - LOG.error("RFKDF PE: KSD=" + kafkaSourceDescriptor.getTopicPartition().topic() + "-" +kafkaSourceDescriptor.getTopicPartition().partition() + " offset=[" +tracker.currentRestriction().getFrom() + "," + tracker.currentRestriction().getTo() + "]"); + LOG.error( + "RFKDF PE: KSD=" + + kafkaSourceDescriptor.getTopicPartition().topic() + + "-" + + kafkaSourceDescriptor.getTopicPartition().partition() + + " offset=[" + + tracker.currentRestriction().getFrom() + + "," + + tracker.currentRestriction().getTo() + + "]"); // Stop processing current TopicPartition when it's time to stop. if (checkStopReadingFn != null diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 85b017c699f2..8512536037c9 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -196,11 +196,15 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException { // Use streaming pipeline to read Kafka records. 
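// (On the combine reformatted below: withoutDefaults() is required once the input is windowed, because Combine.globally() can only produce a default value for empty windows under the global window; with withoutDefaults() an empty window simply emits nothing.)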
readPipeline.getOptions().as(Options.class).setStreaming(true); - PCollection count = readPipeline - .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())) - .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Window", Window.into(CalendarWindows.years(1))) - .apply("Counting element", Combine.globally(Count.>combineFn()).withoutDefaults()); + PCollection count = + readPipeline + .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())) + .apply( + "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) + .apply("Window", Window.into(CalendarWindows.years(1))) + .apply( + "Counting element", + Combine.globally(Count.>combineFn()).withoutDefaults()); PipelineResult writeResult = writePipeline.run(); PipelineResult.State writeState = writeResult.waitUntilFinish(); @@ -231,10 +235,14 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME))) .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic())); - PCollection count =readPipeline - .apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(options.getKafkaTopic())) - .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) - .apply("Counting element", Count.globally()); + PCollection count = + readPipeline + .apply( + "Read from bounded Kafka", + readFromBoundedKafka().withTopic(options.getKafkaTopic())) + .apply( + "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME))) + .apply("Counting element", Count.globally()); PipelineResult writeResult = writePipeline.run(); writeResult.waitUntilFinish(); From b61777054a0f71507af9290269d5677e32f3e9eb Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 25 Apr 2023 13:48:54 -0400 Subject: [PATCH 11/14] add additional metric to KafkaWriter --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java index c0c9772959f9..2a313e94b244 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java @@ -23,6 +23,7 @@ import java.util.concurrent.Future; import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.Preconditions; @@ -44,6 +45,8 @@ class KafkaWriter extends DoFn, Void> { protected transient @Nullable Callback callback; + private final Counter callbackCounter = Metrics.counter("sink", "callbacks"); + @Setup public void setup() { if (spec.getProducerFactoryFn() != null) { @@ -152,6 +155,7 @@ private synchronized void checkForFailures() throws IOException { private class SendCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { + callbackCounter.inc(); if (exception == null) { return; } From b1d961d2eb7bf217de97bd170e47ece097ef54a4 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 26 Apr 2023 12:15:52 -0400 Subject: [PATCH 12/14] Remove debugging metrics --- 
.../java/org/apache/beam/sdk/io/kafka/KafkaWriter.java | 4 ---- .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java index 2a313e94b244..c0c9772959f9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java @@ -23,7 +23,6 @@ import java.util.concurrent.Future; import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.Preconditions; @@ -45,8 +44,6 @@ class KafkaWriter extends DoFn, Void> { protected transient @Nullable Callback callback; - private final Counter callbackCounter = Metrics.counter("sink", "callbacks"); - @Setup public void setup() { if (spec.getProducerFactoryFn() != null) { @@ -155,7 +152,6 @@ private synchronized void checkForFailures() throws IOException { private class SendCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - callbackCounter.inc(); if (exception == null) { return; } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 0d4338882f4b..32436742df8a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -335,16 +335,6 @@ public ProcessContinuation processElement( Preconditions.checkStateNotNull(this.keyDeserializerInstance); final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); - LOG.error( - "RFKDF PE: KSD=" - + kafkaSourceDescriptor.getTopicPartition().topic() - + "-" - + kafkaSourceDescriptor.getTopicPartition().partition() - + " offset=[" - + tracker.currentRestriction().getFrom() - + "," - + tracker.currentRestriction().getTo() - + "]"); // Stop processing current TopicPartition when it's time to stop. 
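// (Context for the check below: checkStopReadingFn is the user-supplied SerializableFunction<TopicPartition, Boolean> installed via withCheckStopReadingFn(), as exercised by runWithStopReadingFn() in the test changes earlier in this series; when it returns true, the splittable DoFn stops processing that partition's restriction.)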
if (checkStopReadingFn != null From 80166bbd27f74d4c844818b7b6eb395fd9667e7d Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 26 Apr 2023 17:19:16 -0400 Subject: [PATCH 13/14] verify pipeline is not failed --- .../src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 8512536037c9..c6d07466040b 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -257,7 +257,7 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException { tearDownTopic(options.getKafkaTopic()); cancelIfTimeouted(readResult, readState); - assertEquals(PipelineResult.State.DONE, readState); + assertNotEquals(PipelineResult.State.FAILED, readState); if (!options.isWithTestcontainers()) { Set metrics = readMetrics(writeResult, readResult); From acbd3c027d98285816e764c4f3a39a57e9eb4b07 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 27 Apr 2023 16:10:53 -0400 Subject: [PATCH 14/14] remove extra newline --- .../java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 32436742df8a..edb0a7972bb9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -335,7 +335,6 @@ public ProcessContinuation processElement( Preconditions.checkStateNotNull(this.keyDeserializerInstance); final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); - // Stop processing current TopicPartition when it's time to stop. if (checkStopReadingFn != null && checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
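Taken together, the series lands on count-based verification: a windowed element count checked with PAssert before the pipeline runs, plus a state check that tolerates cancellation. A condensed sketch of the resulting read-side verification, assembled from the final state of the patches above (setup, teardown, and metrics publishing elided; the KafkaRecord<byte[], byte[]> element type is restored here where angle brackets were stripped in transit):

    PCollection<Long> count =
        readPipeline
            .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
            .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
            .apply("Window", Window.into(CalendarWindows.years(1)))
            .apply(
                "Counting element",
                Combine.globally(Count.<KafkaRecord<byte[], byte[]>>combineFn()).withoutDefaults());

    // PAssert must be attached before run() (patch 09).
    PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);

    PipelineResult readResult = readPipeline.run();
    PipelineResult.State readState =
        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
    cancelIfTimeouted(readResult, readState);
    // A pipeline cancelled on timeout finishes CANCELLED, not DONE, so only FAILED is fatal (patch 13).
    assertNotEquals(PipelineResult.State.FAILED, readState);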