diff --git a/docs/development/modules.md b/docs/development/modules.md index d1080b630404..011028808448 100644 --- a/docs/development/modules.md +++ b/docs/development/modules.md @@ -137,7 +137,7 @@ d.azure.] as [org.apache.druid.storage.azure.AzureAccountConfig@759c9ad9] ip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0] 2015-04-14T02:49:08,276 INFO [ZkCoordinator-0] org.apache.druid.storage.azure.AzureDataSegmentPuller - Loaded 1196 bytes from [dde/2015-01-02T00:00:00.000Z_2015-01-03 T00:00:00.000Z/2015-04-14T02:41:09.484Z/0/index.zip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0] -2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196] +2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196] 2015-04-14T02:49:08,282 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] at path[/druid/dev/segments/192.168.33.104:8081/192.168.33.104:8081_historical__default_tier_2015-04-14T02:49:08.282Z_7bb87230ebf940188511dd4a53ffd7351] 2015-04-14T02:49:08,292 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.ZkCoordinator - Completed request [LOAD: dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] ``` diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index ca3630d635d0..54242883c01b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -60,7 +60,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; @@ -2887,7 +2887,7 @@ public void close() DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), + new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 5b3a77cff91f..3d0f86d67f53 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -48,7 +48,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; @@ -2974,7 +2974,7 @@ public void close() DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), + new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java similarity index 77% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java index 17b8dc131648..6672cf0dca46 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java @@ -22,10 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.druid.guice.annotations.Json; -import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import java.io.File; @@ -34,25 +33,21 @@ /** * */ -public class SegmentLoaderFactory +public class SegmentCacheManagerFactory { - private final IndexIO indexIO; private final ObjectMapper jsonMapper; @Inject - public SegmentLoaderFactory( - IndexIO indexIO, + public SegmentCacheManagerFactory( @Json ObjectMapper mapper ) { - this.indexIO = indexIO; this.jsonMapper = mapper; } - public SegmentLoader manufacturate(File storageDir) + public SegmentCacheManager manufacturate(File storageDir) { - return new SegmentLoaderLocalCacheManager( - indexIO, + return new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations( Collections.singletonList(new StorageLocationConfig(storageDir, null, null))), jsonMapper diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index dbaec5a757f7..979de5f66ec6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -57,7 +57,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -101,7 +101,7 @@ public class TaskToolbox private final Provider monitorSchedulerProvider; private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; - private final SegmentLoader segmentLoader; + private final SegmentCacheManager segmentCacheManager; private final ObjectMapper jsonMapper; private final File taskWorkDir; private final IndexIO indexIO; @@ -144,7 +144,7 @@ public TaskToolbox( QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, @Nullable Provider monitorSchedulerProvider, - SegmentLoader segmentLoader, + SegmentCacheManager segmentCacheManager, ObjectMapper jsonMapper, File taskWorkDir, IndexIO indexIO, @@ -183,7 +183,7 @@ public TaskToolbox( this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.monitorSchedulerProvider = monitorSchedulerProvider; - this.segmentLoader = segmentLoader; + this.segmentCacheManager = segmentCacheManager; this.jsonMapper = jsonMapper; this.taskWorkDir = taskWorkDir; this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -318,7 +318,7 @@ public Map fetchSegments(List segments) { Map retVal = Maps.newLinkedHashMap(); for (DataSegment segment : segments) { - retVal.put(segment, segmentLoader.getSegmentFiles(segment)); + retVal.put(segment, segmentCacheManager.getSegmentFiles(segment)); } return retVal; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 5cd4eb56067b..cb1d3c13218d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -83,7 +83,7 @@ public class TaskToolboxFactory private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final Provider monitorSchedulerProvider; - private final SegmentLoaderFactory segmentLoaderFactory; + private final SegmentCacheManagerFactory segmentCacheManagerFactory; private final ObjectMapper jsonMapper; private final IndexIO indexIO; private final Cache cache; @@ -124,7 +124,7 @@ public TaskToolboxFactory( QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Provider monitorSchedulerProvider, - SegmentLoaderFactory segmentLoaderFactory, + SegmentCacheManagerFactory segmentCacheManagerFactory, @Json ObjectMapper jsonMapper, IndexIO indexIO, Cache cache, @@ -162,7 +162,7 @@ public TaskToolboxFactory( this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.monitorSchedulerProvider = monitorSchedulerProvider; - this.segmentLoaderFactory = segmentLoaderFactory; + this.segmentCacheManagerFactory = segmentCacheManagerFactory; this.jsonMapper = jsonMapper; this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; @@ -204,7 +204,7 @@ public TaskToolbox build(Task task) queryProcessingPool, joinableFactory, monitorSchedulerProvider, - segmentLoaderFactory.manufacturate(taskWorkDir), + segmentCacheManagerFactory.manufacturate(taskWorkDir), jsonMapper, taskWorkDir, indexIO, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 879949dbaa2b..3374e8c8a246 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -52,7 +52,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -157,7 +157,7 @@ public class CompactionTask extends AbstractBatchIndexTask private final PartitionConfigurationManager partitionConfigurationManager; @JsonIgnore - private final SegmentLoaderFactory segmentLoaderFactory; + private final SegmentCacheManagerFactory segmentCacheManagerFactory; @JsonIgnore private final RetryPolicyFactory retryPolicyFactory; @@ -185,7 +185,7 @@ public CompactionTask( @JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map context, - @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory ) { @@ -233,7 +233,7 @@ public CompactionTask( this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null; this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); - this.segmentLoaderFactory = segmentLoaderFactory; + this.segmentCacheManagerFactory = segmentCacheManagerFactory; this.retryPolicyFactory = retryPolicyFactory; } @@ -422,7 +422,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception metricsSpec, granularitySpec, toolbox.getCoordinatorClient(), - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, ioConfig.isDropExisting() ); @@ -521,7 +521,7 @@ static List createIngestionSchema( @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, final CoordinatorClient coordinatorClient, - final SegmentLoaderFactory segmentLoaderFactory, + final SegmentCacheManagerFactory segmentCacheManagerFactory, final RetryPolicyFactory retryPolicyFactory, final boolean dropExisting ) throws IOException, SegmentLoadingException @@ -604,7 +604,7 @@ static List createIngestionSchema( dataSchema, interval, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, dropExisting ), @@ -632,7 +632,7 @@ static List createIngestionSchema( dataSchema, segmentProvider.interval, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, dropExisting ), @@ -647,7 +647,7 @@ private static ParallelIndexIOConfig createIoConfig( DataSchema dataSchema, Interval interval, CoordinatorClient coordinatorClient, - SegmentLoaderFactory segmentLoaderFactory, + SegmentCacheManagerFactory segmentCacheManagerFactory, RetryPolicyFactory retryPolicyFactory, boolean dropExisting ) @@ -663,7 +663,7 @@ private static ParallelIndexIOConfig createIoConfig( null, toolbox.getIndexIO(), coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, toolbox.getConfig() ), @@ -1016,7 +1016,7 @@ CompactionTuningConfig computeTuningConfig() public static class Builder { private final String dataSource; - private final SegmentLoaderFactory segmentLoaderFactory; + private final SegmentCacheManagerFactory segmentCacheManagerFactory; private final RetryPolicyFactory retryPolicyFactory; private CompactionIOConfig ioConfig; @@ -1035,12 +1035,12 @@ public static class Builder public Builder( String dataSource, - SegmentLoaderFactory segmentLoaderFactory, + SegmentCacheManagerFactory segmentCacheManagerFactory, RetryPolicyFactory retryPolicyFactory ) { this.dataSource = dataSource; - this.segmentLoaderFactory = segmentLoaderFactory; + this.segmentCacheManagerFactory = segmentCacheManagerFactory; this.retryPolicyFactory = retryPolicyFactory; } @@ -1118,7 +1118,7 @@ public CompactionTask build() granularitySpec, tuningConfig, context, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 01b3d95672e0..f20a0ddd1a1b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -36,7 +36,7 @@ import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IAE; @@ -45,7 +45,7 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexStorageAdapter; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; @@ -84,7 +84,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory>> splits; @@ -102,7 +102,7 @@ public IngestSegmentFirehoseFactory( @JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, - @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory ) { @@ -119,7 +119,7 @@ public IngestSegmentFirehoseFactory( this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); - this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); + this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory"); this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); } @@ -136,7 +136,7 @@ public FiniteFirehoseFactory> withSplit( maxInputSegmentBytesPerTask, indexIO, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory ); } @@ -202,7 +202,7 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) // Note: this requires enough local storage space to fit all of the segments, even though // IngestSegmentFirehose iterates over the segments in series. We may want to change this // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. - final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory); Map segmentFileMap = Maps.newLinkedHashMap(); for (TimelineObjectHolder holder : timeLineSegments) { for (PartitionChunk chunk : holder.getObject()) { @@ -210,7 +210,7 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) segmentFileMap.computeIfAbsent(segment, k -> { try { - return segmentLoader.getSegmentFiles(segment); + return segmentCacheManager.getSegmentFiles(segment); } catch (SegmentLoadingException e) { throw new RuntimeException(e); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index c9d0f4e464b2..9be337822a42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -42,7 +42,7 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.java.util.common.IAE; @@ -52,7 +52,7 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnHolder; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -129,7 +129,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI private final DimFilter dimFilter; private final IndexIO indexIO; private final CoordinatorClient coordinatorClient; - private final SegmentLoaderFactory segmentLoaderFactory; + private final SegmentCacheManagerFactory segmentCacheManagerFactory; private final RetryPolicyFactory retryPolicyFactory; private final TaskConfig taskConfig; @@ -155,7 +155,7 @@ public DruidInputSource( @Nullable @JsonProperty("metrics") List metrics, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, - @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory, @JacksonInject TaskConfig taskConfig ) @@ -172,7 +172,7 @@ public DruidInputSource( this.metrics = metrics; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); - this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); + this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory"); this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig"); } @@ -224,7 +224,7 @@ public List getMetrics() @Override protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) { - final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory); final List> timeline = createTimeline(); final Iterator entityIterator = FluentIterable @@ -235,7 +235,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu //noinspection ConstantConditions return FluentIterable .from(partitionHolder) - .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); + .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval())); }).iterator(); final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter); @@ -339,7 +339,7 @@ public SplittableInputSource> withSplit(InputSplit mockMonitorScheduler, - mockSegmentLoaderFactory, + mockSegmentCacheManagerFactory, ObjectMapper, mockIndexIO, mockCache, @@ -194,12 +194,12 @@ public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); EasyMock - .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject())) + .expect(mockSegmentCacheManagerFactory.manufacturate(EasyMock.anyObject())) .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); EasyMock .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager); + EasyMock.replay(mockSegmentCacheManagerFactory, mockSegmentLoaderLocalCacheManager); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index d7c5e32260f2..0c6b1c482a21 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -51,7 +51,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -1590,7 +1590,7 @@ public void close() DirectQueryProcessingPool.INSTANCE, // queryExecutorService NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), + new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 04b1b00fe3cc..fd7b69957a9b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -40,7 +40,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; @@ -215,7 +215,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); final CompactionTask.Builder builder = new CompactionTask.Builder( "datasource", - new SegmentLoaderFactory(null, mapper), + new SegmentCacheManagerFactory(mapper), new RetryPolicyFactory(new RetryPolicyConfig()) ); final CompactionTask task = builder @@ -338,7 +338,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); - binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); + binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index e8b6b12bfcd3..1732b3b31d02 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -139,7 +139,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -182,7 +182,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -222,7 +222,7 @@ public void testRunParallelWithRangePartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -262,7 +262,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -300,7 +300,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse() final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -328,7 +328,7 @@ public void testCompactHashAndDynamicPartitionedSegments() runIndexTask(null, true); final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -373,7 +373,7 @@ public void testCompactRangeAndDynamicPartitionedSegments() runIndexTask(null, true); final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -456,7 +456,7 @@ public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet() final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder @@ -490,7 +490,7 @@ public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet() final Builder builder = new Builder( DATA_SOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index a958ee66203f..b6098ea9854c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -41,7 +41,7 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; @@ -75,9 +75,9 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.LocalLoadSpec; import org.apache.druid.segment.loading.NoopDataSegmentKiller; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; @@ -160,7 +160,7 @@ public static Iterable constructorFeeder() private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private final IndexingServiceClient indexingServiceClient; private final CoordinatorClient coordinatorClient; - private final SegmentLoaderFactory segmentLoaderFactory; + private final SegmentCacheManagerFactory segmentCacheManagerFactory; private final LockGranularity lockGranularity; private final TestUtils testUtils; @@ -182,7 +182,7 @@ public Collection fetchUsedSegmentsInDataSourceForIntervals( return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); } }; - segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); + segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper()); this.lockGranularity = lockGranularity; } @@ -230,7 +230,7 @@ public void testRunWithDynamicPartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -279,7 +279,7 @@ public void testRunWithHashPartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -368,7 +368,7 @@ public void testRunCompactionTwice() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -446,7 +446,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -543,7 +543,7 @@ public void testWithSegmentGranularity() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -598,7 +598,7 @@ public void testWithGranularitySpecNonNullSegmentGranularityAndNullQueryGranular final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -653,7 +653,7 @@ public void testWithGranularitySpecNonNullQueryGranularityAndNullSegmentGranular final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -698,7 +698,7 @@ public void testWithGranularitySpecNonNullQueryGranularityAndNonNullSegmentGranu final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -731,7 +731,7 @@ public void testWithGranularitySpecNullQueryGranularityAndNullSegmentGranularity final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -775,7 +775,7 @@ public void testCompactThenAppend() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -837,7 +837,7 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -925,7 +925,7 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -994,7 +994,7 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -1046,7 +1046,7 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime2() throws Exceptio final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -1154,7 +1154,7 @@ public void testRunRegularIndexTaskWithIngestSegmentFirehose() throws Exception null, getIndexIO(), coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ), false, @@ -1285,8 +1285,7 @@ private Pair> runTask( private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException { - final SegmentLoader loader = new SegmentLoaderLocalCacheManager( - getIndexIO(), + final SegmentCacheManager loader = new SegmentLocalCacheManager( new SegmentLoaderConfig() { @Override public List getLocations() @@ -1342,11 +1341,11 @@ private List getCSVFormatRowsFromSegments(List segments) th { final File cacheDir = temporaryFolder.newFolder(); - final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(cacheDir); + final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir); List cursors = new ArrayList<>(); for (DataSegment segment : segments) { - final File segmentFile = segmentLoader.getSegmentFiles(segment); + final File segmentFile = segmentCacheManager.getSegmentFiles(segment); final WindowedStorageAdapter adapter = new WindowedStorageAdapter( new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 330ccbf64945..ac9e3d011aef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -57,7 +57,7 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -188,7 +188,7 @@ public class CompactionTaskTest private static List SEGMENTS; private TaskToolbox toolbox; - private SegmentLoaderFactory segmentLoaderFactory; + private SegmentCacheManagerFactory segmentCacheManagerFactory; @BeforeClass public static void setupClass() @@ -277,7 +277,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory()); binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); - binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); + binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager()); binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT); } @@ -361,7 +361,7 @@ public void setup() testIndexIO, SEGMENT_MAP ); - segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER); + segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER); } @Test @@ -369,7 +369,7 @@ public void testCreateCompactionTaskWithGranularitySpec() { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); @@ -379,7 +379,7 @@ public void testCreateCompactionTaskWithGranularitySpec() final Builder builder2 = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); @@ -397,7 +397,7 @@ public void testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGran { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); @@ -426,7 +426,7 @@ public void testCreateCompactionTaskWithNullSegmentGranularityInGranularitySpecA { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); @@ -455,7 +455,7 @@ public void testCreateCompactionTaskWithSameGranularitySpecAndSegmentGranularity { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); @@ -471,7 +471,7 @@ public void testSerdeWithInterval() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); final CompactionTask task = builder @@ -492,7 +492,7 @@ public void testSerdeWithSegments() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); final CompactionTask task = builder @@ -511,7 +511,7 @@ public void testSerdeWithDimensions() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -586,14 +586,14 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws toolbox.getChatHandlerProvider(), toolbox.getRowIngestionMetersFactory(), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, toolbox.getAppenderatorsManager() ); final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -790,7 +790,7 @@ public void testSerdeWithUnknownTuningConfigThrowingError() throws IOException null, toolbox.getRowIngestionMetersFactory(), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, toolbox.getAppenderatorsManager() ); @@ -848,7 +848,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -921,7 +921,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -995,7 +995,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1069,7 +1069,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1133,7 +1133,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1177,7 +1177,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, customMetricsSpec, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1214,7 +1214,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1257,7 +1257,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1281,7 +1281,7 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1295,7 +1295,7 @@ public void testEmptyInterval() final Builder builder = new Builder( DATA_SOURCE, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY ); @@ -1316,7 +1316,7 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException, null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1354,7 +1354,7 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException, null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1393,7 +1393,7 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null) ), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1431,7 +1431,7 @@ public void testNullGranularitySpec() throws IOException, SegmentLoadingExceptio null, null, COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -1468,7 +1468,7 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity null, new ClientCompactionTaskGranularitySpec(null, null), COORDINATOR_CLIENT, - segmentLoaderFactory, + segmentCacheManagerFactory, RETRY_POLICY_FACTORY, IOConfig.DEFAULT_DROP_EXISTING ); @@ -2014,7 +2014,7 @@ public OldCompactionTaskWithAnyTuningConfigType( @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject CoordinatorClient coordinatorClient, - @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory, @JacksonInject AppenderatorsManager appenderatorsManager ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 55c1f150c316..f805b2c5ce24 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -80,9 +80,9 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -172,7 +172,7 @@ public static Iterable constructorFeeder() private final boolean useInputFormatApi; private AppenderatorsManager appenderatorsManager; - private SegmentLoader segmentLoader; + private SegmentCacheManager segmentCacheManager; private TestTaskRunner taskRunner; public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi) @@ -190,8 +190,7 @@ public void setup() throws IOException appenderatorsManager = new TestAppenderatorsManager(); final File cacheDir = temporaryFolder.newFolder(); - segmentLoader = new SegmentLoaderLocalCacheManager( - indexIO, + segmentCacheManager = new SegmentLocalCacheManager( new SegmentLoaderConfig() { @Override @@ -345,7 +344,7 @@ public void testTransformSpec() throws Exception Assert.assertEquals(1, segments.size()); DataSegment segment = segments.get(0); - final File segmentFile = segmentLoader.getSegmentFiles(segment); + final File segmentFile = segmentCacheManager.getSegmentFiles(segment); final WindowedStorageAdapter adapter = new WindowedStorageAdapter( new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)), @@ -595,7 +594,7 @@ public void testNumShardsAndPartitionDimensionsProvided() throws Exception final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec(); Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction()); - final File segmentFile = segmentLoader.getSegmentFiles(segment); + final File segmentFile = segmentCacheManager.getSegmentFiles(segment); final WindowedStorageAdapter adapter = new WindowedStorageAdapter( new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index c42bf20dad2a..ab9b0d72b695 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -63,7 +63,7 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -95,7 +95,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private final TestUtils testUtils = new TestUtils(); private final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); - private SegmentLoaderFactory segmentLoaderFactory; + private SegmentCacheManagerFactory segmentCacheManagerFactory; private TaskStorage taskStorage; private IndexerSQLMetadataStorageCoordinator storageCoordinator; private SegmentsMetadataManager segmentsMetadataManager; @@ -123,7 +123,7 @@ public void setUpIngestionTestBase() throws IOException derbyConnectorRule.getConnector() ); lockbox = new TaskLockbox(taskStorage, storageCoordinator); - segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); + segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper()); } @After @@ -153,9 +153,9 @@ public void shutdownTask(Task task) lockbox.remove(task); } - public SegmentLoader newSegmentLoader(File storageDir) + public SegmentCacheManager newSegmentLoader(File storageDir) { - return segmentLoaderFactory.manufacturate(storageDir); + return segmentCacheManagerFactory.manufacturate(storageDir); } public ObjectMapper getObjectMapper() @@ -168,9 +168,9 @@ public TaskStorage getTaskStorage() return taskStorage; } - public SegmentLoaderFactory getSegmentLoaderFactory() + public SegmentCacheManagerFactory getSegmentCacheManagerFactory() { - return segmentLoaderFactory; + return segmentCacheManagerFactory; } public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index f2b1503ab40f..94bf84d0e329 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -42,7 +42,7 @@ import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestFirehose; @@ -989,7 +989,7 @@ public void close() DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), + new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index a716217c0c0d..f79406be7303 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -30,7 +30,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.input.DruidInputSource; @@ -55,8 +55,10 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentLocalCacheLoader; import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -287,8 +289,9 @@ List querySegment(DataSegment dataSegment, List columns private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir) { - final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()) + final SegmentCacheManager cacheManager = new SegmentCacheManagerFactory(getObjectMapper()) .manufacturate(tempSegmentDir); + final SegmentLoader loader = new SegmentLocalCacheLoader(cacheManager, getIndexIO(), getObjectMapper()); try { return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index a0d544968702..d10013e743e7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -49,7 +49,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -614,7 +614,7 @@ public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) .addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER) .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller()) .addValue(CoordinatorClient.class, coordinatorClient) - .addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper)) + .addValue(SegmentCacheManagerFactory.class, new SegmentCacheManagerFactory(objectMapper)) .addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig())) .addValue(TaskConfig.class, taskConfig) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java index 7f001c27f3c7..c2021fcd47d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java @@ -227,7 +227,7 @@ private Builder newCompactionTaskBuilder() { return new Builder( DATASOURCE, - getSegmentLoaderFactory(), + getSegmentCacheManagerFactory(), RETRY_POLICY_FACTORY ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 34ae0ab0e0e7..fa13c8e39f1d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -44,7 +44,7 @@ import org.apache.druid.indexing.common.ReingestionTimelineUtils; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; @@ -226,7 +226,7 @@ public Collection fetchUsedSegmentsInDataSourceForIntervals( SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); - final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER); + final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER); final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); Collection values = new ArrayList<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 1213f9eaa2e9..d226dc292e62 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -37,7 +37,7 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; @@ -318,7 +318,7 @@ public static Collection constructorFeeder() for (final TestCase testCase : testCases) { SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); - final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER); + final SegmentCacheManagerFactory slf = new SegmentCacheManagerFactory(MAPPER); final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); final CoordinatorClient cc = new CoordinatorClient(null, null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java index dcdc537e9cd8..ebc2b94f3289 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java @@ -27,7 +27,7 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.java.util.common.Intervals; @@ -45,7 +45,7 @@ public class DruidInputSourceTest { private final IndexIO indexIO = EasyMock.createMock(IndexIO.class); private final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); - private final SegmentLoaderFactory segmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class); + private final SegmentCacheManagerFactory segmentCacheManagerFactory = EasyMock.createMock(SegmentCacheManagerFactory.class); private final RetryPolicyFactory retryPolicyFactory = EasyMock.createMock(RetryPolicyFactory.class); private final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class); @@ -63,7 +63,7 @@ public void setUp() final InjectableValues.Std injectableValues = (InjectableValues.Std) mapper.getInjectableValues(); injectableValues.addValue(IndexIO.class, indexIO); injectableValues.addValue(CoordinatorClient.class, coordinatorClient); - injectableValues.addValue(SegmentLoaderFactory.class, segmentLoaderFactory); + injectableValues.addValue(SegmentCacheManagerFactory.class, segmentCacheManagerFactory); injectableValues.addValue(RetryPolicyFactory.class, retryPolicyFactory); injectableValues.addValue(TaskConfig.class, taskConfig); } @@ -90,7 +90,7 @@ public void testSerdeUsingIntervals() throws Exception null, indexIO, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, taskConfig ), @@ -124,7 +124,7 @@ public void testSerdeUsingIntervalsAndLegacyDimensionsMetrics() throws Exception ImmutableList.of("b"), indexIO, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, taskConfig ), @@ -164,7 +164,7 @@ public void testSerdeUsingSegments() throws Exception null, indexIO, coordinatorClient, - segmentLoaderFactory, + segmentCacheManagerFactory, retryPolicyFactory, taskConfig ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java index 9270f5f8a573..3886f3dd1454 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java @@ -46,12 +46,10 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.Segment; -import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.loading.SegmentLoader; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -589,16 +587,10 @@ public void close() private DruidSegmentInputEntity makeInputEntity(final Interval interval) { return new DruidSegmentInputEntity( - new SegmentLoader() + new SegmentCacheManager() { @Override - public boolean isSegmentLoaded(DataSegment segment) - { - throw new UnsupportedOperationException("unused"); - } - - @Override - public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) + public boolean isSegmentCached(DataSegment segment) { throw new UnsupportedOperationException("unused"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index c05cdb8a90b6..f1ea2a0afade 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -23,7 +23,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; @@ -110,7 +110,7 @@ public void setup() throws IOException null, NoopJoinableFactory.INSTANCE, null, - new SegmentLoaderFactory(null, utils.getTestObjectMapper()), + new SegmentCacheManagerFactory(utils.getTestObjectMapper()), utils.getTestObjectMapper(), utils.getTestIndexIO(), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 190390267f87..032c2fda84fe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -54,7 +54,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; @@ -665,7 +665,7 @@ public void unannounceSegments(Iterable segments) DirectQueryProcessingPool.INSTANCE, // query executor service NoopJoinableFactory.INSTANCE, () -> monitorScheduler, // monitor scheduler - new SegmentLoaderFactory(null, new DefaultObjectMapper()), + new SegmentCacheManagerFactory(new DefaultObjectMapper()), MAPPER, INDEX_IO, MapCache.create(0), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 2dd94e8c2875..b7a489f4903e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -26,7 +26,7 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestTasks; import org.apache.druid.indexing.common.TestUtils; @@ -117,7 +117,7 @@ private WorkerTaskManager createWorkerTaskManager() null, NoopJoinableFactory.INSTANCE, null, - new SegmentLoaderFactory(null, jsonMapper), + new SegmentCacheManagerFactory(jsonMapper), jsonMapper, indexIO, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index c1845097996d..dbc44f069279 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -31,7 +31,7 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.IndexingServiceCondition; -import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestRealtimeTask; import org.apache.druid.indexing.common.TestTasks; @@ -190,7 +190,7 @@ private WorkerTaskMonitor createTaskMonitor() null, NoopJoinableFactory.INSTANCE, null, - new SegmentLoaderFactory(null, jsonMapper), + new SegmentCacheManagerFactory(jsonMapper), jsonMapper, indexIO, null, diff --git a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java index 99e38e3bc712..ed9d1fd2042b 100644 --- a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java @@ -34,8 +34,10 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder; import org.apache.druid.segment.loading.LocalLoadSpec; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoader; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLocalCacheLoader; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import java.util.List; @@ -48,7 +50,8 @@ public class LocalDataStorageDruidModule implements DruidModule @Override public void configure(Binder binder) { - binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class); + binder.bind(SegmentCacheManager.class).to(SegmentLocalCacheManager.class).in(LazySingleton.class); + binder.bind(SegmentLoader.class).to(SegmentLocalCacheLoader.class).in(LazySingleton.class); bindDeepStorageLocal(binder); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java new file mode 100644 index 000000000000..945857470316 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.apache.druid.timeline.DataSegment; + +import java.io.File; + +/** + * A class to fetch segment files to local disk and manage the local cache. + * Implementations must be thread-safe. + */ +public interface SegmentCacheManager +{ + /** + * Checks whether a segment is already cached. + */ + boolean isSegmentCached(DataSegment segment); + + /** + * This method fetches the files for the given segment if the segment is not downloaded already. + * @throws SegmentLoadingException if there is an error in downloading files + */ + File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; + + /** + * Cleanup the cache space used by the segment + */ + void cleanup(DataSegment segment); +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 741cfa1373ad..8fe38a310290 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -23,16 +23,23 @@ import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; -import java.io.File; - /** - * Loading segments from deep storage to local storage. - * Implementations must be thread-safe. + * Loading segments from deep storage to local storage. Internally, this class can delegate the download to + * {@link SegmentCacheManager}. Implementations must be thread-safe. */ public interface SegmentLoader { - boolean isSegmentLoaded(DataSegment segment); + /** + * Builds a {@link Segment} by downloading if necessary + * @param segment - Segment to load + * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading + * @param loadFailed - Callback to invoke if lazy loading fails during column access. + * @throws SegmentLoadingException - If there is an error in loading the segment + */ Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; - File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; + + /** + * cleanup any state used by this segment + */ void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java new file mode 100644 index 000000000000..6970f7b0caf2 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; +import org.apache.druid.timeline.DataSegment; + +import javax.inject.Inject; + +import java.io.File; +import java.io.IOException; + +public class SegmentLocalCacheLoader implements SegmentLoader +{ + private final SegmentCacheManager cacheManager; + private final IndexIO indexIO; + private final ObjectMapper jsonMapper; + + @Inject + public SegmentLocalCacheLoader(SegmentCacheManager cacheManager, IndexIO indexIO, @Json ObjectMapper mapper) + { + this.cacheManager = cacheManager; + this.indexIO = indexIO; + this.jsonMapper = mapper; + } + + @Override + public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException + { + final File segmentFiles = cacheManager.getSegmentFiles(segment); + File factoryJson = new File(segmentFiles, "factory.json"); + final SegmentizerFactory factory; + + if (factoryJson.exists()) { + try { + factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "%s", e.getMessage()); + } + } else { + factory = new MMappedQueryableSegmentizerFactory(indexIO); + } + + return factory.factorize(segment, segmentFiles, lazy, loadFailed); + } + + @Override + public void cleanup(DataSegment segment) + { + cacheManager.cleanup(segment); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java similarity index 88% rename from server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java rename to server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index eb32ea4c5f62..412cfe9728b1 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -26,9 +26,6 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.Segment; -import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nonnull; @@ -41,14 +38,13 @@ /** */ -public class SegmentLoaderLocalCacheManager implements SegmentLoader +public class SegmentLocalCacheManager implements SegmentCacheManager { @VisibleForTesting static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker"; - private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); + private static final EmittingLogger log = new EmittingLogger(SegmentLocalCacheManager.class); - private final IndexIO indexIO; private final SegmentLoaderConfig config; private final ObjectMapper jsonMapper; @@ -82,18 +78,16 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private final StorageLocationSelectorStrategy strategy; // Note that we only create this via injection in historical and realtime nodes. Peons create these - // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific + // objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific // directories rather than statically configured directories. @Inject - public SegmentLoaderLocalCacheManager( - IndexIO indexIO, + public SegmentLocalCacheManager( List locations, SegmentLoaderConfig config, @Nonnull StorageLocationSelectorStrategy strategy, @Json ObjectMapper mapper ) { - this.indexIO = indexIO; this.config = config; this.jsonMapper = mapper; this.locations = locations; @@ -102,14 +96,13 @@ public SegmentLoaderLocalCacheManager( } @VisibleForTesting - SegmentLoaderLocalCacheManager( - IndexIO indexIO, + SegmentLocalCacheManager( SegmentLoaderConfig config, @Nonnull StorageLocationSelectorStrategy strategy, @Json ObjectMapper mapper ) { - this(indexIO, config.toStorageLocations(), config, strategy, mapper); + this(config.toStorageLocations(), config, strategy, mapper); } /** @@ -117,13 +110,11 @@ public SegmentLoaderLocalCacheManager( * * This ctor is mainly for test cases, including test cases in other modules */ - public SegmentLoaderLocalCacheManager( - IndexIO indexIO, + public SegmentLocalCacheManager( SegmentLoaderConfig config, @Json ObjectMapper mapper ) { - this.indexIO = indexIO; this.config = config; this.jsonMapper = mapper; this.locations = config.toStorageLocations(); @@ -132,7 +123,7 @@ public SegmentLoaderLocalCacheManager( } @Override - public boolean isSegmentLoaded(final DataSegment segment) + public boolean isSegmentCached(final DataSegment segment) { return findStorageLocationIfLoaded(segment) != null; } @@ -177,36 +168,6 @@ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) return downloadStartMarker.exists(); } - @Override - public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException - { - final ReferenceCountingLock lock = createOrGetLock(segment); - final File segmentFiles; - synchronized (lock) { - try { - segmentFiles = getSegmentFiles(segment); - } - finally { - unlock(segment, lock); - } - } - File factoryJson = new File(segmentFiles, "factory.json"); - final SegmentizerFactory factory; - - if (factoryJson.exists()) { - try { - factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "%s", e.getMessage()); - } - } else { - factory = new MMappedQueryableSegmentizerFactory(indexIO); - } - - return factory.factorize(segment, segmentFiles, lazy, loadFailed); - } - /** * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged. * @param segment diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 93b16a317b25..486ad46920b8 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -154,11 +154,6 @@ public Map getDataSourceCounts() return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments); } - public boolean isSegmentCached(final DataSegment segment) - { - return segmentLoader.isSegmentLoaded(segment); - } - /** * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based * datasource of a single table. diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index bc0da936df05..a9d972f7ae85 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; @@ -88,6 +89,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final ScheduledExecutorService exec; private final ServerTypeConfig serverTypeConfig; private final ConcurrentSkipListSet segmentsToDelete; + private final SegmentCacheManager segmentCacheManager; private volatile boolean started = false; @@ -108,6 +110,7 @@ public SegmentLoadDropHandler( DataSegmentAnnouncer announcer, DataSegmentServerAnnouncer serverAnnouncer, SegmentManager segmentManager, + SegmentCacheManager segmentCacheManager, ServerTypeConfig serverTypeConfig ) { @@ -117,6 +120,7 @@ public SegmentLoadDropHandler( announcer, serverAnnouncer, segmentManager, + segmentCacheManager, Executors.newScheduledThreadPool( config.getNumLoadingThreads(), Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s") @@ -132,6 +136,7 @@ public SegmentLoadDropHandler( DataSegmentAnnouncer announcer, DataSegmentServerAnnouncer serverAnnouncer, SegmentManager segmentManager, + SegmentCacheManager segmentCacheManager, ScheduledExecutorService exec, ServerTypeConfig serverTypeConfig ) @@ -141,6 +146,7 @@ public SegmentLoadDropHandler( this.announcer = announcer; this.serverAnnouncer = serverAnnouncer; this.segmentManager = segmentManager; + this.segmentCacheManager = segmentCacheManager; this.exec = exec; this.serverTypeConfig = serverTypeConfig; @@ -228,7 +234,7 @@ private void loadLocalCache() if (!segment.getId().toString().equals(file.getName())) { log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getId()); ignored++; - } else if (segmentManager.isSegmentCached(segment)) { + } else if (segmentCacheManager.isSegmentCached(segment)) { cachedSegments.add(segment); } else { log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getId()); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java new file mode 100644 index 000000000000..a2686816325c --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.apache.druid.java.util.common.MapUtils; +import org.apache.druid.timeline.DataSegment; + +import java.io.File; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + */ +public class CacheTestSegmentCacheManager implements SegmentCacheManager +{ + private final Set segmentsInTrash = new HashSet<>(); + + @Override + public boolean isSegmentCached(DataSegment segment) + { + Map loadSpec = segment.getLoadSpec(); + return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); + } + + @Override + public File getSegmentFiles(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public void cleanup(DataSegment segment) + { + segmentsInTrash.add(segment); + } + + public Set getSegmentsInTrash() + { + return segmentsInTrash; + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 557537c06811..cf47755ba942 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -19,7 +19,6 @@ package org.apache.druid.segment.loading; -import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; @@ -28,23 +27,10 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; -import java.io.File; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - /** */ public class CacheTestSegmentLoader implements SegmentLoader { - private final Set segmentsInTrash = new HashSet<>(); - - @Override - public boolean isSegmentLoaded(DataSegment segment) - { - Map loadSpec = segment.getLoadSpec(); - return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); - } @Override public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) @@ -82,20 +68,9 @@ public void close() }; } - @Override - public File getSegmentFiles(DataSegment segment) - { - throw new UnsupportedOperationException(); - } - @Override public void cleanup(DataSegment segment) { - segmentsInTrash.add(segment); - } - public Set getSegmentsInTrash() - { - return segmentsInTrash; } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java similarity index 95% rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java index f617261068ce..e812ff7ac71e 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.segment.TestHelper; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -53,7 +52,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; -public class SegmentLoaderLocalCacheManagerConcurrencyTest +public class SegmentLocalCacheManagerConcurrencyTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -66,10 +65,10 @@ public class SegmentLoaderLocalCacheManagerConcurrencyTest private final String segmentVersion; private File localSegmentCacheFolder; - private SegmentLoaderLocalCacheManager manager; + private SegmentLocalCacheManager manager; private ExecutorService executorService; - public SegmentLoaderLocalCacheManagerConcurrencyTest() + public SegmentLocalCacheManagerConcurrencyTest() { jsonMapper = new DefaultObjectMapper(); jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); @@ -93,8 +92,7 @@ public void setUp() throws Exception final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null); locations.add(locationConfig); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java similarity index 91% rename from server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java rename to server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java index 75bbeff61c7a..26c9cbdabf4c 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java @@ -27,7 +27,6 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.segment.TestHelper; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -41,7 +40,7 @@ import java.util.ArrayList; import java.util.List; -public class SegmentLoaderLocalCacheManagerTest +public class SegmentLocalCacheManagerTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -49,9 +48,9 @@ public class SegmentLoaderLocalCacheManagerTest private final ObjectMapper jsonMapper; private File localSegmentCacheFolder; - private SegmentLoaderLocalCacheManager manager; + private SegmentLocalCacheManager manager; - public SegmentLoaderLocalCacheManagerTest() + public SegmentLocalCacheManagerTest() { jsonMapper = new DefaultObjectMapper(); jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); @@ -73,8 +72,7 @@ public void setUp() throws Exception final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null); locations.add(locationConfig); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -90,10 +88,10 @@ public void testIfSegmentIsLoaded() ); cachedSegmentFile.mkdirs(); - Assert.assertTrue("Expect cache hit", manager.isSegmentLoaded(cachedSegment)); + Assert.assertTrue("Expect cache hit", manager.isSegmentCached(cachedSegment)); final DataSegment uncachedSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D"); - Assert.assertFalse("Expect cache miss", manager.isSegmentLoaded(uncachedSegment)); + Assert.assertFalse("Expect cache miss", manager.isSegmentCached(uncachedSegment)); } @Test @@ -122,13 +120,13 @@ public void testGetAndCleanSegmentFiles() throws Exception final File indexZip = new File(localSegmentFile, "index.zip"); indexZip.createNewFile(); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); manager.getSegmentFiles(segmentToDownload); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); manager.cleanup(segmentToDownload); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); } @Test @@ -143,8 +141,7 @@ public void testRetrySuccessAtFirstLocation() throws Exception final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null); locations.add(locationConfig2); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -169,14 +166,14 @@ public void testRetrySuccessAtFirstLocation() throws Exception final File indexZip = new File(localSegmentFile, "index.zip"); indexZip.createNewFile(); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); File segmentFile = manager.getSegmentFiles(segmentToDownload); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); manager.cleanup(segmentToDownload); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); } @Test @@ -192,8 +189,7 @@ public void testRetrySuccessAtSecondLocation() throws Exception final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null); locations.add(locationConfig2); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -218,14 +214,14 @@ public void testRetrySuccessAtSecondLocation() throws Exception final File indexZip = new File(localSegmentFile, "index.zip"); indexZip.createNewFile(); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); File segmentFile = manager.getSegmentFiles(segmentToDownload); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder2/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); manager.cleanup(segmentToDownload); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); } @Test @@ -243,8 +239,7 @@ public void testRetryAllFail() throws Exception final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null); locations.add(locationConfig2); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -276,7 +271,7 @@ public void testRetryAllFail() throws Exception } catch (SegmentLoadingException e) { } - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); manager.cleanup(segmentToDownload); } @@ -293,8 +288,7 @@ public void testEmptyToFullOrder() throws Exception final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null); locations.add(locationConfig2); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -319,11 +313,11 @@ public void testEmptyToFullOrder() throws Exception final File indexZip = new File(localSegmentFile, "index.zip"); indexZip.createNewFile(); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); File segmentFile = manager.getSegmentFiles(segmentToDownload); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( ImmutableMap.of( @@ -347,10 +341,10 @@ public void testEmptyToFullOrder() throws Exception File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2)); manager.cleanup(segmentToDownload2); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2)); } private DataSegment dataSegmentWithInterval(String intervalStr) @@ -402,8 +396,7 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception ); } - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locationConfigs), new RoundRobinStorageLocationSelectorStrategy(locations), jsonMapper @@ -425,14 +418,14 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception // manually create a local segment under segmentSrcFolder createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload1)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload1)); File segmentFile = manager.getSegmentFiles(segmentToDownload1); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload1)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload1)); manager.cleanup(segmentToDownload1); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload1)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload1)); // Segment 2 should be downloaded in local_storage_folder2 final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( @@ -449,14 +442,14 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception // manually create a local segment under segmentSrcFolder createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2)); File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2)); manager.cleanup(segmentToDownload2); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload2)); // Segment 3 should be downloaded in local_storage_folder3 final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec( @@ -476,10 +469,10 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception File segmentFile3 = manager.getSegmentFiles(segmentToDownload3); Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3)); manager.cleanup(segmentToDownload3); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3)); // Segment 4 should be downloaded in local_storage_folder again, asserting round robin distribution of segments final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec( @@ -497,13 +490,13 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" + ".000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4)); File segmentFile1 = manager.getSegmentFiles(segmentToDownload4); Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4)); manager.cleanup(segmentToDownload4); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload4)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload4)); } private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception @@ -538,8 +531,7 @@ public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exceptio locations.add(locationConfig2); locations.add(locationConfig3); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -561,11 +553,11 @@ public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exceptio createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); File segmentFile = manager.getSegmentFiles(segmentToDownload); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); // Segment 2 should be downloaded in local_storage_folder2, segment2 size 5L final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 5L).withLoadSpec( @@ -583,11 +575,11 @@ public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exceptio createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2)); File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2)); // Segment 3 should be downloaded in local_storage_folder3, segment3 size 20L @@ -608,7 +600,7 @@ public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exceptio File segmentFile3 = manager.getSegmentFiles(segmentToDownload3); Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload3)); // Now the storage locations local_storage_folder1, local_storage_folder2 and local_storage_folder3 have 10, 5 and // 20 bytes occupied respectively. The default strategy should pick location2 (as it has least bytes used) for the @@ -628,11 +620,11 @@ public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exceptio createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" + ".000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload4)); File segmentFile1 = manager.getSegmentFiles(segmentToDownload4); Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder2/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload4)); } @@ -652,8 +644,7 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig().withLocations(locationConfigs); - manager = new SegmentLoaderLocalCacheManager( - TestHelper.getTestIndexIO(), + manager = new SegmentLocalCacheManager( new SegmentLoaderConfig().withLocations(locationConfigs), new RandomStorageLocationSelectorStrategy(segmentLoaderConfig.toStorageLocations()), jsonMapper @@ -677,11 +668,11 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); File segmentFile = manager.getSegmentFiles(segmentToDownload); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); // Segment 2 should be downloaded in local_storage_folder3, segment2 size 9L final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 9L).withLoadSpec( @@ -699,11 +690,11 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload2)); File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder3/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload2)); // Segment 3 should not be downloaded, segment3 size 20L @@ -729,7 +720,7 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception } catch (SegmentLoadingException e) { } - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload3)); } @Test @@ -759,16 +750,16 @@ public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception Assert.assertTrue(indexZip.createNewFile()); final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); // Emulate a corrupted segment file final File downloadMarker = new File( cachedSegmentDir, - SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME + SegmentLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME ); Assert.assertTrue(downloadMarker.createNewFile()); - Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload)); Assert.assertFalse(cachedSegmentDir.exists()); } } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java index 38c410019400..b62d45348b3c 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java @@ -52,8 +52,9 @@ import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalLoadSpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentLocalCacheLoader; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; @@ -102,7 +103,7 @@ public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNull private IndexIO indexIO; private File segmentCacheDir; private File segmentDeepStorageDir; - private SegmentLoaderLocalCacheManager segmentLoader; + private SegmentLocalCacheManager segmentCacheManager; private SegmentManager segmentManager; private BroadcastTableJoinableFactory joinableFactory; @@ -125,8 +126,7 @@ public void setup() throws IOException ); segmentCacheDir = temporaryFolder.newFolder(); segmentDeepStorageDir = temporaryFolder.newFolder(); - segmentLoader = new SegmentLoaderLocalCacheManager( - indexIO, + segmentCacheManager = new SegmentLocalCacheManager( new SegmentLoaderConfig() { @Override @@ -139,7 +139,7 @@ public List getLocations() }, objectMapper ); - segmentManager = new SegmentManager(segmentLoader); + segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper)); joinableFactory = new BroadcastTableJoinableFactory(segmentManager); EmittingLogger.registerEmitter(new NoopServiceEmitter()); } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 762339f7f1f8..8698146c9d53 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -46,7 +46,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.File; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -61,14 +60,9 @@ public class SegmentManagerTest { + private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader() { - @Override - public boolean isSegmentLoaded(DataSegment segment) - { - return false; - } - @Override public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) { @@ -78,12 +72,6 @@ public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLo ); } - @Override - public File getSegmentFiles(DataSegment segment) - { - throw new UnsupportedOperationException(); - } - @Override public void cleanup(DataSegment segment) { diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index 87587ce69a66..5295b9dcde51 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -40,8 +40,9 @@ import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalLoadSpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentLocalCacheLoader; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -57,6 +58,7 @@ import org.junit.rules.TemporaryFolder; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -82,7 +84,7 @@ public class SegmentManagerThreadSafetyTest private IndexIO indexIO; private File segmentCacheDir; private File segmentDeepStorageDir; - private SegmentLoaderLocalCacheManager segmentLoader; + private SegmentLocalCacheManager segmentCacheManager; private SegmentManager segmentManager; private ExecutorService exec; @@ -98,8 +100,7 @@ public void setup() throws IOException indexIO = new IndexIO(objectMapper, () -> 0); segmentCacheDir = temporaryFolder.newFolder(); segmentDeepStorageDir = temporaryFolder.newFolder(); - segmentLoader = new SegmentLoaderLocalCacheManager( - indexIO, + segmentCacheManager = new SegmentLocalCacheManager( new SegmentLoaderConfig() { @Override @@ -112,7 +113,7 @@ public List getLocations() }, objectMapper ); - segmentManager = new SegmentManager(segmentLoader); + segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper)); exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d"); EmittingLogger.registerEmitter(new NoopServiceEmitter()); } @@ -137,7 +138,7 @@ public void testLoadSameSegment() throws IOException, ExecutionException, Interr } Assert.assertEquals(1, segmentPuller.numFileLoaded.size()); Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue()); - Assert.assertEquals(0, segmentLoader.getSegmentLocks().size()); + Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size()); } @Test(timeout = 6000L) @@ -168,7 +169,7 @@ public void testLoadMultipleSegments() throws IOException, ExecutionException, I } Assert.assertEquals(11, segmentPuller.numFileLoaded.size()); Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue()); - Assert.assertEquals(0, segmentLoader.getSegmentLocks().size()); + Assert.assertEquals(0, segmentCacheManager.getSegmentLocks().size()); } private DataSegment createSegment(String interval) throws IOException diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index 37cc2606a639..00164fc43dff 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -35,9 +35,11 @@ import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentLocalCacheLoader; +import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -87,9 +89,10 @@ public void setup() throws IOException objectMapper = TestHelper.makeJsonMapper(); objectMapper.registerSubtypes(TestLoadSpec.class); objectMapper.registerSubtypes(TestSegmentizerFactory.class); - SegmentManager segmentManager = new SegmentManager(new SegmentLoaderLocalCacheManager( + SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, objectMapper); + SegmentManager segmentManager = new SegmentManager(new SegmentLocalCacheLoader( + cacheManager, TestIndex.INDEX_IO, - config, objectMapper )); segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class); @@ -99,6 +102,7 @@ public void setup() throws IOException segmentAnnouncer, Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, + cacheManager, new ServerTypeConfig(ServerType.HISTORICAL) ); EmittingLogger.registerEmitter(new NoopServiceEmitter()); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 3930706658e9..02a6db13bcde 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -31,7 +31,9 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.loading.CacheTestSegmentCacheManager; import org.apache.druid.segment.loading.CacheTestSegmentLoader; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; @@ -80,7 +82,8 @@ public class SegmentLoadDropHandlerTest private TestStorageLocation testStorageLocation; private AtomicInteger announceCount; private ConcurrentSkipListSet segmentsAnnouncedByMe; - private CacheTestSegmentLoader segmentLoader; + private CacheTestSegmentCacheManager segmentCacheManager; + private SegmentLoader segmentLoader; private SegmentManager segmentManager; private List scheduledRunnable; private SegmentLoaderConfig segmentLoaderConfig; @@ -116,6 +119,7 @@ public void setUp() scheduledRunnable = new ArrayList<>(); + segmentCacheManager = new CacheTestSegmentCacheManager(); segmentLoader = new CacheTestSegmentLoader(); segmentManager = new SegmentManager(segmentLoader); segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); @@ -239,6 +243,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) announcer, Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, + segmentCacheManager, scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), new ServerTypeConfig(ServerType.HISTORICAL) ); @@ -273,7 +278,7 @@ Because another addSegment() call is executed, which removes the segment from se } Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment)); segmentLoadDropHandler.stop(); } @@ -312,7 +317,7 @@ Because another addSegment() call is executed, which removes the segment from se } Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment)); segmentLoadDropHandler.stop(); } @@ -409,6 +414,7 @@ public int getAnnounceIntervalMillis() announcer, Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, + segmentCacheManager, new ServerTypeConfig(ServerType.HISTORICAL) ); @@ -495,6 +501,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ announcer, Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, + segmentCacheManager, scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), new ServerTypeConfig(ServerType.HISTORICAL) ); diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 902acd8d02af..16832b3dfcb3 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -104,7 +104,6 @@ import javax.annotation.Nullable; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -148,12 +147,6 @@ public void setUp() segmentManager = new SegmentManager( new SegmentLoader() { - @Override - public boolean isSegmentLoaded(DataSegment segment) - { - return false; - } - @Override public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { @@ -163,12 +156,6 @@ public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLo ); } - @Override - public File getSegmentFiles(DataSegment segment) - { - throw new UnsupportedOperationException(); - } - @Override public void cleanup(DataSegment segment) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java index c30a37cb01ad..8356a26d33ac 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; @@ -169,6 +170,7 @@ public int getDropSegmentDelayMillis() EasyMock.createNiceMock(DataSegmentAnnouncer.class), EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), EasyMock.createNiceMock(SegmentManager.class), + EasyMock.createNiceMock(SegmentCacheManager.class), EasyMock.createNiceMock(ScheduledExecutorService.class), new ServerTypeConfig(ServerType.HISTORICAL) ) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 5a78f0f26bb6..569e831240c7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -131,6 +131,7 @@ import org.joda.time.chrono.ISOChronology; import javax.annotation.Nullable; + import java.io.File; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList;