diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index fbf1f4a38e7c..4b6153fa26ac 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the newly ingested segments |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.| |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.| + +#### Compaction task segment info fields + +|Field|Description| +|---|---| +|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.| +|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.| + ### Live report When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 35ae2f669885..7518523b1de0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -57,7 +57,7 @@ public String getReportKey() } @Override - public Object getPayload() + public IngestionStatsAndErrorsTaskReportData getPayload() { return payload; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index ecdd9d3aeba9..97ea58e1c5c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.IngestionState; @@ -50,6 +51,11 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private Map recordsProcessed; + @JsonProperty + private Long segmentsRead; + @JsonProperty + private Long segmentsPublished; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @@ -57,7 +63,9 @@ public IngestionStatsAndErrorsTaskReportData( @JsonProperty("errorMsg") @Nullable String errorMsg, @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs, - @JsonProperty("recordsProcessed") Map recordsProcessed + @JsonProperty("recordsProcessed") Map recordsProcessed, + @Nullable @JsonProperty("segmentsRead") Long segmentsRead, + @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished ) { this.ingestionState = ingestionState; @@ -67,6 +75,8 @@ public IngestionStatsAndErrorsTaskReportData( this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; this.recordsProcessed = recordsProcessed; + this.segmentsRead = segmentsRead; + this.segmentsPublished = segmentsPublished; } @JsonProperty @@ -113,6 +123,22 @@ public Map getRecordsProcessed() return recordsProcessed; } + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getSegmentsRead() + { + return segmentsRead; + } + + @JsonProperty + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getSegmentsPublished() + { + return segmentsPublished; + } + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -137,7 +163,9 @@ public boolean equals(Object o) Objects.equals(getErrorMsg(), that.getErrorMsg()) && Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) && - Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()); + Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) && + Objects.equals(getSegmentsRead(), that.getSegmentsRead()) && + Objects.equals(getSegmentsPublished(), that.getSegmentsPublished()); } @Override @@ -150,7 +178,9 @@ public int hashCode() getErrorMsg(), isSegmentAvailabilityConfirmed(), getSegmentAvailabilityWaitTimeMs(), - getRecordsProcessed() + getRecordsProcessed(), + getSegmentsRead(), + getSegmentsPublished() ); } @@ -165,6 +195,8 @@ public String toString() ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + ", recordsProcessed=" + recordsProcessed + + ", segmentsRead=" + segmentsRead + + ", segmentsPublished=" + segmentsPublished + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9ea94b844cc4..e0c7f9bc934f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -621,7 +621,9 @@ private Map getTaskCompletionReports() errorMsg, errorMsg == null, 0L, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 1422720bf94d..019b63520e75 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -694,7 +694,9 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 0c9bc57aa50c..50e13a93c0be 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -602,7 +602,9 @@ private Map getTaskCompletionReports() errorMsg, segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 162aa30c6980..40366b7a7d1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private IngestionState ingestionState; private Map completionReports; + private Long segmentsRead; + private Long segmentsPublished; private final boolean isCompactionTask; @@ -643,6 +645,14 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception if (state.isSuccess()) { //noinspection ConstantConditions publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); + if (isCompactionTask) { + // Populate segmentsRead only for compaction tasks + segmentsRead = parallelSinglePhaseRunner.getReports() + .values() + .stream() + .mapToLong(report -> report.getOldSegments().size()).sum(); + } + if (awaitSegmentAvailabilityTimeoutMillis > 0) { waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } @@ -1189,6 +1199,8 @@ private void publishSegments( } else { throw new ISE("Failed to publish segments"); } + + segmentsPublished = (long) newSegments.size(); } private TaskStatus runSequential(TaskToolbox toolbox) throws Exception @@ -1245,7 +1257,9 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, taskStatus.getErrorMsg(), segmentAvailabilityConfirmed, segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + segmentsRead, + segmentsPublished ) ) ); @@ -1629,6 +1643,7 @@ private Pair, Map> doGetRowStatsAndUnparseab final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters(); final List unparseableEvents = new ArrayList<>(); + long totalSegmentsRead = 0L; for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { Map taskReport = generatedPartitionsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { @@ -1639,6 +1654,13 @@ private Pair, Map> doGetRowStatsAndUnparseab getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); + + Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport) + taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) + ).getPayload().getSegmentsRead(); + if (segmentsReadFromPartition != null) { + totalSegmentsRead += segmentsReadFromPartition; + } } RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks( @@ -1647,6 +1669,9 @@ private Pair, Map> doGetRowStatsAndUnparseab includeUnparseable ); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks); + if (totalSegmentsRead > 0) { + segmentsRead = totalSegmentsRead; + } return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 9b1a90b57911..1cb3d36ad759 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -37,6 +37,8 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder; +import org.apache.druid.indexing.firehose.WindowedSegmentId; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -125,7 +127,7 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getIndexingTmpDir() ); - Map taskReport = getTaskCompletionReports(); + Map taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource)); taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport)); @@ -149,6 +151,18 @@ abstract T createGeneratedPartitionsReport( Map taskReport ); + private Long getNumSegmentsRead(InputSource inputSource) + { + if (inputSource instanceof DruidInputSource) { + List segments = ((DruidInputSource) inputSource).getSegmentIds(); + if (segments != null) { + return (long) segments.size(); + } + } + + return null; + } + private List generateSegments( final TaskToolbox toolbox, final ParallelIndexSupervisorTaskClient taskClient, @@ -236,7 +250,7 @@ private List generateSegments( /** * Generate an IngestionStatsAndErrorsTaskReport for the task. */ - private Map getTaskCompletionReports() + private Map getTaskCompletionReports(Long segmentsRead) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -248,7 +262,9 @@ private Map getTaskCompletionReports() "", false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + segmentsRead, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 8d9bd8f7b6d6..bbd3f2964b61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -644,7 +644,9 @@ private Map getTaskCompletionReports() errorMsg, false, // not applicable for parallel subtask segmentAvailabilityWaitTimeMs, - Collections.emptyMap() + Collections.emptyMap(), + null, + null ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index c7f77cea5662..09e89b1d6019 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1130,7 +1130,9 @@ private Map getTaskCompletionReports(@Nullable String errorM errorMsg, errorMsg == null, handoffWaitMs, - getPartitionStats() + getPartitionStats(), + null, + null ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 12cf09d33175..9b68cebe9eeb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -250,6 +250,18 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc List reports = getIngestionReports(); Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task + + // this test reads 3 segments and publishes 6 segments + Assert.assertEquals( + 3, + reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum() + ); + Assert.assertEquals( + 6, + reports.stream() + .mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished) + .sum() + ); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index c0166d141b00..53b8ace317a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -70,7 +70,9 @@ public void testSerde() throws Exception "an error message", true, 1000L, - ImmutableMap.of("PartitionA", 5000L) + ImmutableMap.of("PartitionA", 5000L), + 5L, + 10L ) ); String report1serialized = jsonMapper.writeValueAsString(report1); @@ -127,6 +129,8 @@ public void testSerializationOnMissingPartitionStats() throws Exception "an error message", true, 1000L, + null, + null, null ) ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 95b3e1129706..296f7fe747a2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -22,6 +22,7 @@ import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -65,6 +66,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json"; private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + private static final String PARALLEL_COMPACTION_TASK = "/indexer/wikipedia_compaction_task_parallel.json"; private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json"; private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json"; @@ -138,6 +140,54 @@ public void testCompactionWithQueryGranularityInGranularitySpec() throws Excepti } } + @Test + public void testParallelHashedCompaction() throws Exception + { + try (final Closeable ignored = unloader(fullDatasourceName)) { + loadData(INDEX_TASK, fullDatasourceName); + // 4 segments across 2 days + checkNumberOfSegments(4); + List expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + expectedIntervalAfterCompaction.sort(null); + + checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4); + String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE); + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%", + jsonMapper.writeValueAsString("0") + ); + + queryHelper.testQueriesFromString(queryResponseTemplate); + String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null); + + // The original 4 segments should be compacted into 2 new segments + checkNumberOfSegments(2); + queryHelper.testQueriesFromString(queryResponseTemplate); + checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2); + + + checkCompactionIntervals(expectedIntervalAfterCompaction); + + Map reports = indexer.getTaskReport(taskId); + Assert.assertTrue(reports != null && reports.size() > 0); + + Assert.assertEquals(2, + reports.values() + .stream() + .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished()) + .sum() + ); + Assert.assertEquals(4, + reports.values() + .stream() + .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead()) + .sum() + ); + } + } + @Test public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception { diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json new file mode 100644 index 000000000000..8616946ebbaa --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json @@ -0,0 +1,22 @@ +{ + "type" : "compact", + "dataSource" : "%%DATASOURCE%%", + "ioConfig" : { + "type": "compact", + "inputSpec": { + "type": "interval", + "interval": "2013-08-31/2013-09-02" + } + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "hashed" + }, + "forceGuaranteedRollup": true, + "maxNumConcurrentSubTasks": 3 + }, + "context" : { + "storeCompactionState" : true + } +}