Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ A sample task is shown below:
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|
|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait will occur. If `> 0`, the task will wait for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query. Note for compaction tasks: you should not set this to a non-zero value because it is not supported by the compaction task type at this time.|no (default = 0)|
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realized that this doc was misplaced and duplicated in the native-batch file. It is found below in the tuningConfig section where it should be. This is just clean up.


### `dataSchema`

Expand Down
11 changes: 11 additions & 0 deletions docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,24 @@ An example output is shown below:
"unparseable": 0
}
},
"segmentAvailabilityConfirmed": false,
"segmentAvailabilityWaitTimeMs": 0,
"errorMsg": null
},
"type": "ingestionStatsAndErrors"
}
}
```

#### Segment Availability Fields

For some task types, the indexing task can wait for the newly ingested segments to become available for queries after ingestion completes. The below fields inform the end user regarding the duration and result of the availability wait. For batch ingestion task types, refer to `tuningConfig` docs to see if the task supports an availability waiting period.

|Field|Description|
|---|---|
|`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.|
|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.|

### Live report

When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty
private boolean segmentAvailabilityConfirmed;

@JsonProperty
private long segmentAvailabilityWaitTimeMs;

public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs
)
{
this.ingestionState = ingestionState;
this.unparseableEvents = unparseableEvents;
this.rowStats = rowStats;
this.errorMsg = errorMsg;
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
}

@JsonProperty
Expand Down Expand Up @@ -90,6 +95,12 @@ public boolean isSegmentAvailabilityConfirmed()
return segmentAvailabilityConfirmed;
}

@JsonProperty
public long getSegmentAvailabilityWaitTimeMs()
{
return segmentAvailabilityWaitTimeMs;
}

public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
Expand All @@ -112,7 +123,8 @@ public boolean equals(Object o)
Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) &&
Objects.equals(getRowStats(), that.getRowStats()) &&
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed());
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs());
}

@Override
Expand All @@ -123,7 +135,8 @@ public int hashCode()
getUnparseableEvents(),
getRowStats(),
getErrorMsg(),
isSegmentAvailabilityConfirmed()
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs()
);
}

Expand All @@ -135,7 +148,8 @@ public String toString()
", unparseableEvents=" + unparseableEvents +
", rowStats=" + rowStats +
", errorMsg='" + errorMsg + '\'' +
", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
private static final Logger log = new Logger(AbstractBatchIndexTask.class);

protected boolean segmentAvailabilityConfirmationCompleted = false;
protected long segmentAvailabilityWaitTimeMs = 0L;

@GuardedBy("this")
private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
Expand Down Expand Up @@ -613,6 +614,7 @@ protected boolean waitForSegmentAvailability(
return false;
}
log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
final long start = System.nanoTime();

try (
SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
Expand All @@ -636,13 +638,17 @@ protected boolean waitForSegmentAvailability(
}
);
}
return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
segmentAvailabilityConfirmationCompleted = doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
return segmentAvailabilityConfirmationCompleted;
}
catch (InterruptedException e) {
log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!");
Thread.currentThread().interrupt();
return false;
}
finally {
segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}
}

private static class LockGranularityDetermineResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec)
@MonotonicNonNull
private String errorMsg;


@JsonCreator
public AppenderatorDriverRealtimeIndexTask(
@JsonProperty("id") String id,
Expand Down Expand Up @@ -599,7 +598,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
errorMsg,
errorMsg == null
errorMsg == null,
0L
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
// for awaitSegmentAvailabilityTimeoutMillis
if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
waitForSegmentAvailability(
toolbox,
segments,
spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
Expand Down Expand Up @@ -658,7 +658,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
null,
getTaskCompletionRowStats(),
errorMsg,
segmentAvailabilityConfirmationCompleted
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
errorMsg,
segmentAvailabilityConfirmationCompleted
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs
)
)
);
Expand Down Expand Up @@ -929,7 +930,7 @@ private TaskStatus generateAndPublishSegments(
if (tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() > 0 && published != null) {
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(published.getSegments());
segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
waitForSegmentAvailability(
toolbox,
segmentsToWaitFor,
tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ private void waitForSegmentAvailability(Map<String, PushedSegmentsReport> report
.forEach(report -> {
segmentsToWaitFor.addAll(report.getNewSegments());
});
segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
waitForSegmentAvailability(
toolbox,
segmentsToWaitFor,
awaitSegmentAvailabilityTimeoutMillis
Expand Down Expand Up @@ -1069,7 +1069,8 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
new HashMap<>(),
new HashMap<>(),
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public TaskStatus run(TaskToolbox toolbox)
catch (Exception e) {
log.error(e, "Encountered exception while running task.");
final String errorMsg = Throwables.getStackTraceAsString(e);
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg));
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg, 0L));
return TaskStatus.failure(
task.getId(),
errorMsg
Expand Down Expand Up @@ -408,6 +408,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
);

Throwable caughtExceptionOuter = null;

//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;

try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier = task.newTaskRecordSupplier()) {

if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
Expand Down Expand Up @@ -811,6 +815,7 @@ public void onFailure(Throwable t)
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
final long start = System.nanoTime();
try {
handedOffList = Futures.allAsList(handOffWaitList)
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
Expand All @@ -823,6 +828,9 @@ public void onFailure(Throwable t)
.addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout())
.emit();
}
finally {
handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}
}

for (SegmentsAndCommitMetadata handedOff : handedOffList) {
Expand Down Expand Up @@ -898,7 +906,7 @@ public void onFailure(Throwable t)
}
}

toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null));
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs));
return TaskStatus.success(task.getId());
}

Expand Down Expand Up @@ -1060,9 +1068,10 @@ private synchronized void persistSequences() throws IOException
* was not successful.
*
* @param errorMsg Nullable error message for the task. null if task succeeded.
* @param handoffWaitMs Milliseconds waited for segments to be handed off.
* @return Map of reports for the task.
*/
private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg)
private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
Expand All @@ -1072,7 +1081,8 @@ private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorM
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
errorMsg,
errorMsg == null
errorMsg == null,
handoffWaitMs
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public void testSerde() throws Exception
"number", 1234
),
"an error message",
true
true,
1000L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,11 @@ private void submitTaskAndWait(
TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw;
IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload();

// Confirm that the task waited longer than 0ms for the task to complete.
Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);

// Make sure that the result of waiting for segments to load matches the expected result
if (segmentAvailabilityConfirmationPair.rhs != null) {
Assert.assertEquals(
Boolean.valueOf(reportData.isSegmentAvailabilityConfirmed()),
Expand Down