From 1218976abbdb8d89717cc2ae04e39b6a31e911f1 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 4 Mar 2024 23:43:48 -0600 Subject: [PATCH 01/10] initial commit --- .../indexing/common/task/CompactionTask.java | 12 ++++---- .../druid/indexing/common/task/IndexTask.java | 29 ++++++++++--------- .../parallel/ParallelIndexSupervisorTask.java | 28 ++++++++---------- .../indexing/common/task/IndexTaskTest.java | 4 ++- 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 776038b50a1f..1295ed6edd06 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -503,7 +503,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception int failCnt = 0; Map completionReports = new HashMap<>(); - for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) { + for (int i = 0; i < indexTaskSpecs.size(); i++) { + ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i); final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); if (!currentSubTaskHolder.setTask(eachSpec)) { String errMsg = "Task was asked to stop. Finish as failed."; @@ -518,9 +519,11 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception failCnt++; log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json); } + + String reportKeyPrefix = "_" + i; Optional.ofNullable(eachSpec.getCompletionReports()) .ifPresent(reports -> completionReports.putAll( - CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key)))); + CollectionUtils.mapKeys(reports, key -> key + reportKeyPrefix))); } else { failCnt++; log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json); @@ -572,11 +575,6 @@ private String createIndexTaskSpecId(int i) return StringUtils.format("%s_%d", getId(), i); } - private String getReportkey(String baseSequenceName, String currentKey) - { - return StringUtils.format("%s_%s", currentKey, baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1)); - } - /** * Generate {@link ParallelIndexIngestionSpec} from input segments. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ce3245944e26..cfdf998ec78e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -169,9 +169,8 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode private IngestionState ingestionState; - // used to specify if indextask.run() is run as a part of another task - // skips writing reports and cleanup if not a standalone task - private boolean isStandAloneTask; + private boolean shouldCleanup; + private boolean shouldSendReports; @MonotonicNonNull private ParseExceptionHandler determinePartitionsParseExceptionHandler; @@ -213,6 +212,7 @@ public IndexTask( ingestionSchema, context, -1, + true, true ); } @@ -226,7 +226,8 @@ public IndexTask( IndexIngestionSpec ingestionSchema, Map context, int maxAllowedLockCount, - boolean isStandAloneTask + boolean shouldCleanup, + boolean shouldSendReports ) { super( @@ -241,7 +242,8 @@ public IndexTask( this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName; this.ingestionSchema = ingestionSchema; this.ingestionState = IngestionState.NOT_STARTED; - this.isStandAloneTask = isStandAloneTask; + this.shouldCleanup = shouldCleanup; + this.shouldSendReports = shouldSendReports; } @Override @@ -567,8 +569,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) catch (Exception e) { log.error(e, "Encountered exception in %s.", ingestionState); errorMsg = Throwables.getStackTraceAsString(e); - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.failure( getId(), errorMsg @@ -580,9 +581,11 @@ public TaskStatus runTask(final TaskToolbox toolbox) } } - private void writeCompletionReports(TaskToolbox toolbox) + + private void updateAndWriteCompletionReports(TaskToolbox toolbox) { - if (isStandAloneTask) { + completionReports = getTaskCompletionReports(); + if (shouldSendReports) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } @@ -1043,8 +1046,7 @@ private TaskStatus generateAndPublishSegments( if (published == null) { log.error("Failed to publish segments, aborting!"); errorMsg = "Failed to publish segments."; - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.failure( getId(), errorMsg @@ -1067,8 +1069,7 @@ private TaskStatus generateAndPublishSegments( log.debugSegments(published.getSegments(), "Published segments"); - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.success(getId()); } } @@ -1110,7 +1111,7 @@ private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema) @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { - if (isStandAloneTask) { + if (shouldCleanup) { super.cleanUp(toolbox, taskStatus); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index d7980e9b3716..c3de69648601 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -203,7 +203,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private IngestionState ingestionState; private Map completionReports; - private final Boolean isCompactionTask; + private final boolean isCompactionTask; @JsonCreator @@ -303,13 +303,6 @@ public Map getCompletionReports() return completionReports; } - @Nullable - @JsonIgnore - public String getBaseSubtaskSpecName() - { - return baseSubtaskSpecName; - } - @JsonProperty("spec") public ParallelIndexIngestionSpec getIngestionSchema() { @@ -669,8 +662,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception } taskStatus = TaskStatus.failure(getId(), errorMessage); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); return taskStatus; } @@ -837,8 +829,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except taskStatus = TaskStatus.failure(getId(), errMsg); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); return taskStatus; } @@ -935,8 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep taskStatus = TaskStatus.failure(getId(), errMsg); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); return taskStatus; } @@ -1218,7 +1208,8 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception getContext(), getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(), // Don't run cleanup in the IndexTask since we are wrapping it in the ParallelIndexSupervisorTask - false + false, + !isCompactionTask ); if (currentSubTaskHolder.setTask(sequentialIndexTask) @@ -1261,8 +1252,13 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, ); } - private void writeCompletionReports(TaskToolbox toolbox) + private void updateAndWriteCompletionReports( + TaskToolbox toolbox, + TaskStatus status, + boolean segmentAvailabilityConfirmationCompleted + ) { + completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); if (!isCompactionTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index b60ddb80970c..48ea03034c0a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -2714,7 +2714,8 @@ public void testCleanupIndexTask() throws Exception ), null, 0, - false + false, + true ).cleanUp(null, null); } @@ -2750,6 +2751,7 @@ public void testCleanup() throws Exception ), null, 0, + true, true ).cleanUp(toolbox, null); EasyMock.verify(toolbox, taskConfig); From dc0f4fb5d3a863a7ffea65439a9113eed4f3cd77 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 10:05:31 -0600 Subject: [PATCH 02/10] comments --- .../druid/indexing/common/task/IndexTask.java | 5 ++ .../parallel/ParallelIndexSupervisorTask.java | 11 ++--- .../common/task/TaskReportSerdeTest.java | 46 +++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index cfdf998ec78e..5fc870713845 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -170,6 +170,11 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode private IngestionState ingestionState; private boolean shouldCleanup; + + // There are cases where index task is not run as a standalone task and the + // generated completion reports are written by parent. In such cases, this + // flag would be helpful to specify that the child index task should not + // publish reports. private boolean shouldSendReports; @MonotonicNonNull diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c3de69648601..3f3359d4cd37 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -225,7 +225,7 @@ public ParallelIndexSupervisorTask( ParallelIndexIngestionSpec ingestionSchema, @Nullable String baseSubtaskSpecName, Map context, - Boolean isCompactionTask + boolean isCompactionTask ) { super( @@ -662,7 +662,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception } taskStatus = TaskStatus.failure(getId(), errorMessage); } - updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); + updateAndWriteCompletionReports(toolbox, taskStatus); return taskStatus; } @@ -829,7 +829,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except taskStatus = TaskStatus.failure(getId(), errMsg); } - updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); + updateAndWriteCompletionReports(toolbox, taskStatus); return taskStatus; } @@ -926,7 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep taskStatus = TaskStatus.failure(getId(), errMsg); } - updateAndWriteCompletionReports(toolbox, taskStatus, segmentAvailabilityConfirmationCompleted); + updateAndWriteCompletionReports(toolbox, taskStatus); return taskStatus; } @@ -1254,8 +1254,7 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, private void updateAndWriteCompletionReports( TaskToolbox toolbox, - TaskStatus status, - boolean segmentAvailabilityConfirmationCompleted + TaskStatus status ) { completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 13a850f5ee74..c0166d141b00 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -94,6 +94,52 @@ public void testSerde() throws Exception Assert.assertEquals(reportMap1, reportMap2); } + @Test + public void testSerializationOnMissingPartitionStats() throws Exception + { + String json = "{\n" + + " \"type\": \"ingestionStatsAndErrors\",\n" + + " \"taskId\": \"ingestionStatsAndErrors\",\n" + + " \"payload\": {\n" + + " \"ingestionState\": \"COMPLETED\",\n" + + " \"unparseableEvents\": {\n" + + " \"hello\": \"world\"\n" + + " },\n" + + " \"rowStats\": {\n" + + " \"number\": 1234\n" + + " },\n" + + " \"errorMsg\": \"an error message\",\n" + + " \"segmentAvailabilityConfirmed\": true,\n" + + " \"segmentAvailabilityWaitTimeMs\": 1000\n" + + " }\n" + + "}"; + + IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport( + IngestionStatsAndErrorsTaskReport.REPORT_KEY, + new IngestionStatsAndErrorsTaskReportData( + IngestionState.COMPLETED, + ImmutableMap.of( + "hello", "world" + ), + ImmutableMap.of( + "number", 1234 + ), + "an error message", + true, + 1000L, + null + ) + ); + + + Assert.assertEquals(expected, jsonMapper.readValue( + json, + new TypeReference() + { + } + )); + } + @Test public void testExceptionWhileWritingReport() throws Exception { From 3ab1b975128ec3ccc4276963097a2c1cc045e084 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 10:09:52 -0600 Subject: [PATCH 03/10] typo --- .../org/apache/druid/indexing/common/task/CompactionTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1295ed6edd06..59a0a499f917 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -520,10 +520,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json); } - String reportKeyPrefix = "_" + i; + String reportKeySuffix = "_" + i; Optional.ofNullable(eachSpec.getCompletionReports()) .ifPresent(reports -> completionReports.putAll( - CollectionUtils.mapKeys(reports, key -> key + reportKeyPrefix))); + CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix))); } else { failCnt++; log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json); From a3f1b1bbb9e1e4ee95e88fd3393983498385a65c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 12:15:47 -0600 Subject: [PATCH 04/10] comments --- .../apache/druid/indexing/common/task/IndexTask.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5fc870713845..9be8a1a47d1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -174,8 +174,8 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode // There are cases where index task is not run as a standalone task and the // generated completion reports are written by parent. In such cases, this // flag would be helpful to specify that the child index task should not - // publish reports. - private boolean shouldSendReports; + // write reports. + private boolean shouldWriteReports; @MonotonicNonNull private ParseExceptionHandler determinePartitionsParseExceptionHandler; @@ -232,7 +232,7 @@ public IndexTask( Map context, int maxAllowedLockCount, boolean shouldCleanup, - boolean shouldSendReports + boolean shouldWriteReports ) { super( @@ -248,7 +248,7 @@ public IndexTask( this.ingestionSchema = ingestionSchema; this.ingestionState = IngestionState.NOT_STARTED; this.shouldCleanup = shouldCleanup; - this.shouldSendReports = shouldSendReports; + this.shouldWriteReports = shouldWriteReports; } @Override @@ -590,7 +590,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) private void updateAndWriteCompletionReports(TaskToolbox toolbox) { completionReports = getTaskCompletionReports(); - if (shouldSendReports) { + if (shouldWriteReports) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } From db7ab3094e59e24e009410843cf7cba2c7600b58 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 12:33:12 -0600 Subject: [PATCH 05/10] comments --- .../druid/indexing/common/task/IndexTask.java | 24 ++++++++----------- .../parallel/ParallelIndexSupervisorTask.java | 9 +++++-- .../indexing/common/task/IndexTaskTest.java | 4 +--- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 9be8a1a47d1d..9c4c1f0e1e5e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -169,13 +169,7 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode private IngestionState ingestionState; - private boolean shouldCleanup; - - // There are cases where index task is not run as a standalone task and the - // generated completion reports are written by parent. In such cases, this - // flag would be helpful to specify that the child index task should not - // write reports. - private boolean shouldWriteReports; + private boolean isStandAloneTask; @MonotonicNonNull private ParseExceptionHandler determinePartitionsParseExceptionHandler; @@ -217,11 +211,15 @@ public IndexTask( ingestionSchema, context, -1, - true, true ); } + /** + * @param isStandAloneTask used to specify if indextask.run() is run as a part of another task + * skips writing reports and cleanup if not a standalone task + */ + public IndexTask( String id, String groupId, @@ -231,8 +229,7 @@ public IndexTask( IndexIngestionSpec ingestionSchema, Map context, int maxAllowedLockCount, - boolean shouldCleanup, - boolean shouldWriteReports + boolean isStandAloneTask ) { super( @@ -247,8 +244,7 @@ public IndexTask( this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName; this.ingestionSchema = ingestionSchema; this.ingestionState = IngestionState.NOT_STARTED; - this.shouldCleanup = shouldCleanup; - this.shouldWriteReports = shouldWriteReports; + this.isStandAloneTask = isStandAloneTask; } @Override @@ -590,7 +586,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) private void updateAndWriteCompletionReports(TaskToolbox toolbox) { completionReports = getTaskCompletionReports(); - if (shouldWriteReports) { + if (isStandAloneTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } @@ -1116,7 +1112,7 @@ private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema) @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { - if (shouldCleanup) { + if (isStandAloneTask) { super.cleanUp(toolbox, taskStatus); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 3f3359d4cd37..0542c13794c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1208,14 +1208,14 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception getContext(), getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(), // Don't run cleanup in the IndexTask since we are wrapping it in the ParallelIndexSupervisorTask - false, - !isCompactionTask + false ); if (currentSubTaskHolder.setTask(sequentialIndexTask) && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { TaskStatus status = sequentialIndexTask.run(toolbox); completionReports = sequentialIndexTask.getCompletionReports(); + writeCompletionReports(); return status; } else { String msg = "Task was asked to stop. Finish as failed"; @@ -1258,6 +1258,11 @@ private void updateAndWriteCompletionReports( ) { completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); + writeCompletionReports(); + } + + private void writeCompletionReports() + { if (!isCompactionTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 48ea03034c0a..b60ddb80970c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -2714,8 +2714,7 @@ public void testCleanupIndexTask() throws Exception ), null, 0, - false, - true + false ).cleanUp(null, null); } @@ -2751,7 +2750,6 @@ public void testCleanup() throws Exception ), null, 0, - true, true ).cleanUp(toolbox, null); EasyMock.verify(toolbox, taskConfig); From ddafad3c56703d0673fa47b75daabee5c80b5868 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 13:54:36 -0600 Subject: [PATCH 06/10] remove var --- .../batch/parallel/ParallelIndexSupervisorTask.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 0542c13794c5..547f1782d5a0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -662,7 +662,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception } taskStatus = TaskStatus.failure(getId(), errorMessage); } - updateAndWriteCompletionReports(toolbox, taskStatus); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -829,7 +829,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except taskStatus = TaskStatus.failure(getId(), errMsg); } - updateAndWriteCompletionReports(toolbox, taskStatus); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -926,7 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep taskStatus = TaskStatus.failure(getId(), errMsg); } - updateAndWriteCompletionReports(toolbox, taskStatus); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -1252,10 +1252,7 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, ); } - private void updateAndWriteCompletionReports( - TaskToolbox toolbox, - TaskStatus status - ) + private void updateAndWriteCompletionReports(TaskStatus status) { completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); writeCompletionReports(); From 9ba336057549938044d2a6173cf50104113cd625 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 15:44:25 -0600 Subject: [PATCH 07/10] initialize global var early --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 547f1782d5a0..e5b89f4104a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -527,13 +527,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception try { initializeSubTaskCleaner(); + this.toolbox = toolbox; if (isParallelMode()) { // emit metric for parallel batch ingestion mode: emitMetric(toolbox.getEmitter(), "ingest/count", 1); - this.toolbox = toolbox; - if (isGuaranteedRollup( getIngestionMode(), ingestionSchema.getTuningConfig() From 92101a0e40807faf22d62026eb8c3e2381d46f94 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Mar 2024 20:50:09 -0600 Subject: [PATCH 08/10] remove new line --- .../java/org/apache/druid/indexing/common/task/IndexTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 9c4c1f0e1e5e..0c9bc57aa50c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -219,7 +219,6 @@ public IndexTask( * @param isStandAloneTask used to specify if indextask.run() is run as a part of another task * skips writing reports and cleanup if not a standalone task */ - public IndexTask( String id, String groupId, From ccd388ddb5326d366301f34c4d16836edb0de550 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Mar 2024 09:27:54 -0600 Subject: [PATCH 09/10] small test fix --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 6 ++++++ .../task/batch/parallel/HashPartitionTaskKillTest.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index e5b89f4104a2..162aa30c6980 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1838,6 +1838,12 @@ static Map getTaskReport(final OverlordClient overlordClient, fi } } + @VisibleForTesting + public void setToolbox(TaskToolbox toolbox) + { + this.toolbox = toolbox; + } + /** * Represents a partition uniquely identified by an Interval and a bucketId. * diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java index b9d5114a1e64..e71dc6db36d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java @@ -202,7 +202,7 @@ public void failsInThirdPhase() throws Exception Assert.assertTrue(task.isReady(actionClient)); task.stopGracefully(null); - + task.setToolbox(toolbox); TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox); Assert.assertTrue(taskStatus.isFailure()); From 9e87ae92790d2a793efa90250ac405337c3b6c08 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Mar 2024 09:30:03 -0600 Subject: [PATCH 10/10] same fix another test --- .../common/task/batch/parallel/RangePartitionTaskKillTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java index 43b0ac902c1f..97e4f54c06ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java @@ -211,7 +211,7 @@ public void failsThirdPhase() throws Exception Assert.assertTrue(task.isReady(actionClient)); task.stopGracefully(null); - + task.setToolbox(toolbox); TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox); Assert.assertTrue(taskStatus.isFailure());