Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,12 @@ public Response getUnparseableEvents(
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, List<String>> events = new HashMap<>();
return Response.ok(doGetUnparseableEvents(full)).build();
}

public Map<String, Object> doGetUnparseableEvents(String full)
{
Map<String, Object> events = new HashMap<>();

boolean needsDeterminePartitions = false;
boolean needsBuildSegments = false;
Expand Down Expand Up @@ -325,11 +330,10 @@ public Response getUnparseableEvents(
)
);
}

return Response.ok(events).build();
return events;
}

private Map<String, Object> doGetRowStats(String full)
public Map<String, Object> doGetRowStats(String full)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Expand Down Expand Up @@ -784,6 +788,11 @@ private Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSp
return hllCollectors;
}

/**
 * Returns the task's current ingestion state (e.g. for live ingestion reports).
 * The field is updated as the task progresses through its phases.
 */
public IngestionState getIngestionState()
{
  return ingestionState;
}

/**
* This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}.
* If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
Expand Down Expand Up @@ -66,6 +67,8 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
Expand All @@ -92,6 +95,7 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -169,6 +173,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@MonotonicNonNull
private volatile TaskToolbox toolbox;

private IngestionState ingestionState;

@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
Expand Down Expand Up @@ -218,6 +224,7 @@ public ParallelIndexSupervisorTask(
}

awaitSegmentAvailabilityTimeoutMillis = ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
this.ingestionState = IngestionState.NOT_STARTED;
}

private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
Expand Down Expand Up @@ -484,6 +491,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}
}
finally {
ingestionState = IngestionState.COMPLETED;
toolbox.getChatHandlerProvider().unregister(getId());
}
}
Expand Down Expand Up @@ -553,26 +561,27 @@ private void waitForSegmentAvailability(Map<String, PushedSegmentsReport> report
*/
private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
{
final ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> runner = createRunner(
ingestionState = IngestionState.BUILD_SEGMENTS;
ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> parallelSinglePhaseRunner = createRunner(
toolbox,
this::createSinglePhaseTaskRunner
);

final TaskState state = runNextPhase(runner);
final TaskState state = runNextPhase(parallelSinglePhaseRunner);
TaskStatus taskStatus;
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, runner.getReports());
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(runner.getReports());
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
taskStatus = TaskStatus.success(getId());
} else {
// there is only success or failure after running....
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
final String errorMessage = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
runner.getName()
parallelSinglePhaseRunner.getName()
);
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
Expand Down Expand Up @@ -1068,7 +1077,7 @@ private void publishSegments(

private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
{
final IndexTask indexTask = new IndexTask(
IndexTask sequentialIndexTask = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
Expand All @@ -1082,8 +1091,9 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
getContext()
);

if (currentSubTaskHolder.setTask(indexTask) && indexTask.isReady(toolbox.getTaskActionClient())) {
return indexTask.run(toolbox);
if (currentSubTaskHolder.setTask(sequentialIndexTask)
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
return sequentialIndexTask.run(toolbox);
} else {
String msg = "Task was asked to stop. Finish as failed";
LOG.info(msg);
Expand All @@ -1101,13 +1111,17 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
*/
private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed)
{
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents = doGetRowStatsAndUnparseableEvents(
"true",
true
);
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
new HashMap<>(),
new HashMap<>(),
rowStatsAndUnparseableEvents.rhs,
rowStatsAndUnparseableEvents.lhs,
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed
)
Expand Down Expand Up @@ -1415,4 +1429,192 @@ public Response getCompleteSubTaskSpecAttemptHistory(
}
}
}

private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
{
if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
// This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
return (RowIngestionMetersTotals) buildSegmentsRowStats;
} else if (buildSegmentsRowStats instanceof Map) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comment explaining when buildSegmentsRowStats can be a Map or RowIngestionMetersTotals?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment, the first case was just for unit tests

Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
return new RowIngestionMetersTotals(
((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
);
} else {
// should never happen
throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
}
}

/**
 * Aggregates row stats and (optionally) unparseable events across all sub-tasks of a
 * single-phase parallel run. Totals for completed sub-tasks come from their pushed-segments
 * reports; totals for still-running sub-tasks are fetched live through the indexing service
 * client (failures there are logged and skipped, so live reporting is best-effort).
 *
 * @param parallelSinglePhaseRunner runner whose sub-task reports are aggregated
 * @param includeUnparseable        whether to also collect per-row unparseable event messages
 * @return pair of (rowStats map, unparseableEvents map), both keyed by the buildSegments phase
 */
@SuppressWarnings("unchecked") // report payload shapes are fixed by the task-report JSON layout
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
    SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
    boolean includeUnparseable
)
{
  long processed = 0L;
  long processedWithError = 0L;
  long thrownAway = 0L;
  long unparseable = 0L;

  List<String> unparseableEvents = new ArrayList<>();

  // Get stats from completed tasks
  Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
  for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
    Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
    if (taskReport == null || taskReport.isEmpty()) {
      LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
      continue;
    }
    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
    IngestionStatsAndErrorsTaskReportData reportData =
        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
    );

    if (includeUnparseable) {
      // Guard against a report that carries no buildSegments entry; addAll(null) would NPE
      // and abort aggregation for all remaining sub-tasks.
      List<String> taskUnparseableEvents = (List<String>) reportData.getUnparseableEvents()
                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
      if (taskUnparseableEvents != null) {
        unparseableEvents.addAll(taskUnparseableEvents);
      }
    }

    processed += totals.getProcessed();
    processedWithError += totals.getProcessedWithError();
    thrownAway += totals.getThrownAway();
    unparseable += totals.getUnparseable();
  }

  // Get stats from running tasks
  Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
  for (String runningTaskId : runningTaskIds) {
    try {
      Map<String, Object> report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
      if (report == null || report.isEmpty()) {
        // task does not have a running report yet
        continue;
      }
      Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
      Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
      Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
      Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
      Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);

      if (includeUnparseable) {
        Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
        List<String> buildSegmentsUnparseableEvents = (List<String>) taskUnparseableEvents.get(
            RowIngestionMeters.BUILD_SEGMENTS
        );
        if (buildSegmentsUnparseableEvents != null) {
          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
        }
      }

      processed += ((Number) buildSegments.get("processed")).longValue();
      processedWithError += ((Number) buildSegments.get("processedWithError")).longValue();
      thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
      unparseable += ((Number) buildSegments.get("unparseable")).longValue();
    }
    catch (Exception e) {
      // Best-effort: a single unreachable/malformed live report must not break the endpoint.
      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
    }
  }

  Map<String, Object> rowStatsMap = new HashMap<>();
  Map<String, Object> totalsMap = new HashMap<>();
  totalsMap.put(
      RowIngestionMeters.BUILD_SEGMENTS,
      new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable)
  );
  rowStatsMap.put("totals", totalsMap);

  return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
}

