diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index a7985223483a..f22c2a0f0682 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -286,7 +286,12 @@ public Response getUnparseableEvents( ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - Map> events = new HashMap<>(); + return Response.ok(doGetUnparseableEvents(full)).build(); + } + + public Map doGetUnparseableEvents(String full) + { + Map events = new HashMap<>(); boolean needsDeterminePartitions = false; boolean needsBuildSegments = false; @@ -325,11 +330,10 @@ public Response getUnparseableEvents( ) ); } - - return Response.ok(events).build(); + return events; } - private Map doGetRowStats(String full) + public Map doGetRowStats(String full) { Map returnMap = new HashMap<>(); Map totalsMap = new HashMap<>(); @@ -784,6 +788,11 @@ private Map> collectIntervalsAndShardSp return hllCollectors; } + public IngestionState getIngestionState() + { + return ingestionState; + } + /** * This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}. * If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index b66f49f7a6c8..19b965c2e57a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; @@ -66,6 +67,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -92,6 +95,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -169,6 +173,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @MonotonicNonNull private volatile TaskToolbox toolbox; + private IngestionState ingestionState; + @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, @@ -218,6 +224,7 @@ public ParallelIndexSupervisorTask( } awaitSegmentAvailabilityTimeoutMillis = 
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis(); + this.ingestionState = IngestionState.NOT_STARTED; } private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec) @@ -484,6 +491,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception } } finally { + ingestionState = IngestionState.COMPLETED; toolbox.getChatHandlerProvider().unregister(getId()); } } @@ -553,18 +561,19 @@ private void waitForSegmentAvailability(Map report */ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception { - final ParallelIndexTaskRunner runner = createRunner( + ingestionState = IngestionState.BUILD_SEGMENTS; + ParallelIndexTaskRunner parallelSinglePhaseRunner = createRunner( toolbox, this::createSinglePhaseTaskRunner ); - final TaskState state = runNextPhase(runner); + final TaskState state = runNextPhase(parallelSinglePhaseRunner); TaskStatus taskStatus; if (state.isSuccess()) { //noinspection ConstantConditions - publishSegments(toolbox, runner.getReports()); + publishSegments(toolbox, parallelSinglePhaseRunner.getReports()); if (awaitSegmentAvailabilityTimeoutMillis > 0) { - waitForSegmentAvailability(runner.getReports()); + waitForSegmentAvailability(parallelSinglePhaseRunner.getReports()); } taskStatus = TaskStatus.success(getId()); } else { @@ -572,7 +581,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state); final String errorMessage = StringUtils.format( TASK_PHASE_FAILURE_MSG, - runner.getName() + parallelSinglePhaseRunner.getName() ); taskStatus = TaskStatus.failure(getId(), errorMessage); } @@ -1068,7 +1077,7 @@ private void publishSegments( private TaskStatus runSequential(TaskToolbox toolbox) throws Exception { - final IndexTask indexTask = new IndexTask( + IndexTask sequentialIndexTask = new IndexTask( getId(), getGroupId(), getTaskResource(), @@ -1082,8 +1091,9 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception getContext() ); - if (currentSubTaskHolder.setTask(indexTask) && indexTask.isReady(toolbox.getTaskActionClient())) { - return indexTask.run(toolbox); + if (currentSubTaskHolder.setTask(sequentialIndexTask) + && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) { + return sequentialIndexTask.run(toolbox); } else { String msg = "Task was asked to stop. Finish as failed"; LOG.info(msg); @@ -1101,13 +1111,17 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception */ private Map getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed) { + Pair, Map> rowStatsAndUnparseableEvents = doGetRowStatsAndUnparseableEvents( + "true", + true + ); return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( getId(), new IngestionStatsAndErrorsTaskReportData( IngestionState.COMPLETED, - new HashMap<>(), - new HashMap<>(), + rowStatsAndUnparseableEvents.rhs, + rowStatsAndUnparseableEvents.lhs, taskStatus.getErrorMsg(), segmentAvailabilityConfirmed ) @@ -1415,4 +1429,192 @@ public Response getCompleteSubTaskSpecAttemptHistory( } } } + + private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats) + { + if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) { + // This case is for unit tests. Normally when deserialized the row stats will appear as a Map.
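+ // (Reports fetched over HTTP are deserialized by Jackson without a target type, which is why the totals otherwise arrive as a plain Map.)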
+ return (RowIngestionMetersTotals) buildSegmentsRowStats; + } else if (buildSegmentsRowStats instanceof Map) { + Map buildSegmentsRowStatsMap = (Map) buildSegmentsRowStats; + return new RowIngestionMetersTotals( + ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(), + ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(), + ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(), + ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue() + ); + } else { + // should never happen + throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass()); + } + } + + private Pair, Map> doGetRowStatsAndUnparseableEventsParallelSinglePhase( + SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner, + boolean includeUnparseable + ) + { + long processed = 0L; + long processedWithError = 0L; + long thrownAway = 0L; + long unparseable = 0L; + + List unparseableEvents = new ArrayList<>(); + + // Get stats from completed tasks + Map completedSubtaskReports = parallelSinglePhaseRunner.getReports(); + for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) { + Map taskReport = pushedSegmentsReport.getTaskReport(); + if (taskReport == null || taskReport.isEmpty()) { + LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId()); + continue; + } + IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get( + IngestionStatsAndErrorsTaskReport.REPORT_KEY); + IngestionStatsAndErrorsTaskReportData reportData = + (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload(); + RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats( + reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS) + ); + + if (includeUnparseable) { + List taskUnparseableEvents = (List) reportData.getUnparseableEvents() + .get(RowIngestionMeters.BUILD_SEGMENTS); + unparseableEvents.addAll(taskUnparseableEvents); + } + + processed += totals.getProcessed(); + processedWithError += totals.getProcessedWithError(); + thrownAway += totals.getThrownAway(); + unparseable += totals.getUnparseable(); + } + + // Get stats from running tasks + Set runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds(); + for (String runningTaskId : runningTaskIds) { + try { + Map report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId); + if (report == null || report.isEmpty()) { + // task does not have a running report yet + continue; + } + Map ingestionStatsAndErrors = (Map) report.get("ingestionStatsAndErrors"); + Map payload = (Map) ingestionStatsAndErrors.get("payload"); + Map rowStats = (Map) payload.get("rowStats"); + Map totals = (Map) rowStats.get("totals"); + Map buildSegments = (Map) totals.get(RowIngestionMeters.BUILD_SEGMENTS); + + if (includeUnparseable) { + Map taskUnparseableEvents = (Map) payload.get("unparseableEvents"); + List buildSegmentsUnparseableEvents = (List) taskUnparseableEvents.get( + RowIngestionMeters.BUILD_SEGMENTS + ); + unparseableEvents.addAll(buildSegmentsUnparseableEvents); + } + + processed += ((Number) buildSegments.get("processed")).longValue(); + processedWithError += ((Number) buildSegments.get("processedWithError")).longValue(); + thrownAway += ((Number) buildSegments.get("thrownAway")).longValue(); + unparseable += ((Number) buildSegments.get("unparseable")).longValue(); + } + catch (Exception e) {
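+ // Live-report collection is best-effort: if one running subtask's report cannot be fetched, log and keep aggregating the rest.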
+ LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId); + } + } + + Map rowStatsMap = new HashMap<>(); + Map totalsMap = new HashMap<>(); + totalsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable) + ); + rowStatsMap.put("totals", totalsMap); + + return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents)); + } + + private Pair, Map> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable) + { + if (currentSubTaskHolder == null) { + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + } + + Object currentRunner = currentSubTaskHolder.getTask(); + if (currentRunner == null) { + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + } + + if (isParallelMode()) { + if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) { + // multiphase is not supported yet + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + } else { + return doGetRowStatsAndUnparseableEventsParallelSinglePhase( + (SinglePhaseParallelIndexTaskRunner) currentRunner, + includeUnparseable + ); + } + } else { + IndexTask currentSequentialTask = (IndexTask) currentRunner; + return Pair.of(currentSequentialTask.doGetRowStats(full), currentSequentialTask.doGetUnparseableEvents(full)); + } + } + + @GET + @Path("/rowStats") + @Produces(MediaType.APPLICATION_JSON) + public Response getRowStats( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + return Response.ok(doGetRowStatsAndUnparseableEvents(full, false).lhs).build(); + } + + @VisibleForTesting + public Map doGetLiveReports(String full) + { + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + + Pair, Map> rowStatsAndUnparseableEvents = + doGetRowStatsAndUnparseableEvents(full, true); + + // use the sequential task's ingestion state if we were running that mode + IngestionState ingestionStateForReport; + if (isParallelMode()) { + ingestionStateForReport = ingestionState; + } else { + IndexTask currentSequentialTask = (IndexTask) currentSubTaskHolder.getTask(); + ingestionStateForReport = currentSequentialTask == null + ? ingestionState + : currentSequentialTask.getIngestionState(); + } + + payload.put("ingestionState", ingestionStateForReport); + payload.put("unparseableEvents", rowStatsAndUnparseableEvents.rhs); + payload.put("rowStats", rowStatsAndUnparseableEvents.lhs); + + ingestionStatsAndErrors.put("taskId", getId()); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + return returnMap; + } + + @GET + @Path("/liveReports") + @Produces(MediaType.APPLICATION_JSON) + public Response getLiveReports( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + + return Response.ok(doGetLiveReports(full)).build(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index 38a7261b786c..60814fde8cdc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -197,7 +198,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception intervalToUnzippedFiles ); - taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments)); + taskClient.report( + supervisorTaskId, + new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, ImmutableMap.of()) + ); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java index 8654172c572c..8ed373c7d953 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.timeline.DataSegment; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -39,17 +41,20 @@ public class PushedSegmentsReport implements SubTaskReport private final String taskId; private final Set oldSegments; private final Set newSegments; + private final Map taskReport; @JsonCreator public PushedSegmentsReport( @JsonProperty("taskId") String taskId, @JsonProperty("oldSegments") Set oldSegments, - @JsonProperty("segments") Set newSegments + @JsonProperty("segments") Set newSegments, + @JsonProperty("taskReport") Map taskReport ) { this.taskId = Preconditions.checkNotNull(taskId, "taskId"); this.oldSegments =
Preconditions.checkNotNull(oldSegments, "oldSegments"); this.newSegments = Preconditions.checkNotNull(newSegments, "newSegments"); + this.taskReport = taskReport; } @Override @@ -71,6 +76,12 @@ public Set getNewSegments() return newSegments; } + @JsonProperty("taskReport") + public Map getTaskReport() + { + return taskReport; + } + @Override public boolean equals(Object o) { @@ -81,14 +92,15 @@ public boolean equals(Object o) return false; } PushedSegmentsReport that = (PushedSegmentsReport) o; - return Objects.equals(taskId, that.taskId) && - Objects.equals(oldSegments, that.oldSegments) && - Objects.equals(newSegments, that.newSegments); + return Objects.equals(taskId, that.taskId) + && Objects.equals(oldSegments, that.oldSegments) + && Objects.equals(newSegments, that.newSegments) + && Objects.equals(taskReport, that.taskReport); } @Override public int hashCode() { - return Objects.hash(taskId, oldSegments, newSegments); + return Objects.hash(taskId, oldSegments, newSegments, taskReport); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index af885e715dee..08cba47f6f89 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -21,13 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -35,6 +42,7 @@ import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider; import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; import org.apache.druid.indexing.common.task.TaskResource; @@ -58,16 +66,29 @@ import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.segment.realtime.firehose.ChatHandler; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import 
org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,7 +101,7 @@ * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of * publishing on its own. */ -public class SinglePhaseSubTask extends AbstractBatchSubtask +public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler { public static final String TYPE = "single_phase_sub_task"; public static final String OLD_TYPE_NAME = "index_sub"; @@ -106,6 +127,20 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask */ private final boolean missingIntervalsInOverwriteMode; + @MonotonicNonNull + private AuthorizerMapper authorizerMapper; + + @MonotonicNonNull + private RowIngestionMeters rowIngestionMeters; + + @MonotonicNonNull + private ParseExceptionHandler parseExceptionHandler; + + @Nullable + private String errorMsg; + + private IngestionState ingestionState; + @JsonCreator public SinglePhaseSubTask( // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask @@ -144,6 +179,7 @@ public SinglePhaseSubTask( if (missingIntervalsInOverwriteMode) { addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } + this.ingestionState = IngestionState.NOT_STARTED; } @Override @@ -187,43 +223,74 @@ public String getSubtaskSpecId() } @Override - public TaskStatus runTask(final TaskToolbox toolbox) throws Exception + public TaskStatus runTask(final TaskToolbox toolbox) { - if (missingIntervalsInOverwriteMode) { - LOG.warn( - "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. " - + "Forced to use timeChunk lock." + try { + if (missingIntervalsInOverwriteMode) { + LOG.warn( + "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. " + + "Forced to use timeChunk lock." 
+ ); + } + this.authorizerMapper = toolbox.getAuthorizerMapper(); + + toolbox.getChatHandlerProvider().register(getId(), this, false); + + rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); + parseExceptionHandler = new ParseExceptionHandler( + rowIngestionMeters, + ingestionSchema.getTuningConfig().isLogParseExceptions(), + ingestionSchema.getTuningConfig().getMaxParseExceptions(), + ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() ); - } - final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource( - ingestionSchema.getDataSchema().getParser() - ); - final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientFactory().build( - new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()), - getId(), - 1, // always use a single http thread - ingestionSchema.getTuningConfig().getChatHandlerTimeout(), - ingestionSchema.getTuningConfig().getChatHandlerNumRetries() - ); - final Set pushedSegments = generateAndPushSegments( - toolbox, - taskClient, - inputSource, - toolbox.getIndexingTmpDir() - ); + final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource( + ingestionSchema.getDataSchema().getParser() + ); - // Find inputSegments overshadowed by pushedSegments - final Set allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments()); - allSegments.addAll(pushedSegments); - final VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments(allSegments); - final Set oldSegments = FluentIterable.from(timeline.findFullyOvershadowed()) - .transformAndConcat(TimelineObjectHolder::getObject) - .transform(PartitionChunk::getObject) - .toSet(); - taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), oldSegments, pushedSegments)); - - return TaskStatus.success(getId()); + final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientFactory().build( + new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()), + getId(), + 1, // always use a single http thread + ingestionSchema.getTuningConfig().getChatHandlerTimeout(), + ingestionSchema.getTuningConfig().getChatHandlerNumRetries() + ); + ingestionState = IngestionState.BUILD_SEGMENTS; + final Set pushedSegments = generateAndPushSegments( + toolbox, + taskClient, + inputSource, + toolbox.getIndexingTmpDir() + ); + + // Find inputSegments overshadowed by pushedSegments + final Set allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments()); + allSegments.addAll(pushedSegments); + final VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments(allSegments); + final Set oldSegments = FluentIterable.from(timeline.findFullyOvershadowed()) + .transformAndConcat(TimelineObjectHolder::getObject) + .transform(PartitionChunk::getObject) + .toSet(); + + Map taskReport = getTaskCompletionReports(); + taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), oldSegments, pushedSegments, taskReport)); + + toolbox.getTaskReportFileWriter().write(getId(), taskReport); + + return TaskStatus.success(getId()); + } + catch (Exception e) { + LOG.error(e, "Encountered exception in parallel sub task."); + errorMsg = Throwables.getStackTraceAsString(e); + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.failure( + getId(), + errorMsg + ); + } + finally { + toolbox.getChatHandlerProvider().unregister(getId()); + } } @Override @@ -324,13 +391,6 @@ private Set 
generateAndPushSegments( useLineageBasedSegmentAllocation ); - final RowIngestionMeters rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); - final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler( - rowIngestionMeters, - tuningConfig.isLogParseExceptions(), - tuningConfig.getMaxParseExceptions(), - tuningConfig.getMaxSavedParseExceptions() - ); final Appenderator appenderator = BatchAppenderators.newAppenderator( getId(), toolbox.getAppenderatorsManager(), @@ -339,12 +399,7 @@ private Set generateAndPushSegments( dataSchema, tuningConfig, rowIngestionMeters, - new ParseExceptionHandler( - rowIngestionMeters, - tuningConfig.isLogParseExceptions(), - tuningConfig.getMaxParseExceptions(), - tuningConfig.getMaxSavedParseExceptions() - ) + parseExceptionHandler ); boolean exceptionOccurred = false; try ( @@ -424,4 +479,170 @@ private Set generateAndPushSegments( } } } + + @GET + @Path("/unparseableEvents") + @Produces(MediaType.APPLICATION_JSON) + public Response getUnparseableEvents( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + Map> events = new HashMap<>(); + + boolean needsBuildSegments = false; + + if (full != null) { + needsBuildSegments = true; + } else { + switch (ingestionState) { + case BUILD_SEGMENTS: + case COMPLETED: + needsBuildSegments = true; + break; + default: + break; + } + } + + if (needsBuildSegments) { + events.put( + RowIngestionMeters.BUILD_SEGMENTS, + IndexTaskUtils.getMessagesFromSavedParseExceptions( + parseExceptionHandler.getSavedParseExceptions() + ) + ); + } + + return Response.ok(events).build(); + } + + private Map doGetRowStats(String full) + { + Map returnMap = new HashMap<>(); + Map totalsMap = new HashMap<>(); + Map averagesMap = new HashMap<>(); + + boolean needsBuildSegments = false; + + if (full != null) { + needsBuildSegments = true; + } else { + switch (ingestionState) { + case BUILD_SEGMENTS: + case COMPLETED: + needsBuildSegments = true; + break; + default: + break; + } + } + + if (needsBuildSegments) { + totalsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); + averagesMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getMovingAverages() + ); + } + + returnMap.put("totals", totalsMap); + returnMap.put("movingAverages", averagesMap); + return returnMap; + } + + @GET + @Path("/rowStats") + @Produces(MediaType.APPLICATION_JSON) + public Response getRowStats( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + return Response.ok(doGetRowStats(full)).build(); + } + + @VisibleForTesting + public Map doGetLiveReports(String full) + { + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + Map events = getTaskCompletionUnparseableEvents(); + + payload.put("ingestionState", ingestionState); + payload.put("unparseableEvents", events); + payload.put("rowStats", doGetRowStats(full)); + + ingestionStatsAndErrors.put("taskId", getId()); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + return returnMap; + } + + @GET + @Path("/liveReports") + @Produces(MediaType.APPLICATION_JSON) + public 
Response getLiveReports( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + return Response.ok(doGetLiveReports(full)).build(); + } + + private Map getTaskCompletionRowStats() + { + Map metrics = new HashMap<>(); + metrics.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); + return metrics; + } + + /** + * Generates an IngestionStatsAndErrorsTaskReport for the task. + * + * @return the completion report map for this subtask + */ + private Map getTaskCompletionReports() + { + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + getId(), + new IngestionStatsAndErrorsTaskReportData( + IngestionState.COMPLETED, + getTaskCompletionUnparseableEvents(), + getTaskCompletionRowStats(), + errorMsg, + false // not applicable for parallel subtask + ) + ) + ); + } + + private Map getTaskCompletionUnparseableEvents() + { + Map unparseableEventsMap = new HashMap<>(); + List parseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions( + parseExceptionHandler.getSavedParseExceptions() + ); + + if (parseExceptionMessages != null) { + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, parseExceptionMessages); + } else { + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, ImmutableList.of()); + } + + return unparseableEventsMap; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 82bc1f386320..7fa0a4a1f9de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -177,7 +177,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, - null, + 5, null, null ); @@ -311,7 +311,7 @@ protected CoordinatorClient getCoordinatorClient() return coordinatorClient; } - private static class TaskContainer + protected static class TaskContainer { private final Task task; @MonotonicNonNull @@ -324,6 +324,11 @@ private TaskContainer(Task task) this.task = task; } + public Task getTask() + { + return task; + } + private void setStatusFuture(Future statusFuture) { this.statusFuture = statusFuture; @@ -420,6 +425,11 @@ private TaskStatus waitToFinish(Task task, long waitTime, TimeUnit timeUnit) } } + private TaskContainer getTaskContainer(String taskId) + { + return tasks.get(taskId); + } + private Future runTask(Task task) { final TaskContainer taskContainer = new TaskContainer(task); @@ -530,6 +540,11 @@ public String runTask(String taskId, Object taskObject) return taskRunner.run(injectIfNeeded(task)); } + public TaskContainer getTaskContainer(String taskId) + { + return taskRunner.getTaskContainer(taskId); + } + public TaskStatus runAndWait(Task task) { return taskRunner.runAndWait(injectIfNeeded(task)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index ecb910b2c9e8..184ff84a70b9 100644 ---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSplit; @@ -708,7 +709,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception taskClient.report( getSupervisorTaskId(), - new PushedSegmentsReport(getId(), Collections.emptySet(), Collections.singleton(segment)) + new PushedSegmentsReport(getId(), Collections.emptySet(), Collections.singleton(segment), ImmutableMap.of()) ); return TaskStatus.fromCode(getId(), state); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java new file mode 100644 index 000000000000..05d0a1fe9de3 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class PushedSegmentsReportTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index f04559f26c83..fdcab963885c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -37,6 +39,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -61,6 +64,7 @@ import java.nio.file.Files; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -105,6 +109,14 @@ public void setup() throws IOException Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) { writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i)); writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i)); + if (i == 0) { + // thrown away due to timestamp outside interval + writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 + i, i)); + // unparseable metric value + writer.write(StringUtils.format("2017-12-%d,%d th test file,badval\n", 25 + i, i)); + // unparseable row + writer.write(StringUtils.format("2017unparseable\n")); + } } } @@ -114,6 +126,7 @@ public void setup() throws IOException writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i)); } } + getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class); } @@ -153,7 +166,7 @@ public void testIsReady() throws Exception } } - private void runTestTask( + private ParallelIndexSupervisorTask runTestTask( @Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting, @@ -170,9 +183,11 @@ private void runTestTask( appendToExisting, originalSegmentsIfAppend ); + TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId()); + return (ParallelIndexSupervisorTask) taskContainer.getTask(); } - private void runOverwriteTask( + private ParallelIndexSupervisorTask runOverwriteTask( @Nullable Interval 
interval, Granularity segmentGranularity, LockGranularity actualLockGranularity @@ -182,6 +197,8 @@ private ParallelIndexSupervisorTask runOverwriteTask( task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); assertShardSpecAfterOverwrite(task, actualLockGranularity); + TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId()); + return (ParallelIndexSupervisorTask) taskContainer.getTask(); } private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity) @@ -296,6 +313,58 @@ public void testRunInParallel() testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY); } + @Test + public void testRunInParallelTaskReports() + { + ParallelIndexSupervisorTask task = runTestTask( + Intervals.of("2017-12/P1M"), + Granularities.DAY, + false, + Collections.emptyList() + ); + Map actualReports = task.doGetLiveReports("full"); + Map expectedReports = getExpectedTaskReportParallel( + task.getId(), + ImmutableList.of( + "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}", + "Found unparseable columns in row: [MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z," + + " event={ts=2017-12-25, dim=0 th test file, val=badval}, dimensions=[ts, dim]}], " + + "exceptions: [Unable to parse value[badval] for field[val]]" + ), + new RowIngestionMetersTotals( + 10, + 1, + 1, + 1) + ); + Assert.assertEquals(expectedReports, actualReports); + } + + private Map getExpectedTaskReportParallel( + String taskId, + List expectedUnparseableEvents, + RowIngestionMetersTotals expectedTotals + ) + { + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + + payload.put("ingestionState", IngestionState.COMPLETED); + payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents)); + payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals))); + + ingestionStatsAndErrors.put("taskId", taskId); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + return returnMap; + } + @Test public void testWithoutIntervalWithDifferentSegmentGranularity() { @@ -318,6 +387,102 @@ public void testRunInSequential() task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList()); + + TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId()); + final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask(); + Map actualReports = executedTask.doGetLiveReports("full"); + + RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals( + 10, + 1, + 1, + 1 + ); + List expectedUnparseableEvents = ImmutableList.of( + "Timestamp[2017unparseable] is unparseable!
Event: {ts=2017unparseable}", + "Found unparseable columns in row: [MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z," + + " event={ts=2017-12-25, dim=0 th test file, val=badval}, dimensions=[ts, dim]}], " + + "exceptions: [Unable to parse value[badval] for field[val]]" + ); + + Map expectedReports; + if (useInputFormatApi) { + expectedReports = getExpectedTaskReportSequential( + task.getId(), + expectedUnparseableEvents, + expectedTotals + ); + } else { + // When useInputFormatApi is false, maxConcurrentSubTasks=2 and the task uses the single-phase runner + // instead of the sequential runner. + expectedReports = getExpectedTaskReportParallel( + task.getId(), + expectedUnparseableEvents, + expectedTotals + ); + } + + Assert.assertEquals(expectedReports, actualReports); + } + + private Map getExpectedTaskReportSequential( + String taskId, + List expectedUnparseableEvents, + RowIngestionMetersTotals expectedTotals + ) + { + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + + payload.put("ingestionState", IngestionState.COMPLETED); + payload.put( + "unparseableEvents", + ImmutableMap.of( + "determinePartitions", ImmutableList.of(), + "buildSegments", expectedUnparseableEvents + ) + ); + Map emptyAverageMinuteMap = ImmutableMap.of( + "processed", 0.0, + "unparseable", 0.0, + "thrownAway", 0.0, + "processedWithError", 0.0 + ); + + Map emptyAverages = ImmutableMap.of( + "1m", emptyAverageMinuteMap, + "5m", emptyAverageMinuteMap, + "15m", emptyAverageMinuteMap + ); + + payload.put( + "rowStats", + ImmutableMap.of( + "movingAverages", + ImmutableMap.of( + "determinePartitions", + emptyAverages, + "buildSegments", + emptyAverages + ), + "totals", + ImmutableMap.of( + "determinePartitions", + new RowIngestionMetersTotals(0, 0, 0, 0), + "buildSegments", + expectedTotals + ) + ) + ); + + ingestionStatsAndErrors.put("taskId", taskId); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + return returnMap; } @Test diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java index d9c6ce5d327a..89bac7591996 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; + public class RowIngestionMetersTotals { private final long processed; @@ -66,4 +68,37 @@ public long getUnparseable() { return unparseable; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RowIngestionMetersTotals that = (RowIngestionMetersTotals) o; + return processed == that.processed + && processedWithError == that.processedWithError + && thrownAway == that.thrownAway + && unparseable == that.unparseable; + } + + @Override + public int hashCode() + { + return Objects.hash(processed, processedWithError, thrownAway, unparseable); + } + + @Override + public String toString() + { + return "RowIngestionMetersTotals{" + + "processed=" + processed + + ", processedWithError=" +
processedWithError + + ", thrownAway=" + thrownAway + + ", unparseable=" + unparseable + + '}'; + } } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java new file mode 100644 index 000000000000..197b9842128d --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class RowIngestionMetersTotalsTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(RowIngestionMetersTotals.class).usingGetClass().verify(); + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 60bbae63ce90..afab3b536544 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -336,6 +336,32 @@ public TaskPayloadResponse getTaskPayload(String taskId) } } + @Nullable + @Override + public Map getTaskReport(String taskId) + { + try { + final StringFullResponseHolder responseHolder = druidLeaderClient.go( + druidLeaderClient.makeRequest( + HttpMethod.GET, + StringUtils.format("/druid/indexer/v1/task/%s/reports", StringUtils.urlEncode(taskId)) + ) + ); + + if (responseHolder.getContent().length() == 0) { + return null; + } + + return jsonMapper.readValue( + responseHolder.getContent(), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override public Map> getLockedIntervals(Map minTaskPriority) { diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index c379077b6b82..84f9f554681b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -66,6 +66,9 @@ String compactSegments( @Nullable TaskPayloadResponse getTaskPayload(String taskId); + @Nullable + Map getTaskReport(String taskId); + /** * Gets a List of Intervals locked by higher priority tasks for each datasource. 
* diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index ff860eef2e62..67b99b95a645 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -39,6 +39,7 @@ import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Map; public class HttpIndexingServiceClientTest { @@ -161,4 +162,70 @@ public SamplerResponse sample() httpIndexingServiceClient.sample(samplerSpec); EasyMock.verify(druidLeaderClient, response); } + + @Test + public void testGetTaskReport() throws Exception + { + String taskId = "testTaskId"; + HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); + EasyMock.replay(response); + + Map dummyResponse = ImmutableMap.of("test", "value"); + + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + HttpResponseStatus.OK, + response, + StandardCharsets.UTF_8 + ).addChunk(jsonMapper.writeValueAsString(dummyResponse)); + + EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class))) + .andReturn(responseHolder) + .anyTimes(); + + EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports")) + .andReturn(new Request( + HttpMethod.GET, + new URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports") + )) + .anyTimes(); + EasyMock.replay(druidLeaderClient); + + final Map actualResponse = httpIndexingServiceClient.getTaskReport(taskId); + Assert.assertEquals(dummyResponse, actualResponse); + + EasyMock.verify(druidLeaderClient, response); + } + + @Test + public void testGetTaskReportEmpty() throws Exception + { + String taskId = "testTaskId"; + HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); + EasyMock.replay(response); + + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + HttpResponseStatus.OK, + response, + StandardCharsets.UTF_8 + ).addChunk(""); + + EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class))) + .andReturn(responseHolder) + .anyTimes(); + + EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports")) + .andReturn(new Request( + HttpMethod.GET, + new URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports") + )) + .anyTimes(); + EasyMock.replay(druidLeaderClient); + + final Map actualResponse = httpIndexingServiceClient.getTaskReport(taskId); + Assert.assertNull(actualResponse); + + EasyMock.verify(druidLeaderClient, response); + } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index af307a467f9f..2035dbae76e9 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -107,6 +107,13 @@ public TaskPayloadResponse getTaskPayload(String taskId) return null; } + @Nullable + @Override + public Map getTaskReport(String taskId) + { + return null; + } + @Override public Map> getLockedIntervals(Map minTaskPriority) {
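For reference, a minimal sketch (not part of this patch; the names `client` and `supervisorTaskId` are illustrative) of how a caller could walk the nested live-report map that doGetLiveReports assembles, mirroring the parsing done in doGetRowStatsAndUnparseableEventsParallelSinglePhase:

  @SuppressWarnings("unchecked")
  static Object liveBuildSegmentsTotals(IndexingServiceClient client, String supervisorTaskId)
  {
    // Fetch the supervisor task's current report; null means no report has been produced yet.
    Map<String, Object> report = client.getTaskReport(supervisorTaskId);
    if (report == null) {
      return null;
    }
    Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
    Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
    Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
    Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
    return totals.get(RowIngestionMeters.BUILD_SEGMENTS); // key "buildSegments"
  }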