Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the newly ingested segments
|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after ingestion was completed.|
|`recordsProcessed`| Partitions that were processed by an ingestion task, along with the count of records processed from each partition.|


#### Compaction task segment info fields

|Field|Description|
|---|---|
|`segmentsRead`|Number of segments read by a compaction task with more than one subtask.|
|`segmentsPublished`|Number of segments published by a compaction task with more than one subtask.|

### Live report

When a task is running, a live report containing the ingestion state, unparseable events, and moving averages of the number of events processed over 1-minute, 5-minute, and 15-minute time windows can be retrieved at:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getReportKey()
}

@Override
public Object getPayload()
public IngestionStatsAndErrorsTaskReportData getPayload()
{
return payload;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.IngestionState;

Expand Down Expand Up @@ -50,14 +51,21 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty
private Map<String, Long> recordsProcessed;

@JsonProperty
private Long segmentsRead;
@JsonProperty
private Long segmentsPublished;

public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
@Nullable @JsonProperty("segmentsRead") Long segmentsRead,
@Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
)
{
this.ingestionState = ingestionState;
Expand All @@ -67,6 +75,8 @@ public IngestionStatsAndErrorsTaskReportData(
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
this.recordsProcessed = recordsProcessed;
this.segmentsRead = segmentsRead;
this.segmentsPublished = segmentsPublished;
}

@JsonProperty
Expand Down Expand Up @@ -113,6 +123,22 @@ public Map<String, Long> getRecordsProcessed()
return recordsProcessed;
}

@JsonProperty
@Nullable
Comment thread
ac9817 marked this conversation as resolved.
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getSegmentsRead()
{
return segmentsRead;
}

/**
 * Returns the segments-published count that was passed to the constructor.
 *
 * @return the number of segments published, or {@code null} if no count was provided; a {@code null} value is
 *         left out of the serialized JSON
 */
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty
public Long getSegmentsPublished()
{
  return this.segmentsPublished;
}

public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
Expand All @@ -137,7 +163,9 @@ public boolean equals(Object o)
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) &&
Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) &&
Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
}

@Override
Expand All @@ -150,7 +178,9 @@ public int hashCode()
getErrorMsg(),
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs(),
getRecordsProcessed()
getRecordsProcessed(),
getSegmentsRead(),
getSegmentsPublished()
);
}

Expand All @@ -165,6 +195,8 @@ public String toString()
", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
", recordsProcessed=" + recordsProcessed +
", segmentsRead=" + segmentsRead +
", segmentsPublished=" + segmentsPublished +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
errorMsg == null,
0L,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen

private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private Long segmentsRead;
private Long segmentsPublished;
private final boolean isCompactionTask;


Expand Down Expand Up @@ -643,6 +645,14 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (isCompactionTask) {
// Populate segmentsRead only for compaction tasks
segmentsRead = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getOldSegments().size()).sum();
}

if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
Expand Down Expand Up @@ -1189,6 +1199,8 @@ private void publishSegments(
} else {
throw new ISE("Failed to publish segments");
}

segmentsPublished = (long) newSegments.size();
}

private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
Expand Down Expand Up @@ -1245,7 +1257,9 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
segmentsRead,
segmentsPublished
)
)
);
Expand Down Expand Up @@ -1629,6 +1643,7 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
Expand All @@ -1639,6 +1654,13 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);

Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport)
taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
).getPayload().getSegmentsRead();
if (segmentsReadFromPartition != null) {
totalSegmentsRead += segmentsReadFromPartition;
}
}

RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
Expand All @@ -1647,6 +1669,9 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
if (totalSegmentsRead > 0) {
segmentsRead = totalSegmentsRead;
}

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
Expand Down Expand Up @@ -125,7 +127,7 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception
toolbox.getIndexingTmpDir()
);

Map<String, TaskReport> taskReport = getTaskCompletionReports();
Map<String, TaskReport> taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource));

taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport));

Expand All @@ -149,6 +151,18 @@ abstract T createGeneratedPartitionsReport(
Map<String, TaskReport> taskReport
);

/**
 * Counts the segment ids listed by a {@link DruidInputSource}.
 *
 * @param inputSource the input source to inspect
 * @return the number of segment ids the input source lists, or {@code null} when the input source is not a
 *         {@link DruidInputSource} or its segment id list is {@code null}
 */
private Long getNumSegmentsRead(InputSource inputSource)
{
  // Only a DruidInputSource exposes a list of segment ids to count.
  if (!(inputSource instanceof DruidInputSource)) {
    return null;
  }

  final List<WindowedSegmentId> segmentIds = ((DruidInputSource) inputSource).getSegmentIds();
  if (segmentIds == null) {
    return null;
  }

  return (long) segmentIds.size();
}

private List<DataSegment> generateSegments(
final TaskToolbox toolbox,
final ParallelIndexSupervisorTaskClient taskClient,
Expand Down Expand Up @@ -236,7 +250,7 @@ private List<DataSegment> generateSegments(
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
private Map<String, TaskReport> getTaskCompletionReports()
private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
Expand All @@ -248,7 +262,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
"",
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
segmentsRead,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,9 @@ private Map<String, TaskReport> getTaskCompletionReports()
errorMsg,
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
Collections.emptyMap()
Collections.emptyMap(),
null,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,9 @@ private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorM
errorMsg,
errorMsg == null,
handoffWaitMs,
getPartitionStats()
getPartitionStats(),
null,
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc

List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports();
Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task

// this test reads 3 segments and publishes 6 segments
Assert.assertEquals(
3,
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
);
Assert.assertEquals(
6,
reports.stream()
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
.sum()
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void testSerde() throws Exception
"an error message",
true,
1000L,
ImmutableMap.of("PartitionA", 5000L)
ImmutableMap.of("PartitionA", 5000L),
5L,
10L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
Expand Down Expand Up @@ -127,6 +129,8 @@ public void testSerializationOnMissingPartitionStats() throws Exception
"an error message",
true,
1000L,
null,
null,
null
)
);
Expand Down
Loading