Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ For some task types, the indexing task can wait for the newly ingested segments

|Field|Description|
|---|---|
|`segmentsRead`|Number of segments read by compaction task.|
|`segmentsPublished`|Number of segments published by compaction task.|

### Live report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis;
import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.input.TaskInputSource;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -540,12 +541,18 @@ public TaskStatus runTask(final TaskToolbox toolbox)

/**
 * Builds the ingestion stats completion report with no segment counts and
 * writes it via {@link #updateAndWriteCompletionReports(TaskToolbox, Long, Long)}.
 * Used on code paths where segment read/publish counts are not applicable.
 */
private void updateAndWriteCompletionReports(TaskToolbox toolbox)
{
  updateAndWriteCompletionReports(toolbox, null, null);
}

/**
 * Rebuilds the ingestion stats completion report, including the number of
 * segments read and published, and writes it to the task report file when
 * {@code isStandAloneTask} is true.
 *
 * @param segmentsRead      number of segments read, or null when not applicable
 *                          (currently only set for compaction use cases)
 * @param segmentsPublished number of segments published, or null when not applicable
 */
private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished)
{
completionReports = buildIngestionStatsReport(ingestionState, errorMsg, segmentsRead, segmentsPublished);
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}


@Override
protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
Expand Down Expand Up @@ -1004,7 +1011,14 @@ private TaskStatus generateAndPublishSegments(

log.debugSegments(published.getSegments(), "Published segments");

updateAndWriteCompletionReports(toolbox);
updateAndWriteCompletionReports(
toolbox,
// only applicable to the compaction use cases
inputSource instanceof DruidInputSource
? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
: null,
(long) published.getSegments().size()
);
return TaskStatus.success(getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
Expand All @@ -78,6 +79,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -155,6 +157,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI

@Nullable
private final TaskToolbox toolbox;
@Nullable
private Integer numSegmentsInTimeline;

@JsonCreator
public DruidInputSource(
Expand Down Expand Up @@ -362,11 +366,21 @@ InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema)

/**
 * Creates the timeline of segments to read — from the explicit segment ids when
 * no interval is configured, otherwise from the configured interval — and records
 * the number of distinct segments seen in {@code numSegmentsInTimeline} so it can
 * be reported later (see {@code getNumberOfSegmentsRead()}).
 */
private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
{
  final List<TimelineObjectHolder<String, DataSegment>> timeline;
  if (interval == null) {
    timeline = getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds);
  } else {
    timeline = getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval);
  }

  // Count distinct segment ids: the same segment may appear in multiple
  // timeline holders, so collect ids into a set before sizing.
  final Set<SegmentId> uniqueSegmentIds = new HashSet<>();
  for (TimelineObjectHolder<String, DataSegment> holder : timeline) {
    for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
      uniqueSegmentIds.add(chunk.getObject().getId());
    }
  }
  numSegmentsInTimeline = uniqueSegmentIds.size();
  return timeline;
}

@Override
Expand Down Expand Up @@ -620,4 +634,13 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegm

return new ArrayList<>(timeline.values());
}

/**
 * Returns the number of distinct segments read by this input source.
 *
 * <p>The count is computed when the segment timeline is created (in
 * {@code createTimeline()}, which runs when the input source is actually read —
 * presumably via {@link #fixedFormatReader}; verify against callers).
 *
 * @return number of distinct segments in the timeline read by this input source
 * @throws IllegalStateException if the timeline has not been created yet. (The
 *         counter field is a nullable Integer; returning it unguarded through an
 *         {@code int} return type would auto-unbox null into an obscure NPE.)
 */
public int getNumberOfSegmentsRead()
{
  if (numSegmentsInTimeline == null) {
    throw new IllegalStateException(
        "Number of segments read is not known yet; the input source has not been read."
    );
  }
  return numSegmentsInTimeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
Expand Down Expand Up @@ -318,6 +320,16 @@ public void testRunWithDynamicPartitioning() throws Exception

List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
Assert.assertEquals(TEST_ROWS, rowsFromSegment);

List<IngestionStatsAndErrors> reports = getIngestionReports();
Assert.assertEquals(
3L,
reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum()
);
Assert.assertEquals(
6L,
reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum()
);
}

@Test
Expand Down Expand Up @@ -2019,7 +2031,7 @@ public List<StorageLocationConfig> getLocations()
.taskWorkDir(temporaryFolder.newFolder())
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
Expand All @@ -36,8 +37,11 @@
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.RegexParseSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
Expand Down Expand Up @@ -97,6 +101,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
Expand All @@ -114,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
protected File reportsFile;
Comment thread
ac9817 marked this conversation as resolved.

@Before
public void setUpIngestionTestBase() throws IOException
Expand All @@ -139,6 +145,7 @@ public void setUpIngestionTestBase() throws IOException
);
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
reportsFile = temporaryFolder.newFile();
}

@After
Expand Down Expand Up @@ -502,4 +509,20 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
throw new UnsupportedOperationException();
}
}

/**
 * Deserializes the task reports file written during the test run.
 *
 * @return map of report key to {@link TaskReport} parsed from {@code reportsFile}
 * @throws IOException if the reports file cannot be read or parsed
 */
public Map<String, TaskReport> getReports() throws IOException
{
return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>()
{
});
}

/**
 * Extracts the ingestion stats-and-errors payloads from the task reports file,
 * selecting the report entries whose key contains
 * {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY}.
 *
 * @return list of ingestion stats payloads, one per matching report entry
 * @throws IOException if the reports file cannot be read or parsed
 */
public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
{
return getReports().entrySet()
.stream()
.filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
.map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
Expand Down Expand Up @@ -50,13 +49,10 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand All @@ -65,7 +61,6 @@
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
Expand Down Expand Up @@ -236,7 +231,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
private CoordinatorClient coordinatorClient;
// An executor that executes API calls using a different thread from the caller thread as if they were remote calls.
private ExecutorService remoteApiExecutor;
private File reportsFile;

protected AbstractParallelIndexSupervisorTaskTest(
double transientTaskFailureRate,
Expand All @@ -262,7 +256,6 @@ public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException
remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
prepareObjectMapper(objectMapper, getIndexIO());
reportsFile = temporaryFolder.newFile();
}

@After
Expand Down Expand Up @@ -701,7 +694,6 @@ public File getStorageDirectory()
.taskWorkDir(temporaryFolder.newFolder(task.getId()))
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.intermediaryDataManager(intermediaryDataManager)
.taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
Expand Down Expand Up @@ -1066,20 +1058,4 @@ public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segm
throw new ISE("Can't find segment for id[%s]", segmentId);
}
}

public Map<String, TaskReport> getReports() throws IOException
{
return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>()
{
});
}

public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
{
return getReports().entrySet()
.stream()
.filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
.map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
.collect(Collectors.toList());
}
}