From b2ac68fe69ab7f7edf21d71c00259678c41a703f Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 19 Mar 2024 10:38:53 -0500 Subject: [PATCH 1/6] Populate segment stats for non-parallel compaction jobs --- docs/ingestion/tasks.md | 4 ++-- .../druid/indexing/common/task/IndexTask.java | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 4b6153fa26ac..ab206c757627 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -160,8 +160,8 @@ For some task types, the indexing task can wait for the newly ingested segments |Field|Description| |---|---| -|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.| -|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.| +|`segmentsRead`|Number of segments read by compaction task.| +|`segmentsPublished`|Number of segments published by compaction task.| ### Live report diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 50e13a93c0be..6effa5bca4f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -584,13 +584,18 @@ public TaskStatus runTask(final TaskToolbox toolbox) private void updateAndWriteCompletionReports(TaskToolbox toolbox) { - completionReports = getTaskCompletionReports(); + updateAndWriteCompletionReports(toolbox, null, null); + } + + private void updateAndWriteCompletionReports(TaskToolbox toolbox, Integer segmentsRead, Integer segmentsPublished) + { + completionReports = getTaskCompletionReports(segmentsRead, segmentsPublished); if (isStandAloneTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } - private Map getTaskCompletionReports() + 
private Map getTaskCompletionReports(Integer segmentsRead, Integer segmentsPublished) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -603,8 +608,8 @@ private Map getTaskCompletionReports() segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - null, - null + Long.valueOf(segmentsRead), + Long.valueOf(segmentsPublished) ) ) ); @@ -1071,7 +1076,11 @@ private TaskStatus generateAndPublishSegments( log.debugSegments(published.getSegments(), "Published segments"); - updateAndWriteCompletionReports(toolbox); + updateAndWriteCompletionReports( + toolbox, + getTaskLockHelper().getLockedExistingSegments().size(), + published.getSegments().size() + ); return TaskStatus.success(getId()); } } From 4ad05dd35f6f9c915e5cc053d7a1591cf307f0d3 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 27 Mar 2024 17:34:24 -0500 Subject: [PATCH 2/6] fix --- .../druid/indexing/common/task/IndexTask.java | 16 ++++++++++------ .../druid/indexing/input/DruidInputSource.java | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6effa5bca4f0..68b2c43f1c42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.input.TaskInputSource; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import 
org.apache.druid.java.util.common.IAE; @@ -587,7 +588,7 @@ private void updateAndWriteCompletionReports(TaskToolbox toolbox) updateAndWriteCompletionReports(toolbox, null, null); } - private void updateAndWriteCompletionReports(TaskToolbox toolbox, Integer segmentsRead, Integer segmentsPublished) + private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished) { completionReports = getTaskCompletionReports(segmentsRead, segmentsPublished); if (isStandAloneTask) { @@ -595,7 +596,7 @@ private void updateAndWriteCompletionReports(TaskToolbox toolbox, Integer segmen } } - private Map getTaskCompletionReports(Integer segmentsRead, Integer segmentsPublished) + private Map getTaskCompletionReports(Long segmentsRead, Long segmentsPublished) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( @@ -608,8 +609,8 @@ private Map getTaskCompletionReports(Integer segmentsRead, I segmentAvailabilityConfirmationCompleted, segmentAvailabilityWaitTimeMs, Collections.emptyMap(), - Long.valueOf(segmentsRead), - Long.valueOf(segmentsPublished) + segmentsRead, + segmentsPublished ) ) ); @@ -1078,8 +1079,11 @@ private TaskStatus generateAndPublishSegments( updateAndWriteCompletionReports( toolbox, - getTaskLockHelper().getLockedExistingSegments().size(), - published.getSegments().size() + // only applicable to the compaction use cases + inputSource instanceof DruidInputSource + ? 
(long) ((DruidInputSource) inputSource).estimateSegmentsCount() + : null, + (long) published.getSegments().size() ); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 890a7c313fa4..eeb65c22328d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.partition.PartitionChunk; @@ -78,6 +79,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -620,4 +622,20 @@ public static List> getTimelineForSegm return new ArrayList<>(timeline.values()); } + + public int estimateSegmentsCount() + { + if (segmentIds != null) { + return segmentIds.size(); + } + + Set ids = new HashSet<>(); + for (TimelineObjectHolder holder : createTimeline()) { + for (PartitionChunk chunk : holder.getObject()) { + ids.add(chunk.getObject().getId()); + } + } + + return ids.size(); + } } From efa73d12ceee2d263bfbbdec71bc259ffc8291a7 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 27 Mar 2024 18:04:30 -0500 Subject: [PATCH 3/6] add-tests --- .../common/task/CompactionTaskRunTest.java | 14 ++++++++++- .../common/task/IngestionTestBase.java | 23 ++++++++++++++++++ ...stractParallelIndexSupervisorTaskTest.java | 24 ------------------- 3 files changed, 36 insertions(+), 25 deletions(-) diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 03994e35e518..b83c499b8c72 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -42,10 +42,12 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; @@ -318,6 +320,16 @@ public void testRunWithDynamicPartitioning() throws Exception List rowsFromSegment = getCSVFormatRowsFromSegments(segments); Assert.assertEquals(TEST_ROWS, rowsFromSegment); + + List reports = getIngestionReports(); + Assert.assertEquals( + 3L, + reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished).sum() + ); + Assert.assertEquals( + 3L, + reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum() + ); } @Test @@ -2019,7 +2031,7 @@ public List getLocations() .taskWorkDir(temporaryFolder.newFolder()) .indexIO(getIndexIO()) .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) - .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + 
.taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile)) .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) .chatHandlerProvider(new NoopChatHandlerProvider()) .rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory()) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index b82dafccc6be..6ca6f8bcd65f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; @@ -36,8 +37,11 @@ import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.RegexParseSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.SegmentInsertAction; @@ -97,6 +101,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; public abstract class IngestionTestBase extends InitializedNullHandlingTest { @@ -114,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private SegmentsMetadataManager 
segmentsMetadataManager; private TaskLockbox lockbox; private File baseDir; + protected File reportsFile; @Before public void setUpIngestionTestBase() throws IOException @@ -139,6 +145,7 @@ public void setUpIngestionTestBase() throws IOException ); lockbox = new TaskLockbox(taskStorage, storageCoordinator); segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper()); + reportsFile = temporaryFolder.newFile(); } @After @@ -502,4 +509,20 @@ public Map getBlacklistedTaskSlotCount() throw new UnsupportedOperationException(); } } + + public Map getReports() throws IOException + { + return objectMapper.readValue(reportsFile, new TypeReference>() + { + }); + } + + public List getIngestionReports() throws IOException + { + return getReports().entrySet() + .stream() + .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY)) + .map(entry -> (IngestionStatsAndErrorsTaskReportData) entry.getValue().getPayload()) + .collect(Collectors.toList()); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 6b662e473b05..680b9f4e3fb4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; @@ -50,13 +49,10 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; 
import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -65,7 +61,6 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.IngestionTestBase; -import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; @@ -236,7 +231,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase private CoordinatorClient coordinatorClient; // An executor that executes API calls using a different thread from the caller thread as if they were remote calls. 
private ExecutorService remoteApiExecutor; - private File reportsFile; protected AbstractParallelIndexSupervisorTaskTest( double transientTaskFailureRate, @@ -262,7 +256,6 @@ public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor"); coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor); prepareObjectMapper(objectMapper, getIndexIO()); - reportsFile = temporaryFolder.newFile(); } @After @@ -701,7 +694,6 @@ public File getStorageDirectory() .taskWorkDir(temporaryFolder.newFolder(task.getId())) .indexIO(getIndexIO()) .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) - .taskReportFileWriter(new NoopTestTaskReportFileWriter()) .intermediaryDataManager(intermediaryDataManager) .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile)) .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) @@ -1066,20 +1058,4 @@ public ListenableFuture fetchSegment(String dataSource, String segm throw new ISE("Can't find segment for id[%s]", segmentId); } } - - public Map getReports() throws IOException - { - return objectMapper.readValue(reportsFile, new TypeReference>() - { - }); - } - - public List getIngestionReports() throws IOException - { - return getReports().entrySet() - .stream() - .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY)) - .map(entry -> (IngestionStatsAndErrorsTaskReportData) entry.getValue().getPayload()) - .collect(Collectors.toList()); - } } From b004b484822d5a298f1adc29195e424251de8061 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 28 Mar 2024 12:13:44 -0500 Subject: [PATCH 4/6] comments --- .../druid/indexing/common/task/IndexTask.java | 2 +- .../indexing/input/DruidInputSource.java | 33 ++++++++++++++----- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 68b2c43f1c42..321a8e8c9112 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1081,7 +1081,7 @@ private TaskStatus generateAndPublishSegments( toolbox, // only applicable to the compaction use cases inputSource instanceof DruidInputSource - ? (long) ((DruidInputSource) inputSource).estimateSegmentsCount() + ? (long) ((DruidInputSource) inputSource).segmentsCount() : null, (long) published.getSegments().size() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index eeb65c22328d..af280767300a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -157,6 +157,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI @Nullable private final TaskToolbox toolbox; + @Nullable + private Integer segmentsCount; @JsonCreator public DruidInputSource( @@ -364,11 +366,21 @@ InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema) private List> createTimeline() { + List> timeline; if (interval == null) { - return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); + timeline = getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); } else { - return getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval); + timeline = getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval); + } + + Set ids = new HashSet<>(); + for (TimelineObjectHolder holder : timeline) { + for (PartitionChunk chunk : 
holder.getObject()) { + ids.add(chunk.getObject().getId()); + } } + segmentsCount = ids.size(); + return timeline; } @Override @@ -623,19 +635,22 @@ public static List> getTimelineForSegm return new ArrayList<>(timeline.values()); } - public int estimateSegmentsCount() + /** + * Should ideally be called after creating the reader with {@link #reader(InputRowSchema, InputFormat, File)} + * call to not be a costly operation. + * + * @return count of segments handled by this input source + */ + public int segmentsCount() { if (segmentIds != null) { return segmentIds.size(); } - Set ids = new HashSet<>(); - for (TimelineObjectHolder holder : createTimeline()) { - for (PartitionChunk chunk : holder.getObject()) { - ids.add(chunk.getObject().getId()); - } + if (segmentsCount == null) { + createTimeline(); } - return ids.size(); + return segmentsCount; } } From 855dc3331755b39656e941f66d1e80c2a8f9174d Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 28 Mar 2024 13:28:56 -0500 Subject: [PATCH 5/6] update-test --- .../druid/indexing/common/task/CompactionTaskRunTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index a6afedcb54b6..7c14bc96dff9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -327,7 +327,7 @@ public void testRunWithDynamicPartitioning() throws Exception reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum() ); Assert.assertEquals( - 3L, + 6L, reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum() ); } From d0da0682828177ee7f2fe7410fc394a474d25547 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 28 Mar 2024 23:20:44 
-0500 Subject: [PATCH 6/6] comments --- .../druid/indexing/common/task/IndexTask.java | 2 +- .../indexing/input/DruidInputSource.java | 22 +++++-------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 672a6fd82354..5c538d4a0fc3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1015,7 +1015,7 @@ private TaskStatus generateAndPublishSegments( toolbox, // only applicable to the compaction use cases inputSource instanceof DruidInputSource - ? (long) ((DruidInputSource) inputSource).segmentsCount() + ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead() : null, (long) published.getSegments().size() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index af280767300a..dd1998645b34 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -158,7 +158,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI @Nullable private final TaskToolbox toolbox; @Nullable - private Integer segmentsCount; + private Integer numSegmentsInTimeline; @JsonCreator public DruidInputSource( @@ -379,7 +379,7 @@ private List> createTimeline() ids.add(chunk.getObject().getId()); } } - segmentsCount = ids.size(); + numSegmentsInTimeline = ids.size(); return timeline; } @@ -636,21 +636,11 @@ public static List> getTimelineForSegm } /** - * Should ideally be called after creating the reader with {@link #reader(InputRowSchema, InputFormat, File)} - * call to 
not be a costly operation. - * - * @return count of segments handled by this input source + * @return Number of segments read by this input source. The backing count stays null until + * {@link #fixedFormatReader} has been invoked, so an earlier call throws NullPointerException when unboxing to int. */ - public int segmentsCount() + public int getNumberOfSegmentsRead() { - if (segmentIds != null) { - return segmentIds.size(); - } - - if (segmentsCount == null) { - createTimeline(); - } - - return segmentsCount; + return numSegmentsInTimeline; } }