From bf28c4124d48a91ff08f141dcb68db6af5ae17b0 Mon Sep 17 00:00:00 2001
From: Naireen
Date: Tue, 30 Apr 2024 22:13:27 +0000
Subject: [PATCH 1/2] Set backlog in gauge metric

---
 .../sdk/io/kafka/KafkaUnboundedReader.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 054eb502cd85..fed03047cf16 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -224,6 +225,9 @@ public boolean advance() throws IOException {
                 METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
         rawSizes.update(recordSize);
+        for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
+          backlogBytesOfSplit.set(backlogSplit.getValue());
+        }
         return true;
       } else { // -- (b)
@@ -341,6 +345,7 @@ public long getSplitBacklogBytes() {
   private final Counter bytesReadBySplit;
   private final Gauge backlogBytesOfSplit;
   private final Gauge backlogElementsOfSplit;
+  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();
   private final Counter checkpointMarkCommitsEnqueued =
       Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
   // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -491,6 +496,10 @@ Instant updateAndGetWatermark() {
       lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
       return lastWatermark;
     }
+
+    String name() {
+      return this.topicPartition.toString();
+    }
   }

   KafkaUnboundedReader(
@@ -528,14 +537,16 @@ Instant updateAndGetWatermark() {
         prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
       }

-      states.add(
-          new PartitionState<>(
+      PartitionState<K, V> state =
+          new PartitionState<K, V>(
               tp,
               nextOffset,
               source
                   .getSpec()
                   .getTimestampPolicyFactory()
-                  .createTimestampPolicy(tp, prevWatermark)));
+                  .createTimestampPolicy(tp, prevWatermark));
+      states.add(state);
+      perPartitionBacklogMetrics.put(state.name(), 0L);
     }

     partitionStates = ImmutableList.copyOf(states);
@@ -717,6 +728,7 @@ private long getSplitBacklogMessageCount() {
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
+      perPartitionBacklogMetrics.put(p.name(), pBacklog);
       backlogCount += pBacklog;
     }

From 97a8fc9710c28495c61fceacd00c3c06111a8cc1 Mon Sep 17 00:00:00 2001
From: Naireen
Date: Mon, 13 May 2024 17:26:17 +0000
Subject: [PATCH 2/2] Add backlog metrics to splittable DoFn Kafka read
 implementation

---
 .../sdk/io/kafka/KafkaUnboundedReader.java   | 18 +++---------------
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index fed03047cf16..054eb502cd85 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -225,9 +224,6 @@ public boolean advance() throws IOException {
                 METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
         rawSizes.update(recordSize);
-        for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
-          backlogBytesOfSplit.set(backlogSplit.getValue());
-        }
         return true;
       } else { // -- (b)
@@ -345,7 +341,6 @@ public long getSplitBacklogBytes() {
   private final Counter bytesReadBySplit;
   private final Gauge backlogBytesOfSplit;
   private final Gauge backlogElementsOfSplit;
-  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();
   private final Counter checkpointMarkCommitsEnqueued =
       Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
   // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -496,10 +491,6 @@ Instant updateAndGetWatermark() {
       lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
       return lastWatermark;
     }
-
-    String name() {
-      return this.topicPartition.toString();
-    }
   }

   KafkaUnboundedReader(
@@ -537,16 +528,14 @@ String name() {
         prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
       }

-      PartitionState<K, V> state =
-          new PartitionState<K, V>(
+      states.add(
+          new PartitionState<>(
               tp,
               nextOffset,
               source
                   .getSpec()
                   .getTimestampPolicyFactory()
-                  .createTimestampPolicy(tp, prevWatermark));
-      states.add(state);
-      perPartitionBacklogMetrics.put(state.name(), 0L);
+                  .createTimestampPolicy(tp, prevWatermark)));
     }

     partitionStates = ImmutableList.copyOf(states);
@@ -728,7 +717,6 @@ private long getSplitBacklogMessageCount() {
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      perPartitionBacklogMetrics.put(p.name(), pBacklog);
       backlogCount += pBacklog;
     }

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index b7bbc904da81..100f06d42d07 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -32,6 +32,7 @@
 import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -222,6 +223,9 @@ private ReadFromKafkaDoFn(
   private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;

   private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
+
+  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();
+
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -342,6 +346,13 @@ public double getSize(
     if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
       return numRecords;
     }
+    if (offsetEstimatorCache != null) {
+      for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
+          offsetEstimatorCache.entrySet()) {
+        perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
+      }
+    }
+
     return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
   }

@@ -394,6 +405,13 @@ public ProcessContinuation processElement(
         Metrics.distribution(
             METRIC_NAMESPACE,
             RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
+    for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
+      Gauge backlog =
+          Metrics.gauge(
+              METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
+      backlog.set(backlogSplit.getValue());
+    }
+
     // Stop processing current TopicPartition when it's time to stop.
     if (checkStopReadingFn != null
         && checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
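
Usage note (a sketch, not part of the patches above): after patch 2, each assigned partition's backlog estimate is published as a gauge whose name is RAW_SIZE_METRIC_PREFIX followed by "backlogBytes_" and the topic-partition string. The snippet below is a minimal sketch of how a pipeline author could read those gauges back through Beam's public metrics API. Gauge queryability varies by runner, and the class name and substring filter here are illustrative assumptions, not part of this change.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class BacklogGaugeInspector {
  // Queries all reported metrics and prints the per-partition backlog gauges.
  // The "backlogBytes_" substring matches the gauge names built in
  // ReadFromKafkaDoFn.processElement by this patch series.
  static void printBacklogGauges(PipelineResult result) {
    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<GaugeResult> gauge : metrics.getGauges()) {
      if (gauge.getName().getName().contains("backlogBytes_")) {
        System.out.println(gauge.getName() + " = " + gauge.getAttempted().getValue());
      }
    }
  }
}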