Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

int failCnt = 0;
Map<String, TaskReport> completionReports = new HashMap<>();
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
for (int i = 0; i < indexTaskSpecs.size(); i++) {
ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
String errMsg = "Task was asked to stop. Finish as failed.";
Expand All @@ -518,9 +519,11 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}

String reportKeySuffix = "_" + i;
Optional.ofNullable(eachSpec.getCompletionReports())
.ifPresent(reports -> completionReports.putAll(
CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix)));
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
Expand Down Expand Up @@ -572,11 +575,6 @@ private String createIndexTaskSpecId(int i)
return StringUtils.format("%s_%d", getId(), i);
}

private String getReportkey(String baseSequenceName, String currentKey)
{
return StringUtils.format("%s_%s", currentKey, baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1));
}

/**
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode

private IngestionState ingestionState;

// used to specify if indextask.run() is run as a part of another task
// skips writing reports and cleanup if not a standalone task
private boolean isStandAloneTask;

@MonotonicNonNull
Expand Down Expand Up @@ -217,6 +215,10 @@ public IndexTask(
);
}

/**
* @param isStandAloneTask used to specify if indextask.run() is run as a part of another task
* skips writing reports and cleanup if not a standalone task
*/
public IndexTask(
String id,
String groupId,
Expand Down Expand Up @@ -567,8 +569,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
errorMsg = Throwables.getStackTraceAsString(e);
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
Expand All @@ -580,8 +581,10 @@ public TaskStatus runTask(final TaskToolbox toolbox)
}
}

private void writeCompletionReports(TaskToolbox toolbox)

private void updateAndWriteCompletionReports(TaskToolbox toolbox)
{
completionReports = getTaskCompletionReports();
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
Expand Down Expand Up @@ -1043,8 +1046,7 @@ private TaskStatus generateAndPublishSegments(
if (published == null) {
log.error("Failed to publish segments, aborting!");
errorMsg = "Failed to publish segments.";
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
Expand All @@ -1067,8 +1069,7 @@ private TaskStatus generateAndPublishSegments(

log.debugSegments(published.getSegments(), "Published segments");

completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.success(getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen

private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private final Boolean isCompactionTask;
private final boolean isCompactionTask;
Comment thread
ac9817 marked this conversation as resolved.


@JsonCreator
Expand All @@ -225,7 +225,7 @@ public ParallelIndexSupervisorTask(
ParallelIndexIngestionSpec ingestionSchema,
@Nullable String baseSubtaskSpecName,
Map<String, Object> context,
Boolean isCompactionTask
boolean isCompactionTask
)
{
super(
Expand Down Expand Up @@ -303,13 +303,6 @@ public Map<String, TaskReport> getCompletionReports()
return completionReports;
}

@Nullable
@JsonIgnore
public String getBaseSubtaskSpecName()
{
return baseSubtaskSpecName;
}

@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
Expand Down Expand Up @@ -534,13 +527,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
try {

initializeSubTaskCleaner();
this.toolbox = toolbox;

if (isParallelMode()) {
// emit metric for parallel batch ingestion mode:
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

this.toolbox = toolbox;

if (isGuaranteedRollup(
getIngestionMode(),
ingestionSchema.getTuningConfig()
Expand Down Expand Up @@ -669,8 +661,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -837,8 +828,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
taskStatus = TaskStatus.failure(getId(), errMsg);
}

completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -935,8 +925,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
taskStatus = TaskStatus.failure(getId(), errMsg);
}

completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -1225,6 +1214,7 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
TaskStatus status = sequentialIndexTask.run(toolbox);
completionReports = sequentialIndexTask.getCompletionReports();
writeCompletionReports();
return status;
} else {
String msg = "Task was asked to stop. Finish as failed";
Expand Down Expand Up @@ -1261,7 +1251,13 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
);
}

private void writeCompletionReports(TaskToolbox toolbox)
private void updateAndWriteCompletionReports(TaskStatus status)
{
completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted);
writeCompletionReports();
}

private void writeCompletionReports()
{
if (!isCompactionTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
Expand Down Expand Up @@ -1842,6 +1838,12 @@ static Map<String, Object> getTaskReport(final OverlordClient overlordClient, fi
}
}

@VisibleForTesting
public void setToolbox(TaskToolbox toolbox)
{
this.toolbox = toolbox;
}

/**
* Represents a partition uniquely identified by an Interval and a bucketId.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,52 @@ public void testSerde() throws Exception
Assert.assertEquals(reportMap1, reportMap2);
}

@Test
public void testSerializationOnMissingPartitionStats() throws Exception
Comment thread
ac9817 marked this conversation as resolved.
{
String json = "{\n"
+ " \"type\": \"ingestionStatsAndErrors\",\n"
+ " \"taskId\": \"ingestionStatsAndErrors\",\n"
+ " \"payload\": {\n"
+ " \"ingestionState\": \"COMPLETED\",\n"
+ " \"unparseableEvents\": {\n"
+ " \"hello\": \"world\"\n"
+ " },\n"
+ " \"rowStats\": {\n"
+ " \"number\": 1234\n"
+ " },\n"
+ " \"errorMsg\": \"an error message\",\n"
+ " \"segmentAvailabilityConfirmed\": true,\n"
+ " \"segmentAvailabilityWaitTimeMs\": 1000\n"
+ " }\n"
+ "}";

IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport(
IngestionStatsAndErrorsTaskReport.REPORT_KEY,
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
ImmutableMap.of(
"hello", "world"
),
ImmutableMap.of(
"number", 1234
),
"an error message",
true,
1000L,
null
)
);


Assert.assertEquals(expected, jsonMapper.readValue(
json,
new TypeReference<TaskReport>()
{
}
));
}

@Test
public void testExceptionWhileWritingReport() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void failsInThirdPhase() throws Exception
Assert.assertTrue(task.isReady(actionClient));
task.stopGracefully(null);


task.setToolbox(toolbox);
TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);

Assert.assertTrue(taskStatus.isFailure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void failsThirdPhase() throws Exception
Assert.assertTrue(task.isReady(actionClient));
task.stopGracefully(null);


task.setToolbox(toolbox);
TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);

Assert.assertTrue(taskStatus.isFailure());
Expand Down