From fa6c9a959968bb14744a82b33ca585ee0064a61a Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 9 Apr 2021 17:00:09 -0500 Subject: [PATCH 1/4] Add handoff wait time to ingestion stats report. Refactor some code for batch handoff --- ...IngestionStatsAndErrorsTaskReportData.java | 22 +++++++++++++++---- .../common/task/AbstractBatchIndexTask.java | 7 +++++- .../AppenderatorDriverRealtimeIndexTask.java | 4 ++-- .../indexing/common/task/HadoopIndexTask.java | 5 +++-- .../druid/indexing/common/task/IndexTask.java | 5 +++-- .../parallel/ParallelIndexSupervisorTask.java | 5 +++-- .../SeekableStreamIndexTaskRunner.java | 17 ++++++++++---- .../common/task/TaskReportSerdeTest.java | 3 ++- 8 files changed, 50 insertions(+), 18 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index bb149e54b50f..28388770c057 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -44,12 +44,16 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private boolean segmentAvailabilityConfirmed; + @JsonProperty + private long segmentAvailabilityWaitTimeMs; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @JsonProperty("rowStats") Map rowStats, @JsonProperty("errorMsg") @Nullable String errorMsg, - @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed + @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, + @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs ) { this.ingestionState = ingestionState; 
@@ -57,6 +61,7 @@ public IngestionStatsAndErrorsTaskReportData( this.rowStats = rowStats; this.errorMsg = errorMsg; this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; + this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; } @JsonProperty @@ -90,6 +95,12 @@ public boolean isSegmentAvailabilityConfirmed() return segmentAvailabilityConfirmed; } + @JsonProperty + public long getSegmentAvailabilityWaitTimeMs() + { + return segmentAvailabilityWaitTimeMs; + } + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -112,7 +123,8 @@ public boolean equals(Object o) Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) && Objects.equals(getRowStats(), that.getRowStats()) && Objects.equals(getErrorMsg(), that.getErrorMsg()) && - Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()); + Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && + Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()); } @Override @@ -123,7 +135,8 @@ public int hashCode() getUnparseableEvents(), getRowStats(), getErrorMsg(), - isSegmentAvailabilityConfirmed() + isSegmentAvailabilityConfirmed(), + getSegmentAvailabilityWaitTimeMs() ); } @@ -135,7 +148,8 @@ public String toString() ", unparseableEvents=" + unparseableEvents + ", rowStats=" + rowStats + ", errorMsg='" + errorMsg + '\'' + - ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed + + ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed + + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 53734dd90d36..fbbadb9fdfb7 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -94,6 +94,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private static final Logger log = new Logger(AbstractBatchIndexTask.class); protected boolean segmentAvailabilityConfirmationCompleted = false; + protected long segmentAvailabilityWaitTimeMs = 0L; @GuardedBy("this") private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); @@ -613,6 +614,7 @@ protected boolean waitForSegmentAvailability( return false; } log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size()); + final long start = System.nanoTime(); try ( SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory() @@ -636,12 +638,15 @@ protected boolean waitForSegmentAvailability( } ); } - return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS); + segmentAvailabilityConfirmationCompleted = doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS); + return segmentAvailabilityConfirmationCompleted; } catch (InterruptedException e) { log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!"); Thread.currentThread().interrupt(); return false; + } finally { + segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 550f81886f1f..6dcba64875ea 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -174,7 +174,6 
@@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec) @MonotonicNonNull private String errorMsg; - @JsonCreator public AppenderatorDriverRealtimeIndexTask( @JsonProperty("id") String id, @@ -599,7 +598,8 @@ private Map getTaskCompletionReports() getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - errorMsg == null + errorMsg == null, + 0L ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index b66ae471136c..1506784fa74a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -447,7 +447,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; ArrayList segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments()); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segmentsToWaitFor, spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() @@ -548,7 +548,8 @@ private Map getTaskCompletionReports() null, getTaskCompletionRowStats(), errorMsg, - segmentAvailabilityConfirmationCompleted + segmentAvailabilityConfirmationCompleted, + segmentAvailabilityWaitTimeMs ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 3b759bc9b576..9a50c5e94f31 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -526,7 +526,8 @@ private Map 
getTaskCompletionReports() getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - segmentAvailabilityConfirmationCompleted + segmentAvailabilityConfirmationCompleted, + segmentAvailabilityWaitTimeMs ) ) ); @@ -924,7 +925,7 @@ private TaskStatus generateAndPublishSegments( if (tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() > 0 && published != null) { ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; ArrayList segmentsToWaitFor = new ArrayList<>(published.getSegments()); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segmentsToWaitFor, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 60984e8a9dbb..0ab62195ab67 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -512,7 +512,7 @@ private void waitForSegmentAvailability(Map report .forEach(report -> { segmentsToWaitFor.addAll(report.getNewSegments()); }); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segmentsToWaitFor, awaitSegmentAvailabilityTimeoutMillis @@ -1044,7 +1044,8 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, new HashMap<>(), new HashMap<>(), taskStatus.getErrorMsg(), - segmentAvailabilityConfirmed + segmentAvailabilityConfirmed, + segmentAvailabilityWaitTimeMs ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 6f0728d2c0be..25fd027241bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -265,7 +265,7 @@ public TaskStatus run(TaskToolbox toolbox) catch (Exception e) { log.error(e, "Encountered exception while running task."); final String errorMsg = Throwables.getStackTraceAsString(e); - toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg)); + toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg, 0L)); return TaskStatus.failure( task.getId(), errorMsg @@ -408,6 +408,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ); Throwable caughtExceptionOuter = null; + + //milliseconds waited for created segments to be handed off + long handoffWaitMs = 0L; + try (final RecordSupplier recordSupplier = task.newTaskRecordSupplier()) { if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) { @@ -811,6 +815,7 @@ public void onFailure(Throwable t) if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { + final long start = System.nanoTime(); try { handedOffList = Futures.allAsList(handOffWaitList) .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); @@ -822,6 +827,8 @@ public void onFailure(Throwable t) .addData("taskId", task.getId()) .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout()) .emit(); + } finally { + handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } } @@ -898,7 +905,7 @@ public void onFailure(Throwable t) } } - toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null)); + 
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs)); return TaskStatus.success(task.getId()); } @@ -1060,9 +1067,10 @@ private synchronized void persistSequences() throws IOException * was not successful. * * @param errorMsg Nullable error message for the task. null if task succeeded. + * @param handoffWaitMs Milliseconds waited for segments to be handed off. * @return Map of reports for the task. */ - private Map getTaskCompletionReports(@Nullable String errorMsg) + private Map getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -1072,7 +1080,8 @@ private Map getTaskCompletionReports(@Nullable String errorM getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - errorMsg == null + errorMsg == null, + handoffWaitMs ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 65071b2ac194..cea2e83a25ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -56,7 +56,8 @@ public void testSerde() throws Exception "number", 1234 ), "an error message", - true + true, + 1000L ) ); String report1serialized = jsonMapper.writeValueAsString(report1); From c44d814ec14cb6ad248710843a83802579668beb Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 12 Apr 2021 08:29:26 -0500 Subject: [PATCH 2/4] fix checkstyle --- .../druid/indexing/common/task/AbstractBatchIndexTask.java | 3 ++- .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index fbbadb9fdfb7..1a526b8b79a9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -645,7 +645,8 @@ protected boolean waitForSegmentAvailability( log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!"); Thread.currentThread().interrupt(); return false; - } finally { + } + finally { segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 25fd027241bf..f8b450dddc71 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -827,7 +827,8 @@ public void onFailure(Throwable t) .addData("taskId", task.getId()) .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout()) .emit(); - } finally { + } + finally { handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } } From d7e740b7567289608c44e0067d70408ad805dc07 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Wed, 7 Jul 2021 15:21:45 -0500 Subject: [PATCH 3/4] Add assertion to AbstractITBatchIndexTask to make sure report reflects wait for segments happened --- .../apache/druid/tests/indexer/AbstractITBatchIndexTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index acab50df3d35..ff537235034c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -338,6 +338,11 @@ private void submitTaskAndWait( TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors"); IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw; IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload(); + + // Confirm that the task waited longer than 0ms for the segments to become available. + Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0); + + // Make sure that the result of waiting for segments to load matches the expected result if (segmentAvailabilityConfirmationPair.rhs != null) { Assert.assertEquals( Boolean.valueOf(reportData.isSegmentAvailabilityConfirmed()), From fe1281dd44cfb95209008c82ef178da2bbce750a Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 27 Aug 2021 12:59:21 -0500 Subject: [PATCH 4/4] add docs to the task reports section of doc --- docs/ingestion/native-batch.md | 1 - docs/ingestion/tasks.md | 11 +++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index df7a09770ead..596dae4e6943 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -198,7 +198,6 @@ A sample task is shown below: |id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no| |spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. 
|yes| |context|Context containing various task configuration parameters. See below for more details.|no| -|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait will occur. If `> 0`, the task will wait for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query. Note for compaction tasks: you should not set this to a non-zero value because it is not supported by the compaction task type at this time.|no (default = 0)| ### `dataSchema` diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 2e62bd2c9cdc..a3b759fd386b 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -78,6 +78,8 @@ An example output is shown below: "unparseable": 0 } }, + "segmentAvailabilityConfirmed": false, + "segmentAvailabilityWaitTimeMs": 0, "errorMsg": null }, "type": "ingestionStatsAndErrors" @@ -85,6 +87,15 @@ An example output is shown below: } ``` +#### Segment Availability Fields + +For some task types, the indexing task can wait for the newly ingested segments to become available for queries after ingestion completes. The below fields inform the end user regarding the duration and result of the availability wait. For batch ingestion task types, refer to `tuningConfig` docs to see if the task supports an availability waiting period. 
+ +|Field|Description| +|---|---| +|`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.| +|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to become available for query after ingestion was completed.| + ### Live report When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at: