From d943efb98a342a79b3c9429a56ff3ba035f706bc Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Fri, 23 Feb 2024 11:53:43 -0600 Subject: [PATCH] Rename the recordsProcessed field introduced in #15930 --- docs/ingestion/tasks.md | 4 ++-- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 8 ++++---- .../IngestionStatsAndErrorsTaskReportData.java | 16 ++++++++-------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index af9d8b7f88b8..4ff621c5c922 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -83,7 +83,7 @@ An example output is shown below: }, "segmentAvailabilityConfirmed": false, "segmentAvailabilityWaitTimeMs": 0, - "recordsProcessed": { + "recordsProcessedPerPartition": { "partition-a": 5789 }, "errorMsg": null @@ -101,7 +101,7 @@ For some task types, the indexing task can wait for the newly ingested segments |---|---| |`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.| |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.| -|`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| +|`recordsProcessedPerPartition`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| ### Live report diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index c3d1d2133831..5fb4fdac41f7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3234,8 +3234,8 @@ public void testCompletionReportPartitionStats() throws Exception Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); - Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); - Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); + Assert.assertEquals(reportData.getRecordsProcessedPerPartition().size(), 1); + Assert.assertEquals(reportData.getRecordsProcessedPerPartition().values().iterator().next(), (Long) 6L); } @Test(timeout = 60_000L) @@ -3282,8 +3282,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); - Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); - Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); + Assert.assertEquals(reportData.getRecordsProcessedPerPartition().size(), 2); + Assert.assertTrue(reportData.getRecordsProcessedPerPartition().values().containsAll(ImmutableSet.of(6L, 2L))); } public static class TestKafkaInputFormat implements InputFormat diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index ecdd9d3aeba9..790c0ea45d8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -48,7 +48,7 @@ public class IngestionStatsAndErrorsTaskReportData private long segmentAvailabilityWaitTimeMs; @JsonProperty - private Map recordsProcessed; + private Map recordsProcessedPerPartition; public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @@ -57,7 +57,7 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, - @JsonProperty("recordsProcessed") Map recordsProcessed + @JsonProperty("recordsProcessedPerPartition") Map recordsProcessedPerPartition ) { this.ingestionState = ingestionState; @@ -66,7 +66,7 @@ public IngestionStatsAndErrorsTaskReportData( this.errorMsg = errorMsg; this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; - this.recordsProcessed = recordsProcessed; + this.recordsProcessedPerPartition = recordsProcessedPerPartition; } @JsonProperty @@ -108,9 +108,9 @@ public long getSegmentAvailabilityWaitTimeMs() @JsonProperty @Nullable - public Map getRecordsProcessed() + public Map getRecordsProcessedPerPartition() { - return recordsProcessed; + return recordsProcessedPerPartition; } public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( @@ -137,7 +137,7 @@ public boolean equals(Object o) Objects.equals(getErrorMsg(), that.getErrorMsg()) && Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) && - Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()); + Objects.equals(getRecordsProcessedPerPartition(), that.getRecordsProcessedPerPartition()); } @Override @@ -150,7 +150,7 @@ public int hashCode() getErrorMsg(), isSegmentAvailabilityConfirmed(), getSegmentAvailabilityWaitTimeMs(), - getRecordsProcessed() + getRecordsProcessedPerPartition() ); } @@ -164,7 +164,7 @@ public String toString() ", errorMsg='" + errorMsg + '\'' + ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + - ", recordsProcessed=" + recordsProcessed + + ", recordsProcessedPerPartition=" + recordsProcessedPerPartition + '}'; } }