diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 776038b50a1f..59a0a499f917 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -503,7 +503,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception int failCnt = 0; Map completionReports = new HashMap<>(); - for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) { + for (int i = 0; i < indexTaskSpecs.size(); i++) { + ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i); final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); if (!currentSubTaskHolder.setTask(eachSpec)) { String errMsg = "Task was asked to stop. Finish as failed."; @@ -518,9 +519,11 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception failCnt++; log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json); } + + String reportKeySuffix = "_" + i; Optional.ofNullable(eachSpec.getCompletionReports()) .ifPresent(reports -> completionReports.putAll( - CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key)))); + CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix))); } else { failCnt++; log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json); @@ -572,11 +575,6 @@ private String createIndexTaskSpecId(int i) return StringUtils.format("%s_%d", getId(), i); } - private String getReportkey(String baseSequenceName, String currentKey) - { - return StringUtils.format("%s_%s", currentKey, baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1)); - } - /** * Generate {@link ParallelIndexIngestionSpec} from input segments. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ce3245944e26..0c9bc57aa50c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -169,8 +169,6 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode private IngestionState ingestionState; - // used to specify if indextask.run() is run as a part of another task - // skips writing reports and cleanup if not a standalone task private boolean isStandAloneTask; @MonotonicNonNull @@ -217,6 +215,10 @@ public IndexTask( ); } + /** + * @param isStandAloneTask used to specify if indextask.run() is run as a part of another task + * skips writing reports and cleanup if not a standalone task + */ public IndexTask( String id, String groupId, @@ -567,8 +569,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) catch (Exception e) { log.error(e, "Encountered exception in %s.", ingestionState); errorMsg = Throwables.getStackTraceAsString(e); - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.failure( getId(), errorMsg @@ -580,8 +581,10 @@ public TaskStatus runTask(final TaskToolbox toolbox) } } - private void writeCompletionReports(TaskToolbox toolbox) + + private void updateAndWriteCompletionReports(TaskToolbox toolbox) { + completionReports = getTaskCompletionReports(); if (isStandAloneTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } @@ -1043,8 +1046,7 @@ private TaskStatus generateAndPublishSegments( if (published == null) { log.error("Failed to publish segments, aborting!"); errorMsg = "Failed to publish segments."; - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.failure( getId(), errorMsg @@ -1067,8 +1069,7 @@ private TaskStatus generateAndPublishSegments( log.debugSegments(published.getSegments(), "Published segments"); - completionReports = getTaskCompletionReports(); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(toolbox); return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index d7980e9b3716..162aa30c6980 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -203,7 +203,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private IngestionState ingestionState; private Map completionReports; - private final Boolean isCompactionTask; + private final boolean isCompactionTask; @JsonCreator @@ -225,7 +225,7 @@ public ParallelIndexSupervisorTask( ParallelIndexIngestionSpec ingestionSchema, @Nullable String baseSubtaskSpecName, Map context, - Boolean isCompactionTask + boolean isCompactionTask ) { super( @@ -303,13 +303,6 @@ public Map getCompletionReports() return completionReports; } - @Nullable - @JsonIgnore - public String getBaseSubtaskSpecName() - { - return baseSubtaskSpecName; - } - @JsonProperty("spec") public ParallelIndexIngestionSpec getIngestionSchema() { @@ -534,13 +527,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception try { initializeSubTaskCleaner(); + this.toolbox = toolbox; if (isParallelMode()) { // emit metric for parallel batch ingestion mode: emitMetric(toolbox.getEmitter(), "ingest/count", 1); - this.toolbox = toolbox; - if (isGuaranteedRollup( getIngestionMode(), ingestionSchema.getTuningConfig() @@ -669,8 +661,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception } taskStatus = TaskStatus.failure(getId(), errorMessage); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -837,8 +828,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except taskStatus = TaskStatus.failure(getId(), errMsg); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -935,8 +925,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep taskStatus = TaskStatus.failure(getId(), errMsg); } - completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted); - writeCompletionReports(toolbox); + updateAndWriteCompletionReports(taskStatus); return taskStatus; } @@ -1225,6 +1214,7 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { TaskStatus status = sequentialIndexTask.run(toolbox); completionReports = sequentialIndexTask.getCompletionReports(); + writeCompletionReports(); return status; } else { String msg = "Task was asked to stop. Finish as failed"; @@ -1261,7 +1251,13 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, ); } - private void writeCompletionReports(TaskToolbox toolbox) + private void updateAndWriteCompletionReports(TaskStatus status) + { + completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); + writeCompletionReports(); + } + + private void writeCompletionReports() { if (!isCompactionTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); @@ -1842,6 +1838,12 @@ static Map getTaskReport(final OverlordClient overlordClient, fi } } + @VisibleForTesting + public void setToolbox(TaskToolbox toolbox) + { + this.toolbox = toolbox; + } + /** * Represents a partition uniquely identified by an Interval and a bucketId. * diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 13a850f5ee74..c0166d141b00 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -94,6 +94,52 @@ public void testSerde() throws Exception Assert.assertEquals(reportMap1, reportMap2); } + @Test + public void testSerializationOnMissingPartitionStats() throws Exception + { + String json = "{\n" + + " \"type\": \"ingestionStatsAndErrors\",\n" + + " \"taskId\": \"ingestionStatsAndErrors\",\n" + + " \"payload\": {\n" + + " \"ingestionState\": \"COMPLETED\",\n" + + " \"unparseableEvents\": {\n" + + " \"hello\": \"world\"\n" + + " },\n" + + " \"rowStats\": {\n" + + " \"number\": 1234\n" + + " },\n" + + " \"errorMsg\": \"an error message\",\n" + + " \"segmentAvailabilityConfirmed\": true,\n" + + " \"segmentAvailabilityWaitTimeMs\": 1000\n" + + " }\n" + + "}"; + + IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport( + IngestionStatsAndErrorsTaskReport.REPORT_KEY, + new IngestionStatsAndErrorsTaskReportData( + IngestionState.COMPLETED, + ImmutableMap.of( + "hello", "world" + ), + ImmutableMap.of( + "number", 1234 + ), + "an error message", + true, + 1000L, + null + ) + ); + + + Assert.assertEquals(expected, jsonMapper.readValue( + json, + new TypeReference() + { + } + )); + } + @Test public void testExceptionWhileWritingReport() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java index b9d5114a1e64..e71dc6db36d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java @@ -202,7 +202,7 @@ public void failsInThirdPhase() throws Exception Assert.assertTrue(task.isReady(actionClient)); task.stopGracefully(null); - + task.setToolbox(toolbox); TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox); Assert.assertTrue(taskStatus.isFailure()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java index 43b0ac902c1f..97e4f54c06ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java @@ -211,7 +211,7 @@ public void failsThirdPhase() throws Exception Assert.assertTrue(task.isReady(actionClient)); task.stopGracefully(null); - + task.setToolbox(toolbox); TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox); Assert.assertTrue(taskStatus.isFailure());