Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/development/modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ d.azure.] as [org.apache.druid.storage.azure.AzureAccountConfig@759c9ad9]
ip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
2015-04-14T02:49:08,276 INFO [ZkCoordinator-0] org.apache.druid.storage.azure.AzureDataSegmentPuller - Loaded 1196 bytes from [dde/2015-01-02T00:00:00.000Z_2015-01-03
T00:00:00.000Z/2015-04-14T02:41:09.484Z/0/index.zip] to [/opt/druid/zk_druid/dde/2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z/2015-04-14T02:41:09.484Z/0]
2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
2015-04-14T02:49:08,277 WARN [ZkCoordinator-0] org.apache.druid.segment.loading.SegmentLocalCacheManager - Segment [dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] is different than expected size. Expected [0] found [1196]
2015-04-14T02:49:08,282 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z] at path[/druid/dev/segments/192.168.33.104:8081/192.168.33.104:8081_historical__default_tier_2015-04-14T02:49:08.282Z_7bb87230ebf940188511dd4a53ffd7351]
2015-04-14T02:49:08,292 INFO [ZkCoordinator-0] org.apache.druid.server.coordination.ZkCoordinator - Completed request [LOAD: dde_2015-01-02T00:00:00.000Z_2015-01-03T00:00:00.000Z_2015-04-14T02:41:09.484Z]
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
Expand Down Expand Up @@ -2887,7 +2887,7 @@ public void close()
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
Expand Down Expand Up @@ -2974,7 +2974,7 @@ public void close()
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;

