diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index b7bbc904da81..100f06d42d07 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -222,6 +223,9 @@ private ReadFromKafkaDoFn( private transient @Nullable LoadingCache avgRecordSize; private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; + + private HashMap perPartitionBacklogMetrics = new HashMap();; + @VisibleForTesting final long consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @VisibleForTesting final DeserializerProvider valueDeserializerProvider; @@ -342,6 +346,13 @@ public double getSize( if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) { return numRecords; } + if (offsetEstimatorCache != null) { + for (Map.Entry tp : + offsetEstimatorCache.entrySet()) { + perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate()); + } + } + return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords); } @@ -394,6 +405,13 @@ public ProcessContinuation processElement( Metrics.distribution( METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString()); + for (Map.Entry backlogSplit : perPartitionBacklogMetrics.entrySet()) { + Gauge backlog = + Metrics.gauge( + METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey()); + backlog.set(backlogSplit.getValue()); + } + // Stop processing current TopicPartition when it's time to stop. if (checkStopReadingFn != null && checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {