diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1a5512889d94..892349f755fe 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -139,7 +139,6 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@@ -2559,9 +2558,7 @@ public List<StorageLocationConfig> getLocations()
         this::makeTimeseriesAndScanConglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index f0ef143e5eec..52941166c002 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -135,7 +135,6 @@
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2765,9 +2764,7 @@ public List<StorageLocationConfig> getLocations()
         this::makeTimeseriesOnlyConglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
index b15d3dfc4bdd..83fa9dbb06b9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.druid.indexing.common;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
@@ -29,23 +32,30 @@
 import java.util.Collections;
 
 /**
+ *
  */
 public class SegmentLoaderFactory
 {
-  private final SegmentLoaderLocalCacheManager loader;
+  private final IndexIO indexIO;
+  private final ObjectMapper jsonMapper;
 
   @Inject
   public SegmentLoaderFactory(
-      SegmentLoaderLocalCacheManager loader
+      IndexIO indexIO,
+      @Json ObjectMapper mapper
   )
   {
-    this.loader = loader;
+    this.indexIO = indexIO;
+    this.jsonMapper = mapper;
   }
 
   public SegmentLoader manufacturate(File storageDir)
   {
-    return loader.withConfig(
-        new SegmentLoaderConfig().withLocations(Collections.singletonList(new StorageLocationConfig().setPath(storageDir)))
+    return new SegmentLoaderLocalCacheManager(
+        indexIO,
+        new SegmentLoaderConfig().withLocations(
+            Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
+        jsonMapper
     );
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 740d78d36395..c897de3fc0f1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -40,6 +41,8 @@
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -132,6 +135,15 @@ public class CompactionTask extends AbstractTask
   @JsonIgnore
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
 
+  @JsonIgnore
+  private final CoordinatorClient coordinatorClient;
+
+  @JsonIgnore
+  private final SegmentLoaderFactory segmentLoaderFactory;
+
+  @JsonIgnore
+  private final RetryPolicyFactory retryPolicyFactory;
+
   @JsonIgnore
   private List<IndexTask> indexTaskSpecs;
 
@@ -153,7 +165,10 @@ public CompactionTask(
       @JacksonInject ObjectMapper jsonMapper,
       @JacksonInject AuthorizerMapper authorizerMapper,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject CoordinatorClient coordinatorClient,
+      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
     super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
@@ -186,6 +201,9 @@ public CompactionTask(
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.coordinatorClient = coordinatorClient; + this.segmentLoaderFactory = segmentLoaderFactory; + this.retryPolicyFactory = retryPolicyFactory; } @JsonProperty @@ -278,20 +296,23 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception metricsSpec, keepSegmentGranularity, segmentGranularity, - jsonMapper + jsonMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ).stream() - .map(spec -> new IndexTask( - getId(), - getGroupId(), - getTaskResource(), - getDataSource(), - spec, - getContext(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory - )) - .collect(Collectors.toList()); + .map(spec -> new IndexTask( + getId(), + getGroupId(), + getTaskResource(), + getDataSource(), + spec, + getContext(), + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory + )) + .collect(Collectors.toList()); } if (indexTaskSpecs.isEmpty()) { @@ -338,7 +359,10 @@ static List createIngestionSchema( @Nullable final AggregatorFactory[] metricsSpec, @Nullable final Boolean keepSegmentGranularity, @Nullable final Granularity segmentGranularity, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final CoordinatorClient coordinatorClient, + final SegmentLoaderFactory segmentLoaderFactory, + final RetryPolicyFactory retryPolicyFactory ) throws IOException, SegmentLoadingException { Pair, List>> pair = prepareSegments( @@ -379,7 +403,14 @@ static List createIngestionSchema( return Collections.singletonList( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, segmentProvider.interval), + createIoConfig( + toolbox, + dataSchema, + segmentProvider.interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -411,7 +442,14 @@ static List createIngestionSchema( specs.add( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, interval), + createIoConfig( + toolbox, + dataSchema, + interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -438,7 +476,14 @@ static List createIngestionSchema( return Collections.singletonList( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, segmentProvider.interval), + createIoConfig( + toolbox, + dataSchema, + segmentProvider.interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -446,7 +491,14 @@ static List createIngestionSchema( } } - private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval) + private static IndexIOConfig createIoConfig( + TaskToolbox toolbox, + DataSchema dataSchema, + Interval interval, + CoordinatorClient coordinatorClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory + ) { return new IndexIOConfig( new IngestSegmentFirehoseFactory( @@ -456,7 +508,10 @@ private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema data // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), - toolbox.getIndexIO() + toolbox.getIndexIO(), + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ), false ); @@ -811,7 +866,7 @@ IndexTuningConfig computeTuningConfig(List> qu * 
targetCompactionSizeBytes cannot be used together with {@link IndexTuningConfig#maxRowsPerSegment}, * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards}. * {@link #hasPartitionConfig} checks whether any of those configs is set. - * + *

* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig * returns true. If targetCompactionSizeBytes is not set, this returns null or * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of @@ -860,6 +915,9 @@ public static class Builder private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final CoordinatorClient coordinatorClient; + private final SegmentLoaderFactory segmentLoaderFactory; + private final RetryPolicyFactory retryPolicyFactory; @Nullable private Interval interval; @@ -885,7 +943,10 @@ public Builder( ObjectMapper jsonMapper, AuthorizerMapper authorizerMapper, ChatHandlerProvider chatHandlerProvider, - RowIngestionMetersFactory rowIngestionMetersFactory + RowIngestionMetersFactory rowIngestionMetersFactory, + CoordinatorClient coordinatorClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory ) { this.dataSource = dataSource; @@ -893,6 +954,9 @@ public Builder( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.coordinatorClient = coordinatorClient; + this.segmentLoaderFactory = segmentLoaderFactory; + this.retryPolicyFactory = retryPolicyFactory; } public Builder interval(Interval interval) @@ -968,7 +1032,10 @@ public CompactionTask build() jsonMapper, authorizerMapper, chatHandlerProvider, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6d5a0d8e56ad..d0e083a2b6ed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -84,7 +83,6 @@ import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; @@ -419,8 +417,6 @@ public TaskStatus run(final TaskToolbox toolbox) final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - setFirehoseFactoryToolbox(firehoseFactory, toolbox); - final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. 
FileUtils.forceMkdir(firehoseTempDir);
@@ -489,25 +485,6 @@ public TaskStatus run(final TaskToolbox toolbox)
     }
   }
 
-  // pass toolbox to any IngestSegmentFirehoseFactory
-  private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
-  {
-    if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
-      ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
-      return;
-    }
-
-    if (firehoseFactory instanceof CombiningFirehoseFactory) {
-      for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
-        if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
-          ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
-        } else if (delegateFactory instanceof CombiningFirehoseFactory) {
-          setFirehoseFactoryToolbox(delegateFactory, toolbox);
-        }
-      }
-    }
-  }
-
   private Map<String, TaskReport> getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 8004243bb3aa..435de05892ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -44,7 +44,6 @@
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -185,11 +184,6 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
     final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
-    if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
-      // pass toolbox to Firehose
-      ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
-    }
-
     final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
     // Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 8087582ecb55..bae2946bbdc9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -30,16 +30,20 @@ import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.SegmentListUsedAction; +import org.apache.druid.indexing.common.RetryPolicy; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; @@ -48,14 +52,17 @@ import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -68,7 +75,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory dimensions; private final List metrics; private final IndexIO indexIO; - private TaskToolbox taskToolbox; + private final CoordinatorClient coordinatorClient; + private final SegmentLoaderFactory segmentLoaderFactory; + private final RetryPolicyFactory retryPolicyFactory; @JsonCreator public IngestSegmentFirehoseFactory( @@ -77,7 +86,10 @@ public IngestSegmentFirehoseFactory( @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List dimensions, @JsonProperty("metrics") List metrics, - @JacksonInject IndexIO indexIO + @JacksonInject IndexIO indexIO, + @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject RetryPolicyFactory retryPolicyFactory ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -88,6 +100,9 @@ public IngestSegmentFirehoseFactory( this.dimensions = dimensions; this.metrics = metrics; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); + this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); + this.segmentLoaderFactory = 
Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); + this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); } @JsonProperty @@ -120,23 +135,46 @@ public List getMetrics() return metrics; } - public void setTaskToolbox(TaskToolbox taskToolbox) - { - this.taskToolbox = taskToolbox; - } - @Override public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException { log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); - Preconditions.checkNotNull(taskToolbox, "taskToolbox is not set"); - try { - final List usedSegments = taskToolbox - .getTaskActionClient() - .submit(new SegmentListUsedAction(dataSource, interval, null)); - final Map segmentFileMap = taskToolbox.fetchSegments(usedSegments); + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. + final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + List usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + log.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + log.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + Map segmentFileMap = Maps.newLinkedHashMap(); + for (DataSegment segment : usedSegments) { + segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + } + final List> timeLineSegments = VersionedIntervalTimeline .forSegments(usedSegments) .lookup(interval); @@ -201,11 +239,18 @@ public WindowedStorageAdapter apply(final PartitionChunk input) final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } - catch (IOException | SegmentLoadingException e) { + catch (SegmentLoadingException e) { throw Throwables.propagate(e); } } + private long jitter(long input) + { + final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; + long retval = input + (long) jitter; + return retval < 0 ? 
0 : retval; + } + @VisibleForTesting static List getUniqueDimensions( List> timelineSegments, @@ -260,7 +305,7 @@ static List getUniqueMetrics(List orderedMetrics = uniqueMetrics.inverse(); return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); + .mapToObj(orderedMetrics::get) + .collect(Collectors.toList()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index d9acfd6bd833..0966d1b84c98 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -76,6 +76,7 @@ public class TaskToolboxTest private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); private ObjectMapper ObjectMapper = new ObjectMapper(); + private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class); private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); private Task task = EasyMock.createMock(Task.class); private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class); @@ -107,7 +108,7 @@ public void setUp() throws IOException () -> mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, - new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), + mockSegmentLoaderFactory, ObjectMapper, mockIndexIO, mockCache, @@ -162,13 +163,13 @@ public void testGetObjectMapper() public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); + EasyMock + .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject())) + .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); EasyMock .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - EasyMock - .expect(mockSegmentLoaderLocalCacheManager.withConfig(EasyMock.anyObject())) - .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); - EasyMock.replay(mockSegmentLoaderLocalCacheManager); + EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index b57397ab8cd6..b809e7a5ee85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -119,7 +119,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import 
org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; @@ -1607,9 +1606,7 @@ public List getLocations() () -> conglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index bbf28708ff89..6ad0ec4b1465 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.LocalTaskActionClient; @@ -52,6 +56,7 @@ import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -96,12 +101,24 @@ public class CompactionTaskRunTest extends IngestionTestBase ); private RowIngestionMetersFactory rowIngestionMetersFactory; + private CoordinatorClient coordinatorClient; + private SegmentLoaderFactory segmentLoaderFactory; private ExecutorService exec; + private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); public CompactionTaskRunTest() { TestUtils testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); + coordinatorClient = new CoordinatorClient(null, null) + { + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals); + } + }; + segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); } @Before @@ -126,7 +143,10 @@ public void testRun() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask = builder @@ -156,7 +176,10 @@ public void testRunCompactionTwiceWithoutKeepSegmentGranularity() throws Excepti getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + 
rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask1 = builder @@ -200,7 +223,10 @@ public void testRunCompactionTwiceWithKeepSegmentGranularity() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask1 = builder @@ -248,7 +274,10 @@ public void testWithSegmentGranularity() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); // day segmentGranularity diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f759cc7c1c62..5117c1a31c88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -41,6 +42,9 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.SegmentListUsedAction; @@ -153,12 +157,15 @@ public class CompactionTaskTest private static List AGGREGATORS; private static List SEGMENTS; private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + private static Map segmentMap = new HashMap<>(); + private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap); private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - private static Map segmentMap; + private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); private final boolean keepSegmentGranularity; private TaskToolbox toolbox; + private SegmentLoaderFactory segmentLoaderFactory; @BeforeClass public static void setupClass() @@ -202,7 +209,6 @@ public static void setupClass() AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); - segmentMap = new HashMap<>(SEGMENT_INTERVALS.size()); for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); segmentMap.put( @@ -243,6 +249,8 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); 
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); + binder.bind(CoordinatorClient.class).toInstance(coordinatorClient); + binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); } ) ) @@ -307,19 +315,21 @@ private static IndexTuningConfig createTuningConfig() @Before public void setup() { + final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap); toolbox = new TestTaskToolbox( new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())), - new TestIndexIO(objectMapper, segmentMap), + testIndexIO, segmentMap ); + segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper); } @Parameters(name = "keepSegmentGranularity={0}") public static Collection parameters() { return ImmutableList.of( - new Object[] {false}, - new Object[] {true} + new Object[]{false}, + new Object[]{true} ); } @@ -336,7 +346,10 @@ public void testSerdeWithInterval() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .interval(COMPACTION_INTERVAL) @@ -357,7 +370,10 @@ public void testSerdeWithSegments() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .segments(SEGMENTS) @@ -378,7 +394,10 @@ public void testSerdeWithDimensions() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder @@ -426,7 +445,10 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -440,7 +462,13 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept ) ); Assert.assertEquals(6, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + Granularities.MONTH + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( @@ -491,7 +519,10 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -564,7 +595,10 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -637,7 +671,10 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, 
keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -710,7 +747,10 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); if (keepSegmentGranularity) { @@ -760,7 +800,10 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, customMetricsSpec, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( @@ -805,7 +848,10 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -819,7 +865,13 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se ) ); Assert.assertEquals(6, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + Granularities.MONTH + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( @@ -850,7 +902,10 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -871,7 +926,10 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -886,7 +944,10 @@ public void testEmptyInterval() objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder @@ -934,7 +995,10 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -949,7 +1013,10 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException null, null, new PeriodGranularity(Period.months(3), null, null), - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -982,7 +1049,10 @@ public void testSegmentGranularityWithFalseKeepSegmentGranularity() throws IOExc null, false, new PeriodGranularity(Period.months(3), null, null), - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1015,7 +1085,10 @@ public void 
testNullSegmentGranularityAndNullKeepSegmentGranularity() throws IOE null, null, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( true @@ -1048,7 +1121,10 @@ public void testUseKeepSegmentGranularityAndSegmentGranularityTogether() objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .interval(COMPACTION_INTERVAL) @@ -1222,6 +1298,23 @@ private static void assertIngestionSchema( } } + private static class TestCoordinatorClient extends CoordinatorClient + { + private final Map segmentMap; + + TestCoordinatorClient(Map segmentMap) + { + super(null, null); + this.segmentMap = segmentMap; + } + + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return new ArrayList<>(segmentMap.keySet()); + } + } + private static class TestTaskToolbox extends TaskToolbox { private final Map segmentFileMap; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 2c3d3c3d8730..2db92726b4cf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -107,7 +107,6 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -1076,9 +1075,7 @@ public List getLocations() () -> conglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 3329419cf337..7f44ad6a528e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -29,6 +29,9 @@ import com.google.common.io.Files; import com.google.inject.Binder; import com.google.inject.Module; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -39,21 +42,16 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import 
org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; -import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; -import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; -import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -61,7 +59,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -72,16 +69,9 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.loading.DataSegmentArchiver; -import org.apache.druid.segment.loading.DataSegmentKiller; -import org.apache.druid.segment.loading.DataSegmentMover; -import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalLoadSpec; -import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; -import org.apache.druid.segment.loading.StorageLocationConfig; -import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; +import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -103,15 +93,12 @@ import java.io.File; import java.io.IOException; -import java.net.URI; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -170,157 +157,21 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER_V9.persist(index, persistDir, indexSpec, null); - final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) + final CoordinatorClient cc = new 
CoordinatorClient(null, null) { - private final Set published = new HashSet<>(); - @Override - public List getUsedSegmentsForInterval(String dataSource, Interval interval) + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) { return ImmutableList.copyOf(segmentSet); } - - @Override - public List getUsedSegmentsForIntervals(String dataSource, List interval) - { - return ImmutableList.copyOf(segmentSet); - } - - @Override - public List getUnusedSegmentsForInterval(String dataSource, Interval interval) - { - return ImmutableList.of(); - } - - @Override - public Set announceHistoricalSegments(Set segments) - { - Set added = new HashSet<>(); - for (final DataSegment segment : segments) { - if (published.add(segment)) { - added.add(segment); - } - } - - return ImmutableSet.copyOf(added); - } - - @Override - public void deleteSegments(Set segments) - { - // do nothing - } }; - final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory( - TASK_STORAGE, - new TaskActionToolbox( - TASK_LOCKBOX, - TASK_STORAGE, - mdc, - newMockEmitter(), - EasyMock.createMock(SupervisorManager.class) - ), - new TaskAuditLogConfig(false) - ); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); - SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public List getLocations() - { - return new ArrayList<>(); - } - }; - final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), - tac, - newMockEmitter(), - new DataSegmentPusher() - { - @Deprecated - @Override - public String getPathForHadoop(String dataSource) - { - return getPathForHadoop(); - } - - @Override - public String getPathForHadoop() - { - throw new UnsupportedOperationException(); - } - - @Override - public DataSegment push(File file, DataSegment segment, boolean useUniquePath) - { - return segment; - } + final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER); + final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); - @Override - public Map makeLoadSpec(URI uri) - { - throw new UnsupportedOperationException(); - } - }, - new DataSegmentKiller() - { - @Override - public void kill(DataSegment segments) - { - - } - - @Override - public void killAll() - { - throw new UnsupportedOperationException("not implemented"); - } - }, - new DataSegmentMover() - { - @Override - public DataSegment move(DataSegment dataSegment, Map targetLoadSpec) - { - return dataSegment; - } - }, - new DataSegmentArchiver() - { - @Override - public DataSegment archive(DataSegment segment) - { - return segment; - } - - @Override - public DataSegment restore(DataSegment segment) - { - return segment; - } - }, - null, // segment announcer - null, - notifierFactory, - null, // query runner factory conglomerate corporation unionized collective - null, // query executor service - null, // monitor scheduler - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER) - ), - MAPPER, - INDEX_IO, - null, - null, - null, - INDEX_MERGER_V9, - null, - null, - null, - null, - new NoopTestTaskFileWriter() - ); Collection values = new ArrayList<>(); for (InputRowParser parser : Arrays.asList( ROW_PARSER, @@ -342,27 +193,35 @@ public DataSegment restore(DataSegment segment) null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME) )) { - 
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( - TASK.getDataSource(), - Intervals.ETERNITY, - new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), - dim_names, - metric_names, - INDEX_IO - ); - factory.setTaskToolbox(taskToolboxFactory.build(TASK)); - values.add( - new Object[]{ - StringUtils.format( - "DimNames[%s]MetricNames[%s]ParserDimNames[%s]", - dim_names == null ? "null" : "dims", - metric_names == null ? "null" : "metrics", - parser == ROW_PARSER ? "dims" : "null" - ), - factory, - parser - } - ); + for (Boolean wrapInCombining : Arrays.asList(false, true)) { + final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( + TASK.getDataSource(), + Intervals.ETERNITY, + new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), + dim_names, + metric_names, + INDEX_IO, + cc, + slf, + retryPolicyFactory + ); + final FirehoseFactory factory = wrapInCombining + ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory)) + : isfFactory; + values.add( + new Object[]{ + StringUtils.format( + "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]", + dim_names == null ? "null" : "dims", + metric_names == null ? "null" : "metrics", + parser == ROW_PARSER ? "dims" : "null", + wrapInCombining + ), + factory, + parser + } + ); + } } } } @@ -407,7 +266,7 @@ public void configure(Binder binder) public IngestSegmentFirehoseFactoryTest( String testName, - IngestSegmentFirehoseFactory factory, + FirehoseFactory factory, InputRowParser rowParser ) { @@ -436,7 +295,7 @@ public IngestSegmentFirehoseFactoryTest( private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile(); private static final List segmentSet = new ArrayList<>(MAX_SHARD_NUMBER); - private final IngestSegmentFirehoseFactory factory; + private final FirehoseFactory factory; private final InputRowParser rowParser; private static final InputRowParser> ROW_PARSER = new MapInputRowParser( @@ -518,15 +377,20 @@ private static void recursivelyDelete(final File dir) @Test public void sanityTest() { - Assert.assertEquals(TASK.getDataSource(), factory.getDataSource()); - if (factory.getDimensions() != null) { - Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray()); + if (factory instanceof CombiningFirehoseFactory) { + // This method tests IngestSegmentFirehoseFactory-specific methods. 
+ return; + } + final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory; + Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource()); + if (isfFactory.getDimensions() != null) { + Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray()); } - Assert.assertEquals(Intervals.ETERNITY, factory.getInterval()); - if (factory.getMetrics() != null) { + Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval()); + if (isfFactory.getMetrics() != null) { Assert.assertEquals( ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), - ImmutableSet.copyOf(factory.getMetrics()) + ImmutableSet.copyOf(isfFactory.getMetrics()) ); } } @@ -536,15 +400,17 @@ public void simpleFirehoseReadingTest() throws IOException { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(rowParser, null)) { + try (final Firehose firehose = factory.connect(rowParser, null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME)); - Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001); + Assert.assertEquals( + METRIC_FLOAT_VALUE, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); ++rowcount; } } @@ -563,9 +429,8 @@ public void testTransformSpec() throws IOException ) ); int skipped = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(transformSpec.decorate(rowParser), null)) { + try (final Firehose firehose = + factory.connect(transformSpec.decorate(rowParser), null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); if (row == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 70e877bad45b..2cc09481e527 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterables; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -34,20 +35,10 @@ import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; -import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; -import 
@@ -283,81 +270,34 @@ public static Collection<Object[]> constructorFeeder()
     final List<Object[]> constructors = new ArrayList<>();
 
     for (final TestCase testCase : testCases) {
-      final TaskActionClient taskActionClient = new TaskActionClient()
-      {
-        @Override
-        public <RetType> RetType submit(TaskAction<RetType> taskAction)
-        {
-          if (taskAction instanceof SegmentListUsedAction) {
-            // Expect the interval we asked for
-            final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
-            if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) {
-              return (RetType) ImmutableList.copyOf(testCase.segments);
-            } else {
-              throw new IllegalArgumentException("WTF");
-            }
-          } else if (taskAction instanceof LockAcquireAction) {
-            return (RetType) new TaskLock(TaskLockType.EXCLUSIVE, null, DATA_SOURCE, Intervals.of("2000/2001"), "v1", 0);
-          } else {
-            throw new UnsupportedOperationException();
-          }
-        }
-      };
       SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
       EasyMock.replay(notifierFactory);
-      SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
+      final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+      final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
+      final CoordinatorClient cc = new CoordinatorClient(null, null)
       {
         @Override
-        public List<StorageLocationConfig> getLocations()
+        public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
         {
-          return new ArrayList<>();
+          // Expect the interval we asked for
+          if (intervals.equals(ImmutableList.of(testCase.interval))) {
+            return ImmutableList.copyOf(testCase.segments);
+          } else {
+            throw new IllegalArgumentException("WTF");
+          }
         }
       };
-      final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
-          new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
-          new TaskActionClientFactory()
-          {
-            @Override
-            public TaskActionClient create(Task task)
-            {
-              return taskActionClient;
-            }
-          },
-          new NoopServiceEmitter(),
-          null, // segment pusher
-          null, // segment killer
-          null, // segment mover
-          null, // segment archiver
-          null, // segment announcer,
-          null,
-          notifierFactory,
-          null, // query runner factory conglomerate corporation unionized collective
-          null, // query executor service
-          null, // monitor scheduler
-          new SegmentLoaderFactory(
-              new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
-          ),
-          MAPPER,
-          INDEX_IO,
-          null,
-          null,
-          null,
-          INDEX_MERGER_V9,
-          null,
-          null,
-          null,
-          null,
-          new NoopTestTaskFileWriter()
-      );
       final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
           DATA_SOURCE,
           testCase.interval,
           new TrueDimFilter(),
           Arrays.asList(DIMENSIONS),
           Arrays.asList(METRICS),
-          INDEX_IO
+          INDEX_IO,
+          cc,
+          slf,
+          retryPolicyFactory
       );
-      factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)));
 
       constructors.add(
           new Object[]{
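With segment discovery moved from the TaskActionClient to the CoordinatorClient, the timeline test now fakes the coordinator at that seam instead of assembling a full TaskToolboxFactory. The same idiom in isolation, with hypothetical fixture names (expectedInterval, expectedSegments):

    // Answer segment lookups from a fixture instead of issuing an HTTP call
    // to a real coordinator; expectedInterval/expectedSegments are hypothetical.
    final CoordinatorClient stubClient = new CoordinatorClient(null, null)
    {
      @Override
      public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
      {
        if (!intervals.equals(ImmutableList.of(expectedInterval))) {
          throw new IllegalArgumentException("unexpected intervals: " + intervals);
        }
        return ImmutableList.copyOf(expectedSegments);
      }
    };
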
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index c8f9380714ba..49315d3973d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -37,7 +37,6 @@
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.NoopDataSegmentMover;
 import org.apache.druid.segment.loading.NoopDataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -94,7 +93,7 @@ public void setup() throws IOException
         null,
         null,
         null,
-        new SegmentLoaderFactory(EasyMock.createMock(SegmentLoaderLocalCacheManager.class)),
+        new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
         utils.getTestObjectMapper(),
         utils.getTestIndexIO(),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 265150c60564..455c3f02d965 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -106,7 +106,6 @@
 import org.apache.druid.segment.loading.LocalDataSegmentKiller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentTest;
@@ -611,9 +610,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
         Execs.directExecutor(), // query executor service
         monitorScheduler, // monitor scheduler
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, new DefaultObjectMapper()),
         MAPPER,
         INDEX_IO,
         MapCache.create(0),
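Across these fixtures the boilerplate collapses to the two-argument constructor. A sketch of the simplified test wiring, assuming a hypothetical JUnit TemporaryFolder named tempFolder; passing null for the IndexIO appears acceptable here, presumably because these tests never actually load a segment through the resulting loader:

    // Simplified wiring: no SegmentLoaderLocalCacheManager or SegmentLoaderConfig up front.
    final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, new DefaultObjectMapper());
    // manufacturate() builds a loader rooted at the given directory on demand.
    final SegmentLoader loader = slf.manufacturate(tempFolder.newFolder());
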
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 749d44af805e..b86b654847a4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -41,7 +41,6 @@
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
@@ -120,7 +119,7 @@ public List<StorageLocationConfig> getLocations()
         null,
         null,
         null,
-        new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager(null, loaderConfig, jsonMapper)),
+        new SegmentLoaderFactory(null, jsonMapper),
         jsonMapper,
         indexIO,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 20eb5fc3d675..4afdd3cf9d5e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -46,9 +46,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.IndexerZkConfig;
@@ -62,10 +59,10 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
+ *
  */
 public class WorkerTaskMonitorTest
 {
@@ -169,20 +166,8 @@ private WorkerTaskMonitor createTaskMonitor()
         new TaskToolboxFactory(
             taskConfig,
             taskActionClientFactory,
-            null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(
-                null,
-                new SegmentLoaderConfig()
-                {
-                  @Override
-                  public List<StorageLocationConfig> getLocations()
-                  {
-                    return new ArrayList<>();
-                  }
-                },
-                jsonMapper
-            )
-        ),
+            null, null, null, null, null, null, null, notifierFactory, null, null, null,
+            new SegmentLoaderFactory(null, jsonMapper),
             jsonMapper,
             indexIO,
             null,
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 2a197cfa0d8b..418854059389 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -28,11 +28,13 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.response.FullResponseHolder;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
 import java.util.List;
 
 public class CoordinatorClient
@@ -95,13 +97,15 @@ public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interva
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET,
-                                        StringUtils.format(
-                                            "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
-                                            dataSource,
-                                            interval.toString().replace('/', '_'),
-                                            incompleteOk
-                                        ))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
+                  StringUtils.urlEncode(dataSource),
+                  interval.toString().replace('/', '_'),
+                  incompleteOk
+              )
+          )
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -121,4 +125,35 @@ public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interva
       throw new RuntimeException(e);
     }
   }
+
+  public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
+  {
+    try {
+      FullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(
+              HttpMethod.POST,
+              StringUtils.format(
+                  "/druid/coordinator/v1/metadata/datasources/%s/segments?full",
+                  StringUtils.urlEncode(dataSource)
+              )
+          ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(intervals))
+      );
+
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while fetching database segment data source segments status[%s] content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      return jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<DataSegment>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
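The new method POSTs the interval list as a JSON body to the coordinator's metadata endpoint and deserializes the response into DataSegment instances; note that the datasource name is now URL-encoded in both this method and fetchServerView. A usage sketch, with a hypothetical datasource and interval:

    // Fetch all published segments of a datasource that overlap 2000/2001,
    // as the reworked firehose factory is expected to do when it connects.
    final List<DataSegment> segments = coordinatorClient.getDatabaseSegmentDataSourceSegments(
        "wikipedia", // hypothetical datasource name
        ImmutableList.of(Intervals.of("2000/2001"))
    );
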
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 68e8a20b62b5..92987354a74b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -58,6 +58,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
   };
 
+  // Note that we only create this via injection in historical and realtime nodes. Peons create these
+  // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
+  // directories rather than statically configured directories.
   @Inject
   public SegmentLoaderLocalCacheManager(
       IndexIO indexIO,
@@ -79,11 +82,6 @@ public SegmentLoaderLocalCacheManager(
     }
   }
 
-  public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
-  {
-    return new SegmentLoaderLocalCacheManager(indexIO, config, jsonMapper);
-  }
-
   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 71dc3050c285..e4334f4cb8eb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -91,7 +91,6 @@
 import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.apache.druid.segment.loading.OmniDataSegmentMover;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
@@ -109,7 +108,6 @@
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -255,12 +253,6 @@ public void configure(Binder binder)
             .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
             .in(LazySingleton.class);
 
-        // Override the default SegmentLoaderConfig because we don't actually care about the
-        // configuration based locations. This will override them anyway. This is also stopping
-        // configuration of other parameters, but I don't think that's actually a problem.
-        // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
-        binder.bind(SegmentLoaderConfig.class)
-              .toInstance(new SegmentLoaderConfig().withLocations(Collections.emptyList()));
         binder.bind(CoordinatorClient.class).in(LazySingleton.class);
 
         binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
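Per the new comment on SegmentLoaderLocalCacheManager, peons no longer share one statically configured loader, which is why both withConfig() and the CliPeon SegmentLoaderConfig override can go. A sketch of the peon-side pattern that replaces them, assuming an injected SegmentLoaderFactory and a hypothetical per-task working directory taskWorkDir:

    // Each task builds its own loader rooted at its working directory instead of
    // relying on a globally bound SegmentLoaderConfig.
    final SegmentLoader loader = segmentLoaderFactory.manufacturate(taskWorkDir);
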