import java.io.File;
Expand All @@ -34,25 +33,21 @@
/**
*
*/
public class SegmentLoaderFactory
public class SegmentCacheManagerFactory
{
private final IndexIO indexIO;
private final ObjectMapper jsonMapper;

@Inject
public SegmentLoaderFactory(
IndexIO indexIO,
public SegmentCacheManagerFactory(
@Json ObjectMapper mapper
)
{
this.indexIO = indexIO;
this.jsonMapper = mapper;
}

public SegmentLoader manufacturate(File storageDir)
public SegmentCacheManager manufacturate(File storageDir)
{
return new SegmentLoaderLocalCacheManager(
indexIO,
return new SegmentLocalCacheManager(
new SegmentLoaderConfig().withLocations(
Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
jsonMapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -101,7 +101,7 @@ public class TaskToolbox
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
private final SegmentCacheManager segmentCacheManager;
private final ObjectMapper jsonMapper;
private final File taskWorkDir;
private final IndexIO indexIO;
Expand Down Expand Up @@ -144,7 +144,7 @@ public TaskToolbox(
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
@Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoader segmentLoader,
SegmentCacheManager segmentCacheManager,
ObjectMapper jsonMapper,
File taskWorkDir,
IndexIO indexIO,
Expand Down Expand Up @@ -183,7 +183,7 @@ public TaskToolbox(
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoader = segmentLoader;
this.segmentCacheManager = segmentCacheManager;
this.jsonMapper = jsonMapper;
this.taskWorkDir = taskWorkDir;
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
Expand Down Expand Up @@ -318,7 +318,7 @@ public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
{
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
for (DataSegment segment : segments) {
retVal.put(segment, segmentLoader.getSegmentFiles(segment));
retVal.put(segment, segmentCacheManager.getSegmentFiles(segment));
}

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class TaskToolboxFactory
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final SegmentLoaderFactory segmentLoaderFactory;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
private final ObjectMapper jsonMapper;
private final IndexIO indexIO;
private final Cache cache;
Expand Down Expand Up @@ -124,7 +124,7 @@ public TaskToolboxFactory(
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoaderFactory segmentLoaderFactory,
SegmentCacheManagerFactory segmentCacheManagerFactory,
@Json ObjectMapper jsonMapper,
IndexIO indexIO,
Cache cache,
Expand Down Expand Up @@ -162,7 +162,7 @@ public TaskToolboxFactory(
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoaderFactory = segmentLoaderFactory;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
this.jsonMapper = jsonMapper;
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
Expand Down Expand Up @@ -204,7 +204,7 @@ public TaskToolbox build(Task task)
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
segmentLoaderFactory.manufacturate(taskWorkDir),
segmentCacheManagerFactory.manufacturate(taskWorkDir),
jsonMapper,
taskWorkDir,
indexIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -157,7 +157,7 @@ public class CompactionTask extends AbstractBatchIndexTask
private final PartitionConfigurationManager partitionConfigurationManager;

@JsonIgnore
private final SegmentLoaderFactory segmentLoaderFactory;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;

@JsonIgnore
private final RetryPolicyFactory retryPolicyFactory;
Expand Down Expand Up @@ -185,7 +185,7 @@ public CompactionTask(
@JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
Expand Down Expand Up @@ -233,7 +233,7 @@ public CompactionTask(
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
this.segmentLoaderFactory = segmentLoaderFactory;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
this.retryPolicyFactory = retryPolicyFactory;
}

Expand Down Expand Up @@ -422,7 +422,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
metricsSpec,
granularitySpec,
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory,
ioConfig.isDropExisting()
);
Expand Down Expand Up @@ -521,7 +521,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
final SegmentCacheManagerFactory segmentCacheManagerFactory,
final RetryPolicyFactory retryPolicyFactory,
final boolean dropExisting
) throws IOException, SegmentLoadingException
Expand Down Expand Up @@ -604,7 +604,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
dataSchema,
interval,
coordinatorClient,
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory,
dropExisting
),
Expand Down Expand Up @@ -632,7 +632,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
dataSchema,
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory,
dropExisting
),
Expand All @@ -647,7 +647,7 @@ private static ParallelIndexIOConfig createIoConfig(
DataSchema dataSchema,
Interval interval,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
SegmentCacheManagerFactory segmentCacheManagerFactory,
RetryPolicyFactory retryPolicyFactory,
boolean dropExisting
)
Expand All @@ -663,7 +663,7 @@ private static ParallelIndexIOConfig createIoConfig(
null,
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory,
toolbox.getConfig()
),
Expand Down Expand Up @@ -1016,7 +1016,7 @@ CompactionTuningConfig computeTuningConfig()
public static class Builder
{
private final String dataSource;
private final SegmentLoaderFactory segmentLoaderFactory;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
private final RetryPolicyFactory retryPolicyFactory;

private CompactionIOConfig ioConfig;
Expand All @@ -1035,12 +1035,12 @@ public static class Builder

public Builder(
String dataSource,
SegmentLoaderFactory segmentLoaderFactory,
SegmentCacheManagerFactory segmentCacheManagerFactory,
RetryPolicyFactory retryPolicyFactory
)
{
this.dataSource = dataSource;
this.segmentLoaderFactory = segmentLoaderFactory;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
this.retryPolicyFactory = retryPolicyFactory;
}

Expand Down Expand Up @@ -1118,7 +1118,7 @@ public CompactionTask build()
granularitySpec,
tuningConfig,
context,
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.ReingestionTimelineUtils;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -45,7 +45,7 @@
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
private final Long maxInputSegmentBytesPerTask;
private final IndexIO indexIO;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
private final RetryPolicyFactory retryPolicyFactory;

private List<InputSplit<List<WindowedSegmentId>>> splits;
Expand All @@ -102,7 +102,7 @@ public IngestSegmentFirehoseFactory(
@JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask,
@JacksonInject IndexIO indexIO,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
Expand All @@ -119,7 +119,7 @@ public IngestSegmentFirehoseFactory(
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
}

Expand All @@ -136,7 +136,7 @@ public FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> withSplit(
maxInputSegmentBytesPerTask,
indexIO,
coordinatorClient,
segmentLoaderFactory,
segmentCacheManagerFactory,
retryPolicyFactory
);
}
Expand Down Expand Up @@ -202,15 +202,15 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory)
// Note: this requires enough local storage space to fit all of the segments, even though
// IngestSegmentFirehose iterates over the segments in series. We may want to change this
// to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
final DataSegment segment = chunk.getObject();

segmentFileMap.computeIfAbsent(segment, k -> {
try {
return segmentLoader.getSegmentFiles(segment);
return segmentCacheManager.getSegmentFiles(segment);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
Expand Down
Loading