/**
 * Collects live row stats and unparseable events for this task, dispatching on the current
 * execution mode: parallel single-phase aggregation, sequential delegation to the wrapped
 * {@link IndexTask}, or empty maps when nothing is running (or for unsupported multiphase runs).
 *
 * @param full               passed through to the sequential task's stats endpoints
 * @param includeUnparseable whether unparseable events should be collected in parallel mode
 */
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
{
  // No runner installed yet (task not started or already torn down): nothing to report.
  final Object runner = currentSubTaskHolder == null ? null : currentSubTaskHolder.getTask();
  if (runner == null) {
    return Pair.of(ImmutableMap.of(), ImmutableMap.of());
  }

  if (!isParallelMode()) {
    // Sequential fallback: delegate straight to the wrapped IndexTask.
    final IndexTask sequentialTask = (IndexTask) runner;
    return Pair.of(sequentialTask.doGetRowStats(full), sequentialTask.doGetUnparseableEvents(full));
  }

  if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
    // multiphase is not supported yet
    return Pair.of(ImmutableMap.of(), ImmutableMap.of());
  }

  return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
      (SinglePhaseParallelIndexTaskRunner) runner,
      includeUnparseable
  );
}

/**
 * HTTP endpoint returning live row stats for this task. Requires READ authorization on the
 * task's datasource. Unparseable events are intentionally excluded from this endpoint.
 */
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
    @Context final HttpServletRequest req,
    @QueryParam("full") String full
)
{
  IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
  final Map<String, Object> rowStats = doGetRowStatsAndUnparseableEvents(full, false).lhs;
  return Response.ok(rowStats).build();
}

/**
 * Builds the live-report payload served by the {@code /liveReports} endpoint: ingestion state,
 * row stats, and unparseable events, nested under the "ingestionStatsAndErrors" key.
 *
 * @param full passed through to the sequential task's stats endpoints
 * @return report map of the same shape as a completed task's ingestionStatsAndErrors report
 */
@VisibleForTesting
public Map<String, Object> doGetLiveReports(String full)
{
  Map<String, Object> returnMap = new HashMap<>();
  Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
  Map<String, Object> payload = new HashMap<>();

  Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents =
      doGetRowStatsAndUnparseableEvents(full, true);

  // use the sequential task's ingestion state if we were running that mode
  IngestionState ingestionStateForReport;
  if (isParallelMode()) {
    ingestionStateForReport = ingestionState;
  } else {
    // Null-guard the holder as well as the task: the endpoint can be hit before runTask() has
    // installed the sequential sub-task (doGetRowStatsAndUnparseableEvents tolerates both).
    IndexTask currentSequentialTask =
        currentSubTaskHolder == null ? null : (IndexTask) currentSubTaskHolder.getTask();
    ingestionStateForReport = currentSequentialTask == null
                              ? ingestionState
                              : currentSequentialTask.getIngestionState();
  }

  payload.put("ingestionState", ingestionStateForReport);
  payload.put("unparseableEvents", rowStatsAndUnparseableEvents.rhs);
  payload.put("rowStats", rowStatsAndUnparseableEvents.lhs);

  ingestionStatsAndErrors.put("taskId", getId());
  ingestionStatsAndErrors.put("payload", payload);
  ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");

  returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
  return returnMap;
}

/**
 * HTTP endpoint returning the full live ingestion report (state, row stats, unparseable
 * events). Requires READ authorization on the task's datasource.
 */
@GET
@Path("/liveReports")
@Produces(MediaType.APPLICATION_JSON)
public Response getLiveReports(
    @Context final HttpServletRequest req,
    @QueryParam("full") String full
)
{
  IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
  final Map<String, Object> liveReports = doGetLiveReports(full);
  return Response.ok(liveReports).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
Expand Down Expand Up @@ -197,7 +198,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
intervalToUnzippedFiles
);

taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments));
taskClient.report(
supervisorTaskId,
new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, ImmutableMap.of())
);

return TaskStatus.success(getId());
}
Expand Down
Loading