diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index df7a09770ead..596dae4e6943 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -198,7 +198,6 @@ A sample task is shown below: |id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no| |spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes| |context|Context containing various task configuration parameters. See below for more details.|no| -|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait will occur. If `> 0`, the task will wait for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query. Note for compaction tasks: you should not set this to a non-zero value because it is not supported by the compaction task type at this time.|no (default = 0)| ### `dataSchema` diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 2e62bd2c9cdc..a3b759fd386b 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -78,6 +78,8 @@ An example output is shown below: "unparseable": 0 } }, + "segmentAvailabilityConfirmed": false, + "segmentAvailabilityWaitTimeMs": 0, "errorMsg": null }, "type": "ingestionStatsAndErrors" @@ -85,6 +87,15 @@ An example output is shown below: } ``` +#### Segment Availability Fields + +For some task types, the indexing task can wait for the newly ingested segments to become available for queries after ingestion completes. The below fields inform the end user regarding the duration and result of the availability wait. 
For batch ingestion task types, refer to `tuningConfig` docs to see if the task supports an availability waiting period. + +|Field|Description| +|---|---| +|`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.| +|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after ingestion was completed.| + ### Live report When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java index bb149e54b50f..28388770c057 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java @@ -44,12 +44,16 @@ public class IngestionStatsAndErrorsTaskReportData @JsonProperty private boolean segmentAvailabilityConfirmed; + @JsonProperty + private long segmentAvailabilityWaitTimeMs; + public IngestionStatsAndErrorsTaskReportData( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map unparseableEvents, @JsonProperty("rowStats") Map rowStats, @JsonProperty("errorMsg") @Nullable String errorMsg, - @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed + @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed, + @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs ) { this.ingestionState = ingestionState; @@ -57,6 +61,7 @@ public
IngestionStatsAndErrorsTaskReportData( this.rowStats = rowStats; this.errorMsg = errorMsg; this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed; + this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs; } @JsonProperty @@ -90,6 +95,12 @@ public boolean isSegmentAvailabilityConfirmed() return segmentAvailabilityConfirmed; } + @JsonProperty + public long getSegmentAvailabilityWaitTimeMs() + { + return segmentAvailabilityWaitTimeMs; + } + public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( Map taskReports ) @@ -112,7 +123,8 @@ public boolean equals(Object o) Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) && Objects.equals(getRowStats(), that.getRowStats()) && Objects.equals(getErrorMsg(), that.getErrorMsg()) && - Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()); + Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) && + Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()); } @Override @@ -123,7 +135,8 @@ public int hashCode() getUnparseableEvents(), getRowStats(), getErrorMsg(), - isSegmentAvailabilityConfirmed() + isSegmentAvailabilityConfirmed(), + getSegmentAvailabilityWaitTimeMs() ); } @@ -135,7 +148,8 @@ public String toString() ", unparseableEvents=" + unparseableEvents + ", rowStats=" + rowStats + ", errorMsg='" + errorMsg + '\'' + - ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed + + ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed + + ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 53734dd90d36..1a526b8b79a9 100644 ---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -94,6 +94,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private static final Logger log = new Logger(AbstractBatchIndexTask.class); protected boolean segmentAvailabilityConfirmationCompleted = false; + protected long segmentAvailabilityWaitTimeMs = 0L; @GuardedBy("this") private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); @@ -613,6 +614,7 @@ protected boolean waitForSegmentAvailability( return false; } log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size()); + final long start = System.nanoTime(); try ( SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory() @@ -636,13 +638,17 @@ protected boolean waitForSegmentAvailability( } ); } - return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS); + segmentAvailabilityConfirmationCompleted = doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS); + return segmentAvailabilityConfirmationCompleted; } catch (InterruptedException e) { log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!"); Thread.currentThread().interrupt(); return false; } + finally { + segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + } } private static class LockGranularityDetermineResult diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 550f81886f1f..6dcba64875ea 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -174,7 +174,6 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec) @MonotonicNonNull private String errorMsg; - @JsonCreator public AppenderatorDriverRealtimeIndexTask( @JsonProperty("id") String id, @@ -599,7 +598,8 @@ private Map getTaskCompletionReports() getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - errorMsg == null + errorMsg == null, + 0L ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index a1639050aa96..db1a3a5e2ba6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -465,7 +465,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception // for awaitSegmentAvailabilityTimeoutMillis if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segments, spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() @@ -658,7 +658,8 @@ private Map getTaskCompletionReports() null, getTaskCompletionRowStats(), errorMsg, - segmentAvailabilityConfirmationCompleted + segmentAvailabilityConfirmationCompleted, + segmentAvailabilityWaitTimeMs ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index a7985223483a..ec91574caf48 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -530,7 +530,8 @@ private Map getTaskCompletionReports() getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - segmentAvailabilityConfirmationCompleted + segmentAvailabilityConfirmationCompleted, + segmentAvailabilityWaitTimeMs ) ) ); @@ -929,7 +930,7 @@ private TaskStatus generateAndPublishSegments( if (tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() > 0 && published != null) { ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; ArrayList segmentsToWaitFor = new ArrayList<>(published.getSegments()); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segmentsToWaitFor, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 91cdcc08225f..7085c96da87f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -536,7 +536,7 @@ private void waitForSegmentAvailability(Map report .forEach(report -> { segmentsToWaitFor.addAll(report.getNewSegments()); }); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + waitForSegmentAvailability( toolbox, segmentsToWaitFor, awaitSegmentAvailabilityTimeoutMillis @@ -1069,7 +1069,8 @@ private Map getTaskCompletionReports(TaskStatus taskStatus, new HashMap<>(), new HashMap<>(), taskStatus.getErrorMsg(), - segmentAvailabilityConfirmed + segmentAvailabilityConfirmed, + segmentAvailabilityWaitTimeMs ) ) ); diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ffede95c1e62..fba93526aad9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -265,7 +265,7 @@ public TaskStatus run(TaskToolbox toolbox) catch (Exception e) { log.error(e, "Encountered exception while running task."); final String errorMsg = Throwables.getStackTraceAsString(e); - toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg)); + toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg, 0L)); return TaskStatus.failure( task.getId(), errorMsg @@ -408,6 +408,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ); Throwable caughtExceptionOuter = null; + + //milliseconds waited for created segments to be handed off + long handoffWaitMs = 0L; + try (final RecordSupplier recordSupplier = task.newTaskRecordSupplier()) { if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) { @@ -811,6 +815,7 @@ public void onFailure(Throwable t) if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { + final long start = System.nanoTime(); try { handedOffList = Futures.allAsList(handOffWaitList) .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); @@ -823,6 +828,9 @@ public void onFailure(Throwable t) .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout()) .emit(); } + finally { + handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + } } for (SegmentsAndCommitMetadata handedOff : handedOffList) { @@ -898,7 +906,7 @@ public void onFailure(Throwable 
t) } } - toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null)); + toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs)); return TaskStatus.success(task.getId()); } @@ -1060,9 +1068,10 @@ private synchronized void persistSequences() throws IOException * was not successful. * * @param errorMsg Nullable error message for the task. null if task succeeded. + * @param handoffWaitMs Milliseconds waited for segments to be handed off. * @return Map of reports for the task. */ - private Map getTaskCompletionReports(@Nullable String errorMsg) + private Map getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -1072,7 +1081,8 @@ private Map getTaskCompletionReports(@Nullable String errorM getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), errorMsg, - errorMsg == null + errorMsg == null, + handoffWaitMs ) ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 65071b2ac194..cea2e83a25ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -56,7 +56,8 @@ public void testSerde() throws Exception "number", 1234 ), "an error message", - true + true, + 1000L ) ); String report1serialized = jsonMapper.writeValueAsString(report1); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index acab50df3d35..ff537235034c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -338,6 +338,11 @@ private void submitTaskAndWait( TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors"); IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw; IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload(); + + // Confirm that the task waited longer than 0ms for the segments to become available. + Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0); + + // Make sure that the result of waiting for segments to load matches the expected result if (segmentAvailabilityConfirmationPair.rhs != null) { Assert.assertEquals( Boolean.valueOf(reportData.isSegmentAvailabilityConfirmed()),