From 3a945ce179693cc4ecb292af48df73758241477f Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 22 Feb 2024 16:37:35 -0600 Subject: [PATCH 01/23] Initial commit --- docs/ingestion/tasks.md | 8 +++++++ ...IngestionStatsAndErrorsTaskReportData.java | 11 ++++++++- .../AppenderatorDriverRealtimeIndexTask.java | 4 +++- .../indexing/common/task/HadoopIndexTask.java | 4 +++- .../druid/indexing/common/task/IndexTask.java | 7 ++++-- .../GeneratedPartitionsMetadataReport.java | 5 ++-- .../parallel/GeneratedPartitionsReport.java | 22 ++++++++++++++--- .../parallel/ParallelIndexSupervisorTask.java | 23 ++++++++++++++---- .../PartialHashSegmentGenerateTask.java | 9 +++++-- .../PartialRangeSegmentGenerateTask.java | 9 +++++-- .../parallel/PartialSegmentGenerateTask.java | 24 ++++++++++++++++--- .../batch/parallel/SinglePhaseSubTask.java | 4 +++- .../SeekableStreamIndexTaskRunner.java | 4 +++- .../common/task/TaskReportSerdeTest.java | 4 +++- .../ParallelIndexSupervisorTaskTest.java | 4 ++-- 15 files changed, 116 insertions(+), 26 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 855c98243476..6114fb2bd457 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -99,6 +99,14 @@ For some task types, the indexing task can wait for the newly ingested segments |`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.| |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.| + +#### Compaction Task Fields + +| Field | Description | +|---------------------|----------------------------------------------| +| `segmentsRead` | # of segments read by a compaction task | +| `segmentsPublished` | # of segments published by a compaction task | + ### Live report When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index 28388770c057..09ed9970aee0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -47,13 +47,20 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private long segmentAvailabilityWaitTimeMs; + @JsonProperty + private int segmentsRead; + @JsonProperty + private int segmentsPublished; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @JsonProperty("rowStats") Map rowStats, @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, - @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs + @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, + @JsonProperty("segmentsRead") int segmentsRead, + @JsonProperty("segmentsRead") int segmentsPublished ) { this.ingestionState = ingestionState; @@ -62,6 +69,8 @@ public IngestionStatsAndErrorsTaskReportData( this.errorMsg = errorMsg; this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; + this.segmentsRead = segmentsRead; + this.segmentsPublished = segmentsPublished; } @JsonProperty diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9cee79b63086..2f6bac78be8f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -620,7 +620,9 @@ private Map getTaskCompletionReports() getTaskCompletionRowStats(), errorMsg, errorMsg == null, - 0L + 0L, + 0, + 0 ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 37bbf647e74f..f7a8a7aabd1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -693,7 +693,9 @@ private Map getTaskCompletionReports() getTaskCompletionRowStats(), errorMsg, segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + 0, + 0 ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index e90f531b58d4..b1b75dffebdb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -579,7 +579,9 @@ private Map getTaskCompletionReports() getTaskCompletionRowStats(), errorMsg, segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + 0, + 0 ) ) ); @@ -1046,7 +1048,8 @@ private TaskStatus generateAndPublishSegments( log.debugSegments(published.getSegments(), "Published segments"); - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + toolbox.getTaskReportFileWriter() + .write(getId(), getTaskCompletionReports()); return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index a83c3cc74120..f3b86c393a84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -38,9 +38,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport GeneratedPartitionsMetadataReport( @JsonProperty("taskId") String taskId, @JsonProperty("partitionStats") List partitionStats, - @JsonProperty("taskReport") Map taskReport + @JsonProperty("taskReport") Map taskReport, + @JsonProperty("segmentsRead") Integer segmentsRead ) { - super(taskId, partitionStats, taskReport); + super(taskId, partitionStats, taskReport, segmentsRead); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 1fa025d1c91f..648d2f24ffc9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -37,11 +37,19 @@ public class GeneratedPartitionsReport implements SubTaskReport private final List partitionStats; private final Map taskReport; - GeneratedPartitionsReport(String taskId, List partitionStats, Map taskReport) + private final Integer segmentsRead; + + GeneratedPartitionsReport( + String taskId, + List partitionStats, + Map taskReport, + Integer segmentsRead + ) { this.taskId = taskId; this.partitionStats = partitionStats; this.taskReport = taskReport; + this.segmentsRead = segmentsRead; } @Override @@ -63,6 +71,12 @@ public List getPartitionStats() return partitionStats; } + @JsonProperty + public Integer getSegmentsRead() + { + return segmentsRead; + } + @Override public boolean equals(Object o) { @@ -75,13 +89,14 @@ public boolean equals(Object o) GeneratedPartitionsReport that = (GeneratedPartitionsReport) o; return Objects.equals(taskId, that.taskId) && Objects.equals(partitionStats, that.partitionStats) && - Objects.equals(taskReport, that.taskReport); + Objects.equals(taskReport, that.taskReport) && + Objects.equals(segmentsRead, that.segmentsRead); } @Override public int hashCode() { - return Objects.hash(taskId, partitionStats, taskReport); + return Objects.hash(taskId, partitionStats, taskReport, segmentsRead); } @Override @@ -91,6 +106,7 @@ public String toString() "taskId='" + taskId + '\'' + ", partitionStats=" + partitionStats + ", taskReport=" + taskReport + + ", segmentsRead=" + segmentsRead + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index df987798aa72..b4feada231a5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile Pair, Map> indexGenerateRowStats; private IngestionState ingestionState; + private int segmentsRead = 0; + private int segmentsPublished = 0; @JsonCreator @@ -633,6 +635,14 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception if (state.isSuccess()) { //noinspection ConstantConditions publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); + segmentsRead = parallelSinglePhaseRunner.getReports() + .values() + .stream() + .mapToInt(report -> report.getOldSegments().size()).sum(); + segmentsRead = parallelSinglePhaseRunner.getReports() + .values() + .stream() + .mapToInt(report -> report.getNewSegments().size()).sum(); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } @@ -806,7 +816,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions - publishSegments(toolbox, mergeRunner.getReports()); + segmentsPublished = publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } @@ -906,7 +916,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep TaskState mergeState = runNextPhase(mergeRunner); TaskStatus taskStatus; if (mergeState.isSuccess()) { - publishSegments(toolbox, mergeRunner.getReports()); + segmentsPublished = publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } @@ -1120,7 +1130,7 @@ private static Pair getPartitionBoundaries(int index, int tota return Pair.of(start, stop); } - private void publishSegments( + private int publishSegments( TaskToolbox toolbox, Map reportsMap ) @@ -1188,6 +1198,8 @@ private void publishSegments( } else { throw new ISE("Failed to publish segments"); } + + return newSegments.size(); } private TaskStatus runSequential(TaskToolbox toolbox) throws Exception @@ -1240,7 +1252,9 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, rowStatsAndUnparseableEvents.lhs, taskStatus.getErrorMsg(), segmentAvailabilityConfirmed, - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + segmentsRead, + segmentsPublished ) ) ); @@ -1621,6 +1635,7 @@ private Pair, Map> doGetRowStatsAndUnparseab getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); + segmentsRead += generatedPartitionsReport.getSegmentsRead(); } RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 49e3591ff18a..1094d6b5bed9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -177,12 +177,17 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( + TaskToolbox toolbox, + List segments, + Map taskReport, + Integer segmentsRead + ) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 27604eb7e770..e31677059eae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -192,11 +192,16 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( + TaskToolbox toolbox, + List segments, + Map taskReport, + Integer segmentsRead + ) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index e20c7bdbe352..81e0d46f51ca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -127,7 +128,12 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception Map taskReport = getTaskCompletionReports(); - taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport)); + taskClient.report(createGeneratedPartitionsReport( + toolbox, + segments, + taskReport, + getSegementsSize(inputSource) + )); return TaskStatus.success(getId()); } @@ -146,9 +152,19 @@ abstract SegmentAllocatorForBatch createSegmentAllocator( abstract T createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, - Map taskReport + Map taskReport, + Integer segmentsRead ); + private int getSegementsSize(InputSource inputSource) + { + if (inputSource instanceof DruidInputSource) { + return ((DruidInputSource) inputSource).getSegmentIds().size(); + } + + return 0; + } + private List generateSegments( final TaskToolbox toolbox, final ParallelIndexSupervisorTaskClient taskClient, @@ -247,7 +263,9 @@ private Map getTaskCompletionReports() getTaskCompletionRowStats(), "", false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + 0, + 0 ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index a3bd47d29603..17f8714c9e61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -643,7 +643,9 @@ private Map getTaskCompletionReports() getTaskCompletionRowStats(), errorMsg, false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs + segmentAvailabilityWaitTimeMs, + 0, + 0 ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 9fc743bd23b8..9920f17859a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1126,7 +1126,9 @@ private Map getTaskCompletionReports(@Nullable String errorM getTaskCompletionRowStats(), errorMsg, errorMsg == null, - handoffWaitMs + handoffWaitMs, + 0, + 0 ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 4231f318efd8..3a813239dce9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -69,7 +69,9 @@ public void testSerde() throws Exception ), "an error message", true, - 1000L + 1000L, + 0, + 0 ) ); String report1serialized = jsonMapper.writeValueAsString(report1); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 6fb56aa33f7f..ce0ed75da48d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -450,13 +450,13 @@ public void test_getPartitionToLocations_ordersPartitionsCorrectly() createRangePartitionStat(day2, 7), createRangePartitionStat(day1, 0), createRangePartitionStat(day2, 1) - ), null)); + ), null, 1)); taskIdToReport.put(task2, new GeneratedPartitionsReport(task2, Arrays.asList( createRangePartitionStat(day1, 4), createRangePartitionStat(day1, 6), createRangePartitionStat(day2, 1), createRangePartitionStat(day1, 1) - ), null)); + ), null, 1)); Map> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(taskIdToReport); From fc188429a5a46376d84d2c6100d13723d6f5ec85 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 26 Feb 2024 17:46:34 -0600 Subject: [PATCH 02/23] fix typo --- .../indexing/common/IngestionStatsAndErrorsTaskReportData.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index d9bab8794049..c98695f08aab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -64,7 +64,7 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, @JsonProperty("recordsProcessed") Map recordsProcessed, @JsonProperty("segmentsRead") int segmentsRead, - @JsonProperty("segmentsRead") int segmentsPublished + @JsonProperty("segmentsPublished") int segmentsPublished ) { this.ingestionState = ingestionState; From 010e53da18c29c6134f10db2fec67f72a116233e Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 26 Feb 2024 21:51:33 -0600 Subject: [PATCH 03/23] fix missing coverage --- .../common/task/TaskReportSerdeTest.java | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 9994c087074e..45a561b99d80 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -71,8 +71,8 @@ public void testSerde() throws Exception true, 1000L, ImmutableMap.of("PartitionA", 5000L), - 0, - 0 + 5, + 10 ) ); String report1serialized = jsonMapper.writeValueAsString(report1); @@ -112,6 +112,47 @@ public void testExceptionWhileWritingReport() throws Exception ); } + @Test + public void testEquals() + { + IngestionStatsAndErrorsTaskReport report = generateReport(ImmutableMap.of("PartitionA", 5000L), 5, 10); + + // changed partition stats + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionB", 5000L), 5, 10), report); + + // changed segements read + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 1, 10), report); + + // changed segements published + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 5, 5), report); + } + + private IngestionStatsAndErrorsTaskReport generateReport( + Map partitionStats, + int segmentsRead, + int segmentsPublished + ) + { + return new IngestionStatsAndErrorsTaskReport( + "testID", + new IngestionStatsAndErrorsTaskReportData( + IngestionState.BUILD_SEGMENTS, + ImmutableMap.of( + "hello", "world" + ), + ImmutableMap.of( + "number", 1234 + ), + "an error message", + true, + 1000L, + partitionStats, + segmentsRead, + segmentsPublished + ) + ); + } + /** * Task report that throws an exception while being serialized. */ From caa17e721c213679e6f38cd521495cede174d778 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 27 Feb 2024 14:44:08 -0600 Subject: [PATCH 04/23] change to long --- .../IngestionStatsAndErrorsTaskReportData.java | 14 ++++++++------ .../AppenderatorDriverRealtimeIndexTask.java | 4 ++-- .../indexing/common/task/HadoopIndexTask.java | 4 ++-- .../druid/indexing/common/task/IndexTask.java | 4 ++-- .../GeneratedPartitionsMetadataReport.java | 2 +- .../parallel/GeneratedPartitionsReport.java | 8 +++++--- .../parallel/ParallelIndexSupervisorTask.java | 12 ++++++------ .../PartialHashSegmentGenerateTask.java | 2 +- .../PartialRangeSegmentGenerateTask.java | 2 +- .../parallel/PartialSegmentGenerateTask.java | 12 ++++++------ .../batch/parallel/SinglePhaseSubTask.java | 4 ++-- .../SeekableStreamIndexTaskRunner.java | 4 ++-- .../common/task/TaskReportSerdeTest.java | 18 +++++++++--------- .../ParallelIndexSupervisorTaskTest.java | 4 ++-- 14 files changed, 49 insertions(+), 45 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index c98695f08aab..372bf1b3c6f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -48,9 +48,9 @@ public class IngestionStatsAndErrorsTaskReportData private long segmentAvailabilityWaitTimeMs; @JsonProperty - private int segmentsRead; + private Long segmentsRead; @JsonProperty - private int segmentsPublished; + private Long segmentsPublished; @JsonProperty private Map recordsProcessed; @@ -63,8 +63,8 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, @JsonProperty("recordsProcessed") Map recordsProcessed, - @JsonProperty("segmentsRead") int segmentsRead, - @JsonProperty("segmentsPublished") int segmentsPublished + @JsonProperty("segmentsRead") Long segmentsRead, + @JsonProperty("segmentsPublished") Long segmentsPublished ) { this.ingestionState = ingestionState; @@ -123,13 +123,15 @@ public Map getRecordsProcessed() } @JsonProperty - public int getSegmentsRead() + @Nullable + public Long getSegmentsRead() { return segmentsRead; } @JsonProperty - public int getSegmentsPublished() + @Nullable + public Long getSegmentsPublished() { return segmentsPublished; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 4d15963a5125..e0c7f9bc934f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -622,8 +622,8 @@ private Map getTaskCompletionReports() errorMsg == null, 0L, Collections.emptyMap(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 9dae764927c7..019b63520e75 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -695,8 +695,8 @@ private Map getTaskCompletionReports() segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index c153d14a0afa..2b1431e73432 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -581,8 +581,8 @@ private Map getTaskCompletionReports() segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index f3b86c393a84..0e97f76007f5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -39,7 +39,7 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport @JsonProperty("taskId") String taskId, @JsonProperty("partitionStats") List partitionStats, @JsonProperty("taskReport") Map taskReport, - @JsonProperty("segmentsRead") Integer segmentsRead + @JsonProperty("segmentsRead") Long segmentsRead ) { super(taskId, partitionStats, taskReport, segmentsRead); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 648d2f24ffc9..92521c7d1b87 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.common.TaskReport; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Objects; @@ -37,13 +38,13 @@ public class GeneratedPartitionsReport implements SubTaskReport private final List partitionStats; private final Map taskReport; - private final Integer segmentsRead; + private final Long segmentsRead; GeneratedPartitionsReport( String taskId, List partitionStats, Map taskReport, - Integer segmentsRead + Long segmentsRead ) { this.taskId = taskId; @@ -72,7 +73,8 @@ public List getPartitionStats() } @JsonProperty - public Integer getSegmentsRead() + @Nullable + public Long getSegmentsRead() { return segmentsRead; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 1ba9bbe4f273..8bbcc06380bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -202,8 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile Pair, Map> indexGenerateRowStats; private IngestionState ingestionState; - private int segmentsRead = 0; - private int segmentsPublished = 0; + private Long segmentsRead = 0L; + private Long segmentsPublished = 0L; @JsonCreator @@ -638,11 +638,11 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception segmentsRead = parallelSinglePhaseRunner.getReports() .values() .stream() - .mapToInt(report -> report.getOldSegments().size()).sum(); - segmentsRead = parallelSinglePhaseRunner.getReports() + .mapToLong(report -> report.getOldSegments().size()).sum(); + segmentsPublished = parallelSinglePhaseRunner.getReports() .values() .stream() - .mapToInt(report -> report.getNewSegments().size()).sum(); + .mapToLong(report -> report.getNewSegments().size()).sum(); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } @@ -1130,7 +1130,7 @@ private static Pair getPartitionBoundaries(int index, int tota return Pair.of(start, stop); } - private int publishSegments( + private long publishSegments( TaskToolbox toolbox, Map reportsMap ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 1094d6b5bed9..5b3202a9e9dc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -181,7 +181,7 @@ GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, Map taskReport, - Integer segmentsRead + Long segmentsRead ) { List partitionStats = segments.stream() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index e31677059eae..17090d13b1fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -196,7 +196,7 @@ GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, Map taskReport, - Integer segmentsRead + Long segmentsRead ) { List partitionStats = segments.stream() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index eab38bd5c024..3290501b9b7c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -153,16 +153,16 @@ abstract T createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, Map taskReport, - Integer segmentsRead + Long segmentsRead ); - private int getSegementsSize(InputSource inputSource) + private Long getSegementsSize(InputSource inputSource) { if (inputSource instanceof DruidInputSource) { - return ((DruidInputSource) inputSource).getSegmentIds().size(); + return (long) ((DruidInputSource) inputSource).getSegmentIds().size(); } - return 0; + return null; } private List generateSegments( @@ -265,8 +265,8 @@ private Map getTaskCompletionReports() false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 595ffd56be16..bbd3f2964b61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -645,8 +645,8 @@ private Map getTaskCompletionReports() false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 0c06b9e08cab..09e89b1d6019 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1131,8 +1131,8 @@ private Map getTaskCompletionReports(@Nullable String errorM errorMsg == null, handoffWaitMs, getPartitionStats(), - 0, - 0 + null, + null ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 45a561b99d80..7d6dbcdd1837 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -71,8 +71,8 @@ public void testSerde() throws Exception true, 1000L, ImmutableMap.of("PartitionA", 5000L), - 5, - 10 + 5L, + 10L ) ); String report1serialized = jsonMapper.writeValueAsString(report1); @@ -113,24 +113,24 @@ public void testExceptionWhileWritingReport() throws Exception } @Test - public void testEquals() + public void testNegativeEquals() { - IngestionStatsAndErrorsTaskReport report = generateReport(ImmutableMap.of("PartitionA", 5000L), 5, 10); + IngestionStatsAndErrorsTaskReport report = generateReport(ImmutableMap.of("PartitionA", 5000L), 5L, 10L); // changed partition stats - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionB", 5000L), 5, 10), report); + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionB", 5000L), 5L, 10L), report); // changed segements read - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 1, 10), report); + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 1L, 10L), report); // changed segements published - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 5, 5), report); + Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 5L, 5L), report); } private IngestionStatsAndErrorsTaskReport generateReport( Map partitionStats, - int segmentsRead, - int segmentsPublished + Long segmentsRead, + Long segmentsPublished ) { return new IngestionStatsAndErrorsTaskReport( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index ce0ed75da48d..17d19dd0d894 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -450,13 +450,13 @@ public void test_getPartitionToLocations_ordersPartitionsCorrectly() createRangePartitionStat(day2, 7), createRangePartitionStat(day1, 0), createRangePartitionStat(day2, 1) - ), null, 1)); + ), null, 1L)); taskIdToReport.put(task2, new GeneratedPartitionsReport(task2, Arrays.asList( createRangePartitionStat(day1, 4), createRangePartitionStat(day1, 6), createRangePartitionStat(day2, 1), createRangePartitionStat(day1, 1) - ), null, 1)); + ), null, 1L)); Map> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(taskIdToReport); From 44ebd78f8a16c0ec6d384a8f3839d168ee3918a2 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 27 Feb 2024 22:06:48 -0600 Subject: [PATCH 05/23] fix tests --- .../batch/parallel/ParallelIndexSupervisorTask.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 8bbcc06380bb..4a63d467e396 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -202,8 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile Pair, Map> indexGenerateRowStats; private IngestionState ingestionState; - private Long segmentsRead = 0L; - private Long segmentsPublished = 0L; + private Long segmentsRead; + private Long segmentsPublished; @JsonCreator @@ -1626,6 +1626,7 @@ private Pair, Map> doGetRowStatsAndUnparseab final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters(); final List unparseableEvents = new ArrayList<>(); + long totalSegmentsRead = 0L; for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { Map taskReport = generatedPartitionsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { @@ -1636,7 +1637,9 @@ private Pair, Map> doGetRowStatsAndUnparseab getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); - segmentsRead += generatedPartitionsReport.getSegmentsRead(); + if (generatedPartitionsReport.getSegmentsRead() != null) { + totalSegmentsRead += generatedPartitionsReport.getSegmentsRead(); + } } RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks( @@ -1645,6 +1648,9 @@ private Pair, Map> doGetRowStatsAndUnparseab includeUnparseable ); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks); + if (totalSegmentsRead > 0) { + segmentsRead = totalSegmentsRead; + } return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents); } From d24f6548f07c7e023a5a9bf44c15ea0ee8829b2a Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Wed, 28 Feb 2024 15:04:59 -0600 Subject: [PATCH 06/23] commit suggestions: Update indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java --- .../common/task/batch/parallel/GeneratedPartitionsReport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 92521c7d1b87..e4d336de4c02 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -108,7 +108,7 @@ public String toString() "taskId='" + taskId + '\'' + ", partitionStats=" + partitionStats + ", taskReport=" + taskReport + - ", segmentsRead=" + segmentsRead + + ", segmentsRead=" + segmentsRead + '}'; } } From f8c8c73dfb701969eabf5fff3586dd75d29a924d Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 09:29:20 -0600 Subject: [PATCH 07/23] address comments --- docs/ingestion/tasks.md | 8 +- ...IngestionStatsAndErrorsTaskReportData.java | 35 +----- .../ParallelCompactionTaskReportData.java | 101 ++++++++++++++++++ .../parallel/ParallelIndexSupervisorTask.java | 13 ++- 4 files changed, 120 insertions(+), 37 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index be707ddfeefe..e7855dc041ce 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -158,10 +158,10 @@ For some task types, the indexing task can wait for the newly ingested segments #### Compaction Task Fields -| Field | Description | -|---------------------|----------------------------------------------| -| `segmentsRead` | # of segments read by a compaction task | -| `segmentsPublished` | # of segments published by a compaction task | +| Field | Description | +|---------------------|-------------------------------------------------------| +| `segmentsRead` | # of segments read by a parallel compaction task | +| `segmentsPublished` | # of segments published by a parallel compaction task | ### Live report diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index 372bf1b3c6f2..ecdd9d3aeba9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -47,11 +47,6 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private long segmentAvailabilityWaitTimeMs; - @JsonProperty - private Long segmentsRead; - @JsonProperty - private Long segmentsPublished; - @JsonProperty private Map recordsProcessed; @@ -62,9 +57,7 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, - @JsonProperty("recordsProcessed") Map recordsProcessed, - @JsonProperty("segmentsRead") Long segmentsRead, - @JsonProperty("segmentsPublished") Long segmentsPublished + @JsonProperty("recordsProcessed") Map recordsProcessed ) { this.ingestionState = ingestionState; @@ -74,8 +67,6 @@ public IngestionStatsAndErrorsTaskReportData( this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; this.recordsProcessed = recordsProcessed; - this.segmentsRead = segmentsRead; - this.segmentsPublished = segmentsPublished; } @JsonProperty @@ -122,20 +113,6 @@ public Map getRecordsProcessed() return recordsProcessed; } - @JsonProperty - @Nullable - public Long getSegmentsRead() - { - return segmentsRead; - } - - @JsonProperty - @Nullable - public Long getSegmentsPublished() - { - return segmentsPublished; - } - public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -160,9 +137,7 @@ public boolean equals(Object o) Objects.equals(getErrorMsg(), that.getErrorMsg()) && Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) && - Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) && - Objects.equals(getSegmentsRead(), that.getSegmentsRead()) && - Objects.equals(getSegmentsPublished(), that.getSegmentsPublished()); + Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()); } @Override @@ -175,9 +150,7 @@ public int hashCode() getErrorMsg(), isSegmentAvailabilityConfirmed(), getSegmentAvailabilityWaitTimeMs(), - getRecordsProcessed(), - getSegmentsRead(), - getSegmentsPublished() + getRecordsProcessed() ); } @@ -192,8 +165,6 @@ public String toString() ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + ", recordsProcessed=" + recordsProcessed + - ", segmentsRead=" + segmentsRead + - ", segmentsPublished=" + segmentsPublished + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java new file mode 100644 index 000000000000..2009097795b3 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java @@ -0,0 +1,101 @@ +package org.apache.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.IngestionState; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Objects; + +public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTaskReportData +{ + @JsonProperty + private Long segmentsRead; + @JsonProperty + private Long segmentsPublished; + + @JsonProperty + private Map recordsProcessed; + + public ParallelCompactionTaskReportData( + @JsonProperty("ingestionState") IngestionState ingestionState, + @JsonProperty("unparseableEvents") Map unparseableEvents, + @JsonProperty("rowStats") Map rowStats, + @JsonProperty("errorMsg") @Nullable String errorMsg, + @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, + @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, + @JsonProperty("recordsProcessed") Map recordsProcessed, + @JsonProperty("segmentsRead") Long segmentsRead, + @JsonProperty("segmentsPublished") Long segmentsPublished + ) + { + super( + ingestionState, + unparseableEvents, + rowStats, + errorMsg, + segmentAvailabilityConfirmed, + segmentAvailabilityWaitTimeMs, + recordsProcessed + ); + this.segmentsRead = segmentsRead; + this.segmentsPublished = segmentsPublished; + } + + @JsonProperty + @Nullable + public Long getSegmentsRead() + { + return segmentsRead; + } + + @JsonProperty + @Nullable + public Long getSegmentsPublished() + { + return segmentsPublished; + } + + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( + Map taskReports + ) + { + return (IngestionStatsAndErrorsTaskReportData) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) + .getPayload(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ParallelCompactionTaskReportData that = (ParallelCompactionTaskReportData) o; + return super.equals(o) && + Objects.equals(getSegmentsRead(), that.getSegmentsRead()) && + Objects.equals(getSegmentsPublished(), that.getSegmentsPublished()); + } + + @Override + public int hashCode() + { + return Objects.hash( + super.hashCode(), + getSegmentsRead(), + getSegmentsPublished() + ); + } + + @Override + public String toString() + { + return "ParallelCompactionTaskReportData {" + + "IngestionStatsAndErrorsTaskReportData=" + super.toString() + + ", segmentsRead=" + segmentsRead + + ", segmentsPublished=" + segmentsPublished + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 22b285b63b39..7d9d4cd575a7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -42,6 +42,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -1260,7 +1261,8 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( getId(), - new IngestionStatsAndErrorsTaskReportData( + isCompactionTask ? + new ParallelCompactionTaskReportData( IngestionState.COMPLETED, rowStatsAndUnparseableEvents.rhs, rowStatsAndUnparseableEvents.lhs, @@ -1270,6 +1272,15 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, Collections.emptyMap(), segmentsRead, segmentsPublished + ) : + new IngestionStatsAndErrorsTaskReportData( + IngestionState.COMPLETED, + rowStatsAndUnparseableEvents.rhs, + rowStatsAndUnparseableEvents.lhs, + taskStatus.getErrorMsg(), + segmentAvailabilityConfirmed, + segmentAvailabilityWaitTimeMs, + Collections.emptyMap() ) ) ); From c1bd161a4f936b5ec337ce8953b80755a558a3ae Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 09:36:15 -0600 Subject: [PATCH 08/23] fix-compile-errors --- .../AppenderatorDriverRealtimeIndexTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 4 +- .../druid/indexing/common/task/IndexTask.java | 4 +- .../parallel/PartialSegmentGenerateTask.java | 4 +- .../batch/parallel/SinglePhaseSubTask.java | 4 +- .../SeekableStreamIndexTaskRunner.java | 4 +- .../common/task/TaskReportSerdeTest.java | 45 +------------------ 7 files changed, 7 insertions(+), 62 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index e0c7f9bc934f..9ea94b844cc4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -621,9 +621,7 @@ private Map getTaskCompletionReports() errorMsg, errorMsg == null, 0L, - Collections.emptyMap(), - null, - null + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 019b63520e75..1422720bf94d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -694,9 +694,7 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 1ba54dffa836..ce3245944e26 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -599,9 +599,7 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 3290501b9b7c..96f39725b8ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -264,9 +264,7 @@ private Map getTaskCompletionReports() "", false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index bbd3f2964b61..8d9bd8f7b6d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -644,9 +644,7 @@ private Map getTaskCompletionReports() errorMsg, false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null + Collections.emptyMap() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 09e89b1d6019..c7f77cea5662 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1130,9 +1130,7 @@ private Map getTaskCompletionReports(@Nullable String errorM errorMsg, errorMsg == null, handoffWaitMs, - getPartitionStats(), - null, - null + getPartitionStats() ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 7d6dbcdd1837..13a850f5ee74 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -70,9 +70,7 @@ public void testSerde() throws Exception "an error message", true, 1000L, - ImmutableMap.of("PartitionA", 5000L), - 5L, - 10L + ImmutableMap.of("PartitionA", 5000L) ) ); String report1serialized = jsonMapper.writeValueAsString(report1); @@ -112,47 +110,6 @@ public void testExceptionWhileWritingReport() throws Exception ); } - @Test - public void testNegativeEquals() - { - IngestionStatsAndErrorsTaskReport report = generateReport(ImmutableMap.of("PartitionA", 5000L), 5L, 10L); - - // changed partition stats - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionB", 5000L), 5L, 10L), report); - - // changed segements read - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 1L, 10L), report); - - // changed segements published - Assert.assertNotEquals(generateReport(ImmutableMap.of("PartitionA", 5000L), 5L, 5L), report); - } - - private IngestionStatsAndErrorsTaskReport generateReport( - Map partitionStats, - Long segmentsRead, - Long segmentsPublished - ) - { - return new IngestionStatsAndErrorsTaskReport( - "testID", - new IngestionStatsAndErrorsTaskReportData( - IngestionState.BUILD_SEGMENTS, - ImmutableMap.of( - "hello", "world" - ), - ImmutableMap.of( - "number", 1234 - ), - "an error message", - true, - 1000L, - partitionStats, - segmentsRead, - segmentsPublished - ) - ); - } - /** * Task report that throws an exception while being serialized. */ From 70ab7628e2327df5f44551d95da5bcb7da280bc4 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 10:44:50 -0600 Subject: [PATCH 09/23] license --- .../ParallelCompactionTaskReportData.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java index 2009097795b3..c026262e2bb5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonProperty; @@ -14,9 +33,6 @@ public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTas @JsonProperty private Long segmentsPublished; - @JsonProperty - private Map recordsProcessed; - public ParallelCompactionTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @@ -56,14 +72,6 @@ public Long getSegmentsPublished() return segmentsPublished; } - public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( - Map taskReports - ) - { - return (IngestionStatsAndErrorsTaskReportData) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) - .getPayload(); - } - @Override public boolean equals(Object o) { From d318ff70ed0a3d5f3d252d6ad93fe82e67b67819 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 11:32:46 -0600 Subject: [PATCH 10/23] add IT Test --- ...IngestionStatsAndErrorsTaskReportData.java | 7 +++ .../tests/indexer/ITCompactionTaskTest.java | 54 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index ecdd9d3aeba9..566297f6a2b5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -20,12 +20,19 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.indexer.IngestionState; import javax.annotation.Nullable; import java.util.Map; import java.util.Objects; +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "kind", defaultImpl = IngestionStatsAndErrorsTaskReport.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "ingestion_stats", value = IngestionStatsAndErrorsTaskReport.class), + @JsonSubTypes.Type(name = "parallel_compaction_stats", value = ParallelCompactionTaskReportData.class) +}) public class IngestionStatsAndErrorsTaskReportData { @JsonProperty diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 95b3e1129706..aa25c1dc4e64 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -22,6 +22,7 @@ import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -43,6 +44,7 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -65,6 +67,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json"; private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + private static final String PARALLEL_COMPACTION_TASK = "/indexer/wikipedia_compaction_task_parallel.json"; private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json"; private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json"; @@ -138,6 +141,54 @@ public void testCompactionWithQueryGranularityInGranularitySpec() throws Excepti } } + @Test + public void testParallelHashedCompaction() throws Exception + { + try (final Closeable ignored = unloader(fullDatasourceName)) { + loadData(INDEX_TASK, fullDatasourceName); + // 4 segments across 2 days + checkNumberOfSegments(4); + List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + expectedIntervalAfterCompaction.sort(null); + + checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4); + String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE); + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%", + jsonMapper.writeValueAsString("0") + ); + + queryHelper.testQueriesFromString(queryResponseTemplate); + String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null); + + // The original 4 segments should be compacted into 2 new segments + checkNumberOfSegments(2); + queryHelper.testQueriesFromString(queryResponseTemplate); + checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2); + + + checkCompactionIntervals(expectedIntervalAfterCompaction); + + Map reports = indexer.getTaskReport(taskId); + Assert.assertTrue(reports != null && reports.size() > 0); + + Assert.assertEquals(2, + reports.values() + .stream() + .mapToLong(r -> ((ParallelCompactionTaskReportData) r.getPayload()).getSegmentsPublished()) + .sum() + ); + Assert.assertEquals(4, + reports.values() + .stream() + .mapToLong(r -> ((ParallelCompactionTaskReportData) r.getPayload()).getSegmentsRead()) + .sum() + ); + } + } + @Test public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception { @@ -220,6 +271,9 @@ private void loadDataAndCompact( Map reports = indexer.getTaskReport(taskId); Assert.assertTrue(reports != null && reports.size() > 0); + + System.out.println(reports.size()); + System.out.println(Arrays.toString(reports.values().stream().map(r -> r.toString()).toArray())); } } From 7aeaa1dbf65c7053d195106ddbd02ea1718a5017 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 11:41:25 -0600 Subject: [PATCH 11/23] add another test --- .../common/task/CompactionTaskParallelRunTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 12cf09d33175..303dbe581dd4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -41,6 +41,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -250,6 +251,18 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc List reports = getIngestionReports(); Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task + + // this test reads 3 segments and publishes 6 segments + Assert.assertEquals( + 3, + reports.stream().mapToLong(r -> ((ParallelCompactionTaskReportData) r).getSegmentsRead()).sum() + ); + Assert.assertEquals( + 6, + reports.stream() + .mapToLong(r -> ((ParallelCompactionTaskReportData) r).getSegmentsPublished()) + .sum() + ); } @Test From 11c0eadfc93b2a2b657384b24afe676455b8c181 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 18:20:27 -0600 Subject: [PATCH 12/23] add test coverage --- .../common/task/TaskReportSerdeTest.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 13a850f5ee74..ba0974574258 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -25,9 +25,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; @@ -94,6 +96,58 @@ public void testSerde() throws Exception Assert.assertEquals(reportMap1, reportMap2); } + @Test + public void testSerdeForParallelCompactionReport() throws Exception + { + IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( + "testID", + new ParallelCompactionTaskReportData( + IngestionState.BUILD_SEGMENTS, + ImmutableMap.of( + "hello", "world" + ), + ImmutableMap.of( + "number", 1234 + ), + "an error message", + true, + 1000L, + ImmutableMap.of("PartitionA", 5000L), + 5L, + 10L + ) + ); + String report1serialized = jsonMapper.writeValueAsString(report1); + IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue( + report1serialized, + TaskReport.class + ); + Assert.assertEquals(report1, report2); + Assert.assertEquals(report1.hashCode(), report2.hashCode()); + + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); + Map reportMap1 = TaskReport.buildTaskReports(report1); + writer.write("testID", reportMap1); + + Map reportMap2 = jsonMapper.readValue( + reportFile, + new TypeReference>() + { + } + ); + Assert.assertEquals(reportMap1, reportMap2); + } + + @Test + public void testEqualsForParallelCompactionReport() throws Exception + { + EqualsVerifier.simple(). + forClass(ParallelCompactionTaskReportData.class) + .verify(); + } + @Test public void testExceptionWhileWritingReport() throws Exception { From 7e3482eed28055601a812675fc10d68bf6a4a229 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 19:53:03 -0600 Subject: [PATCH 13/23] remove throws exception --- .../apache/druid/indexing/common/task/TaskReportSerdeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index ba0974574258..e76356f24bd4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -141,7 +141,7 @@ public void testSerdeForParallelCompactionReport() throws Exception } @Test - public void testEqualsForParallelCompactionReport() throws Exception + public void testEqualsForParallelCompactionReport() { EqualsVerifier.simple(). forClass(ParallelCompactionTaskReportData.class) From 6b2afe41a0b80483119e915bc6eddb78f04ba000 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 20:20:15 -0600 Subject: [PATCH 14/23] missed checkin --- .../wikipedia_compaction_task_parallel.json | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json new file mode 100644 index 000000000000..8616946ebbaa --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json @@ -0,0 +1,22 @@ +{ + "type" : "compact", + "dataSource" : "%%DATASOURCE%%", + "ioConfig" : { + "type": "compact", + "inputSpec": { + "type": "interval", + "interval": "2013-08-31/2013-09-02" + } + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "hashed" + }, + "forceGuaranteedRollup": true, + "maxNumConcurrentSubTasks": 3 + }, + "context" : { + "storeCompactionState" : true + } +} From 7312217726910cdbcf5ce9ad5ecd9087f43a46df Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 20:33:10 -0600 Subject: [PATCH 15/23] checkstyle fix --- .../druid/indexing/common/task/TaskReportSerdeTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index e76356f24bd4..39097de6ef54 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -143,8 +143,8 @@ public void testSerdeForParallelCompactionReport() throws Exception @Test public void testEqualsForParallelCompactionReport() { - EqualsVerifier.simple(). - forClass(ParallelCompactionTaskReportData.class) + EqualsVerifier.simple() + .forClass(ParallelCompactionTaskReportData.class) .verify(); } From 84dec87cff340de387700d0f5c3115d07bda7a9c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 23:09:22 -0600 Subject: [PATCH 16/23] address comments --- docs/ingestion/tasks.md | 10 ++++------ .../common/ParallelCompactionTaskReportData.java | 4 ++++ .../batch/parallel/ParallelIndexSupervisorTask.java | 7 ++++--- .../druid/tests/indexer/ITCompactionTaskTest.java | 4 ---- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index e7855dc041ce..2d77386aa729 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -156,12 +156,10 @@ For some task types, the indexing task can wait for the newly ingested segments |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| -#### Compaction Task Fields - -| Field | Description | -|---------------------|-------------------------------------------------------| -| `segmentsRead` | # of segments read by a parallel compaction task | -| `segmentsPublished` | # of segments published by a parallel compaction task | +|Field| Description | +|---|---| +| `segmentsRead`| Number of segments read by compaction task with more than 1 subtask.| +| `segmentsPublished`| Number of segments published by compaction task with more than 1 subtask.| ### Live report diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java index c026262e2bb5..4b3feadc5b84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java @@ -26,6 +26,10 @@ import java.util.Map; import java.util.Objects; +/** + * This class has fields specific to the parallel compaction task and hence extends the generic + * task report data {@link org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData} + */ public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTaskReportData { @JsonProperty diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 7d9d4cd575a7..20ead92435bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -659,9 +659,10 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception .stream() .mapToLong(report -> report.getOldSegments().size()).sum(); segmentsPublished = parallelSinglePhaseRunner.getReports() - .values() - .stream() - .mapToLong(report -> report.getNewSegments().size()).sum(); + .values() + .stream() + .mapToLong(report -> report.getNewSegments().size()) + .sum(); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index aa25c1dc4e64..8d540227ec9e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -44,7 +44,6 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -271,9 +270,6 @@ private void loadDataAndCompact( Map reports = indexer.getTaskReport(taskId); Assert.assertTrue(reports != null && reports.size() > 0); - - System.out.println(reports.size()); - System.out.println(Arrays.toString(reports.values().stream().map(r -> r.toString()).toArray())); } } From f4a0f0ae3677b2b8f5b5ec54dd4b70f627abb0ed Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 23:44:51 -0600 Subject: [PATCH 17/23] fix intellij annotation check --- .../druid/indexing/common/ParallelCompactionTaskReportData.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java index 4b3feadc5b84..a11b0b4fb03b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java @@ -28,7 +28,7 @@ /** * This class has fields specific to the parallel compaction task and hence extends the generic - * task report data {@link org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData} + * task report data {@link IngestionStatsAndErrorsTaskReportData} */ public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTaskReportData { From c5225e0738cafa768c91aca573826736290f640b Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 07:53:18 -0600 Subject: [PATCH 18/23] comments --- docs/ingestion/tasks.md | 6 +++--- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 7 +------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 2d77386aa729..82b325c149fa 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -156,10 +156,10 @@ For some task types, the indexing task can wait for the newly ingested segments |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| -|Field| Description | +|Field| Description| |---|---| -| `segmentsRead`| Number of segments read by compaction task with more than 1 subtask.| -| `segmentsPublished`| Number of segments published by compaction task with more than 1 subtask.| +|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.| +|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.| ### Live report diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 20ead92435bb..544ab02766ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -653,16 +653,11 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions - publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); + segmentsPublished = publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); segmentsRead = parallelSinglePhaseRunner.getReports() .values() .stream() .mapToLong(report -> report.getOldSegments().size()).sum(); - segmentsPublished = parallelSinglePhaseRunner.getReports() - .values() - .stream() - .mapToLong(report -> report.getNewSegments().size()) - .sum(); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } From 907854e1327a4eb5340b523a7065cf236a506495 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 16:22:07 -0600 Subject: [PATCH 19/23] remove separeate class --- .../IngestionStatsAndErrorsTaskReport.java | 2 +- ...IngestionStatsAndErrorsTaskReportData.java | 45 +++++-- .../ParallelCompactionTaskReportData.java | 113 ------------------ .../AppenderatorDriverRealtimeIndexTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 4 +- .../druid/indexing/common/task/IndexTask.java | 4 +- .../GeneratedPartitionsMetadataReport.java | 5 +- .../parallel/GeneratedPartitionsReport.java | 20 +--- .../parallel/ParallelIndexSupervisorTask.java | 35 +++--- .../PartialHashSegmentGenerateTask.java | 5 +- .../PartialRangeSegmentGenerateTask.java | 5 +- .../parallel/PartialSegmentGenerateTask.java | 14 +-- .../batch/parallel/SinglePhaseSubTask.java | 4 +- .../SeekableStreamIndexTaskRunner.java | 4 +- .../task/CompactionTaskParallelRunTest.java | 5 +- .../common/task/TaskReportSerdeTest.java | 54 +-------- .../ParallelIndexSupervisorTaskTest.java | 4 +- .../tests/indexer/ITCompactionTaskTest.java | 6 +- 18 files changed, 92 insertions(+), 241 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 35ae2f669885..7518523b1de0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -57,7 +57,7 @@ public String getReportKey() } @Override - public Object getPayload() + public IngestionStatsAndErrorsTaskReportData getPayload() { return payload; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index 566297f6a2b5..7c2819836aa8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -19,20 +19,14 @@ package org.apache.druid.indexing.common; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.indexer.IngestionState; import javax.annotation.Nullable; import java.util.Map; import java.util.Objects; -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "kind", defaultImpl = IngestionStatsAndErrorsTaskReport.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "ingestion_stats", value = IngestionStatsAndErrorsTaskReport.class), - @JsonSubTypes.Type(name = "parallel_compaction_stats", value = ParallelCompactionTaskReportData.class) -}) public class IngestionStatsAndErrorsTaskReportData { @JsonProperty @@ -57,6 +51,11 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private Map recordsProcessed; + @JsonProperty + private Long segmentsRead; + @JsonProperty + private Long segmentsPublished; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @@ -64,7 +63,9 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, - @JsonProperty("recordsProcessed") Map recordsProcessed + @JsonProperty("recordsProcessed") Map recordsProcessed, + @JsonProperty("segmentsRead") Long segmentsRead, + @JsonProperty("segmentsPublished") Long segmentsPublished ) { this.ingestionState = ingestionState; @@ -74,6 +75,8 @@ public IngestionStatsAndErrorsTaskReportData( this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; this.recordsProcessed = recordsProcessed; + this.segmentsRead = segmentsRead; + this.segmentsPublished = segmentsPublished; } @JsonProperty @@ -120,6 +123,22 @@ public Map getRecordsProcessed() return recordsProcessed; } + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getSegmentsRead() + { + return segmentsRead; + } + + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getSegmentsPublished() + { + return segmentsPublished; + } + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -144,7 +163,9 @@ public boolean equals(Object o) Objects.equals(getErrorMsg(), that.getErrorMsg()) && Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) && - Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()); + Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) && + Objects.equals(getSegmentsRead(), that.getSegmentsRead()) && + Objects.equals(getSegmentsPublished(), that.getSegmentsPublished()); } @Override @@ -157,7 +178,9 @@ public int hashCode() getErrorMsg(), isSegmentAvailabilityConfirmed(), getSegmentAvailabilityWaitTimeMs(), - getRecordsProcessed() + getRecordsProcessed(), + getSegmentsRead(), + getSegmentsPublished() ); } @@ -172,6 +195,8 @@ public String toString() ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + ", recordsProcessed=" + recordsProcessed + + ", segmentsRead=" + segmentsRead + + ", segmentsPublished=" + segmentsPublished + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java deleted file mode 100644 index a11b0b4fb03b..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ParallelCompactionTaskReportData.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexer.IngestionState; - -import javax.annotation.Nullable; -import java.util.Map; -import java.util.Objects; - -/** - * This class has fields specific to the parallel compaction task and hence extends the generic - * task report data {@link IngestionStatsAndErrorsTaskReportData} - */ -public class ParallelCompactionTaskReportData extends IngestionStatsAndErrorsTaskReportData -{ - @JsonProperty - private Long segmentsRead; - @JsonProperty - private Long segmentsPublished; - - public ParallelCompactionTaskReportData( - @JsonProperty("ingestionState") IngestionState ingestionState, - @JsonProperty("unparseableEvents") Map unparseableEvents, - @JsonProperty("rowStats") Map rowStats, - @JsonProperty("errorMsg") @Nullable String errorMsg, - @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, - @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, - @JsonProperty("recordsProcessed") Map recordsProcessed, - @JsonProperty("segmentsRead") Long segmentsRead, - @JsonProperty("segmentsPublished") Long segmentsPublished - ) - { - super( - ingestionState, - unparseableEvents, - rowStats, - errorMsg, - segmentAvailabilityConfirmed, - segmentAvailabilityWaitTimeMs, - recordsProcessed - ); - this.segmentsRead = segmentsRead; - this.segmentsPublished = segmentsPublished; - } - - @JsonProperty - @Nullable - public Long getSegmentsRead() - { - return segmentsRead; - } - - @JsonProperty - @Nullable - public Long getSegmentsPublished() - { - return segmentsPublished; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ParallelCompactionTaskReportData that = (ParallelCompactionTaskReportData) o; - return super.equals(o) && - Objects.equals(getSegmentsRead(), that.getSegmentsRead()) && - Objects.equals(getSegmentsPublished(), that.getSegmentsPublished()); - } - - @Override - public int hashCode() - { - return Objects.hash( - super.hashCode(), - getSegmentsRead(), - getSegmentsPublished() - ); - } - - @Override - public String toString() - { - return "ParallelCompactionTaskReportData {" + - "IngestionStatsAndErrorsTaskReportData=" + super.toString() + - ", segmentsRead=" + segmentsRead + - ", segmentsPublished=" + segmentsPublished + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9ea94b844cc4..e0c7f9bc934f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -621,7 +621,9 @@ private Map getTaskCompletionReports() errorMsg, errorMsg == null, 0L, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 1422720bf94d..019b63520e75 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -694,7 +694,9 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ce3245944e26..1ba54dffa836 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -599,7 +599,9 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index 0e97f76007f5..a83c3cc74120 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -38,10 +38,9 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport GeneratedPartitionsMetadataReport( @JsonProperty("taskId") String taskId, @JsonProperty("partitionStats") List partitionStats, - @JsonProperty("taskReport") Map taskReport, - @JsonProperty("segmentsRead") Long segmentsRead + @JsonProperty("taskReport") Map taskReport ) { - super(taskId, partitionStats, taskReport, segmentsRead); + super(taskId, partitionStats, taskReport); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index e4d336de4c02..44691f1dc7cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.common.TaskReport; -import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,19 +37,15 @@ public class GeneratedPartitionsReport implements SubTaskReport private final List partitionStats; private final Map taskReport; - private final Long segmentsRead; - GeneratedPartitionsReport( String taskId, List partitionStats, - Map taskReport, - Long segmentsRead + Map taskReport ) { this.taskId = taskId; this.partitionStats = partitionStats; this.taskReport = taskReport; - this.segmentsRead = segmentsRead; } @Override @@ -72,13 +67,6 @@ public List getPartitionStats() return partitionStats; } - @JsonProperty - @Nullable - public Long getSegmentsRead() - { - return segmentsRead; - } - @Override public boolean equals(Object o) { @@ -91,14 +79,13 @@ public boolean equals(Object o) GeneratedPartitionsReport that = (GeneratedPartitionsReport) o; return Objects.equals(taskId, that.taskId) && Objects.equals(partitionStats, that.partitionStats) && - Objects.equals(taskReport, that.taskReport) && - Objects.equals(segmentsRead, that.segmentsRead); + Objects.equals(taskReport, that.taskReport); } @Override public int hashCode() { - return Objects.hash(taskId, partitionStats, taskReport, segmentsRead); + return Objects.hash(taskId, partitionStats, taskReport); } @Override @@ -108,7 +95,6 @@ public String toString() "taskId='" + taskId + '\'' + ", partitionStats=" + partitionStats + ", taskReport=" + taskReport + - ", segmentsRead=" + segmentsRead + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 544ab02766ce..9ddc11a1cd50 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -42,7 +42,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; -import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -654,10 +653,16 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception if (state.isSuccess()) { //noinspection ConstantConditions segmentsPublished = publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); - segmentsRead = parallelSinglePhaseRunner.getReports() - .values() - .stream() - .mapToLong(report -> report.getOldSegments().size()).sum(); + if (isCompactionTask) { + // segements are only read for compactiont tasks. For `index_parallel` + // tasks this would result to 0, but we want to rather have it as null + // because segmentsRead is not applicable for such tasks. + segmentsRead = parallelSinglePhaseRunner.getReports() + .values() + .stream() + .mapToLong(report -> report.getOldSegments().size()).sum(); + } + if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } @@ -1257,8 +1262,7 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( getId(), - isCompactionTask ? - new ParallelCompactionTaskReportData( + new IngestionStatsAndErrorsTaskReportData( IngestionState.COMPLETED, rowStatsAndUnparseableEvents.rhs, rowStatsAndUnparseableEvents.lhs, @@ -1268,15 +1272,6 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, Collections.emptyMap(), segmentsRead, segmentsPublished - ) : - new IngestionStatsAndErrorsTaskReportData( - IngestionState.COMPLETED, - rowStatsAndUnparseableEvents.rhs, - rowStatsAndUnparseableEvents.lhs, - taskStatus.getErrorMsg(), - segmentAvailabilityConfirmed, - segmentAvailabilityWaitTimeMs, - Collections.emptyMap() ) ) ); @@ -1665,8 +1660,12 @@ private Pair, Map> doGetRowStatsAndUnparseab getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); - if (generatedPartitionsReport.getSegmentsRead() != null) { - totalSegmentsRead += generatedPartitionsReport.getSegmentsRead(); + + Long segmentsRead = ((IngestionStatsAndErrorsTaskReport) + taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) + ).getPayload().getSegmentsRead(); + if (segmentsRead != null) { + totalSegmentsRead += segmentsRead; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 5b3202a9e9dc..74cf75705543 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -180,14 +180,13 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, - Map taskReport, - Long segmentsRead + Map taskReport ) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 17090d13b1fa..4a9f3be68293 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -195,13 +195,12 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, - Map taskReport, - Long segmentsRead + Map taskReport ) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport, segmentsRead); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 96f39725b8ef..6a7eff1271aa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -126,13 +126,12 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getIndexingTmpDir() ); - Map taskReport = getTaskCompletionReports(); + Map taskReport = getTaskCompletionReports(getSegementsSize(inputSource)); taskClient.report(createGeneratedPartitionsReport( toolbox, segments, - taskReport, - getSegementsSize(inputSource) + taskReport )); return TaskStatus.success(getId()); @@ -152,8 +151,7 @@ abstract SegmentAllocatorForBatch createSegmentAllocator( abstract T createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, - Map taskReport, - Long segmentsRead + Map taskReport ); private Long getSegementsSize(InputSource inputSource) @@ -252,7 +250,7 @@ private List generateSegments( /** * Generate an IngestionStatsAndErrorsTaskReport for the task. */ - private Map getTaskCompletionReports() + private Map getTaskCompletionReports(Long segmentsRead) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -264,7 +262,9 @@ private Map getTaskCompletionReports() "", false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + segmentsRead, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 8d9bd8f7b6d6..bbd3f2964b61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -644,7 +644,9 @@ private Map getTaskCompletionReports() errorMsg, false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index c7f77cea5662..09e89b1d6019 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1130,7 +1130,9 @@ private Map getTaskCompletionReports(@Nullable String errorM errorMsg, errorMsg == null, handoffWaitMs, - getPartitionStats() + getPartitionStats(), + null, + null ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 303dbe581dd4..9b68cebe9eeb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -41,7 +41,6 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -255,12 +254,12 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc // this test reads 3 segments and publishes 6 segments Assert.assertEquals( 3, - reports.stream().mapToLong(r -> ((ParallelCompactionTaskReportData) r).getSegmentsRead()).sum() + reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum() ); Assert.assertEquals( 6, reports.stream() - .mapToLong(r -> ((ParallelCompactionTaskReportData) r).getSegmentsPublished()) + .mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished) .sum() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 39097de6ef54..5c0e7e868c61 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -25,11 +25,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; -import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; -import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; @@ -62,46 +60,6 @@ public void testSerde() throws Exception IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( "testID", new IngestionStatsAndErrorsTaskReportData( - IngestionState.BUILD_SEGMENTS, - ImmutableMap.of( - "hello", "world" - ), - ImmutableMap.of( - "number", 1234 - ), - "an error message", - true, - 1000L, - ImmutableMap.of("PartitionA", 5000L) - ) - ); - String report1serialized = jsonMapper.writeValueAsString(report1); - IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue( - report1serialized, - TaskReport.class - ); - Assert.assertEquals(report1, report2); - Assert.assertEquals(report1.hashCode(), report2.hashCode()); - - final File reportFile = temporaryFolder.newFile(); - final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); - writer.setObjectMapper(jsonMapper); - Map reportMap1 = TaskReport.buildTaskReports(report1); - writer.write("testID", reportMap1); - - Map reportMap2 = jsonMapper.readValue( - reportFile, - new TypeReference>() {} - ); - Assert.assertEquals(reportMap1, reportMap2); - } - - @Test - public void testSerdeForParallelCompactionReport() throws Exception - { - IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( - "testID", - new ParallelCompactionTaskReportData( IngestionState.BUILD_SEGMENTS, ImmutableMap.of( "hello", "world" @@ -133,21 +91,11 @@ public void testSerdeForParallelCompactionReport() throws Exception Map reportMap2 = jsonMapper.readValue( reportFile, - new TypeReference>() - { - } + new TypeReference>() {} ); Assert.assertEquals(reportMap1, reportMap2); } - @Test - public void testEqualsForParallelCompactionReport() - { - EqualsVerifier.simple() - .forClass(ParallelCompactionTaskReportData.class) - .verify(); - } - @Test public void testExceptionWhileWritingReport() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 17d19dd0d894..6fb56aa33f7f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -450,13 +450,13 @@ public void test_getPartitionToLocations_ordersPartitionsCorrectly() createRangePartitionStat(day2, 7), createRangePartitionStat(day1, 0), createRangePartitionStat(day2, 1) - ), null, 1L)); + ), null)); taskIdToReport.put(task2, new GeneratedPartitionsReport(task2, Arrays.asList( createRangePartitionStat(day1, 4), createRangePartitionStat(day1, 6), createRangePartitionStat(day2, 1), createRangePartitionStat(day1, 1) - ), null, 1L)); + ), null)); Map> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(taskIdToReport); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 8d540227ec9e..296f7fe747a2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -22,7 +22,7 @@ import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.ParallelCompactionTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -176,13 +176,13 @@ public void testParallelHashedCompaction() throws Exception Assert.assertEquals(2, reports.values() .stream() - .mapToLong(r -> ((ParallelCompactionTaskReportData) r.getPayload()).getSegmentsPublished()) + .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished()) .sum() ); Assert.assertEquals(4, reports.values() .stream() - .mapToLong(r -> ((ParallelCompactionTaskReportData) r.getPayload()).getSegmentsRead()) + .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead()) .sum() ); } From 02798ca536959369026ee597462e8c184f036983 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 22:29:56 -0600 Subject: [PATCH 20/23] minor edits --- .../task/batch/parallel/GeneratedPartitionsReport.java | 6 +----- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 6 +++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 44691f1dc7cc..1fa025d1c91f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -37,11 +37,7 @@ public class GeneratedPartitionsReport implements SubTaskReport private final List partitionStats; private final Map taskReport; - GeneratedPartitionsReport( - String taskId, - List partitionStats, - Map taskReport - ) + GeneratedPartitionsReport(String taskId, List partitionStats, Map taskReport) { this.taskId = taskId; this.partitionStats = partitionStats; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9ddc11a1cd50..93f1c54e961f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1661,11 +1661,11 @@ private Pair, Map> doGetRowStatsAndUnparseab buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); - Long segmentsRead = ((IngestionStatsAndErrorsTaskReport) + Long currSegmentsRead = ((IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) ).getPayload().getSegmentsRead(); - if (segmentsRead != null) { - totalSegmentsRead += segmentsRead; + if (currSegmentsRead != null) { + totalSegmentsRead += currSegmentsRead; } } From cabbc724e817b5985eafecae8bed303edb476c0b Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Mar 2024 08:02:10 -0600 Subject: [PATCH 21/23] comments --- docs/ingestion/tasks.md | 4 +++- .../IngestionStatsAndErrorsTaskReportData.java | 4 ++-- .../parallel/ParallelIndexSupervisorTask.java | 17 ++++++----------- .../parallel/PartialSegmentGenerateTask.java | 16 ++++++++-------- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 82b325c149fa..4b6153fa26ac 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -156,7 +156,9 @@ For some task types, the indexing task can wait for the newly ingested segments |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| -|Field| Description| +#### Compaction task segment info fields + +|Field|Description| |---|---| |`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.| |`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index 7c2819836aa8..97ea58e1c5c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -64,8 +64,8 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, @JsonProperty("recordsProcessed") Map recordsProcessed, - @JsonProperty("segmentsRead") Long segmentsRead, - @JsonProperty("segmentsPublished") Long segmentsPublished + @Nullable @JsonProperty("segmentsRead") Long segmentsRead, + @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished ) { this.ingestionState = ingestionState; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 93f1c54e961f..a824948dc0f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -652,11 +652,8 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions - segmentsPublished = publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); if (isCompactionTask) { - // segements are only read for compactiont tasks. For `index_parallel` - // tasks this would result to 0, but we want to rather have it as null - // because segmentsRead is not applicable for such tasks. + // Populate segmentsRead only for compaction tasks segmentsRead = parallelSinglePhaseRunner.getReports() .values() .stream() @@ -834,7 +831,6 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions - segmentsPublished = publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } @@ -932,7 +928,6 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep TaskState mergeState = runNextPhase(mergeRunner); TaskStatus taskStatus; if (mergeState.isSuccess()) { - segmentsPublished = publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } @@ -1144,7 +1139,7 @@ private static Pair getPartitionBoundaries(int index, int tota return Pair.of(start, stop); } - private long publishSegments( + private void publishSegments( TaskToolbox toolbox, Map reportsMap ) @@ -1213,7 +1208,7 @@ private long publishSegments( throw new ISE("Failed to publish segments"); } - return newSegments.size(); + segmentsPublished = (long) newSegments.size(); } private TaskStatus runSequential(TaskToolbox toolbox) throws Exception @@ -1661,11 +1656,11 @@ private Pair, Map> doGetRowStatsAndUnparseab buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); - Long currSegmentsRead = ((IngestionStatsAndErrorsTaskReport) + Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) ).getPayload().getSegmentsRead(); - if (currSegmentsRead != null) { - totalSegmentsRead += currSegmentsRead; + if (segmentsReadFromPartition != null) { + totalSegmentsRead += segmentsReadFromPartition; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 6a7eff1271aa..1cb3d36ad759 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder; +import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.query.DruidMetrics; @@ -126,13 +127,9 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getIndexingTmpDir() ); - Map taskReport = getTaskCompletionReports(getSegementsSize(inputSource)); + Map taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource)); - taskClient.report(createGeneratedPartitionsReport( - toolbox, - segments, - taskReport - )); + taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport)); return TaskStatus.success(getId()); } @@ -154,10 +151,13 @@ abstract T createGeneratedPartitionsReport( Map taskReport ); - private Long getSegementsSize(InputSource inputSource) + private Long getNumSegmentsRead(InputSource inputSource) { if (inputSource instanceof DruidInputSource) { - return (long) ((DruidInputSource) inputSource).getSegmentIds().size(); + List segments = ((DruidInputSource) inputSource).getSegmentIds(); + if (segments != null) { + return (long) segments.size(); + } } return null; From 6296d565bb0468f9f9ca321a33b41b0632ce06c8 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Mar 2024 09:45:21 -0600 Subject: [PATCH 22/23] fix unwanted deleteions --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index a824948dc0f0..31472f64bee2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -652,6 +652,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions + publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); if (isCompactionTask) { // Populate segmentsRead only for compaction tasks segmentsRead = parallelSinglePhaseRunner.getReports() @@ -831,6 +832,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions + publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } @@ -928,6 +930,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep TaskState mergeState = runNextPhase(mergeRunner); TaskStatus taskStatus; if (mergeState.isSuccess()) { + publishSegments(toolbox, mergeRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(mergeRunner.getReports()); } From 98ec4b78e89bb95438000ba4ecf5d4d5fa22b114 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Mar 2024 09:48:14 -0600 Subject: [PATCH 23/23] remove unwanted changes --- .../task/batch/parallel/PartialHashSegmentGenerateTask.java | 6 +----- .../batch/parallel/PartialRangeSegmentGenerateTask.java | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 74cf75705543..49e3591ff18a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -177,11 +177,7 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( - TaskToolbox toolbox, - List segments, - Map taskReport - ) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 4a9f3be68293..27604eb7e770 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -192,11 +192,7 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport( - TaskToolbox toolbox, - List segments, - Map taskReport - ) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))