From 62533286d1eceffe6ee93068432747809a8b51ac Mon Sep 17 00:00:00 2001
From: David Glasser
Date: Mon, 18 Feb 2019 23:24:40 -0800
Subject: [PATCH] ParallelIndexSubTask: support ingestSegment in delegating factories

IndexTask had special-cased code to properly send a TaskToolbox to an
IngestSegmentFirehoseFactory that's nested inside a CombiningFirehoseFactory,
but ParallelIndexSubTask didn't.

This change refactors IngestSegmentFirehoseFactory so that it doesn't need a
TaskToolbox; it instead gets a CoordinatorClient and a SegmentLoaderFactory
directly injected into it.

This also refactors SegmentLoaderFactory so it doesn't depend on an injectable
SegmentLoaderConfig, since its only method always replaces the preconfigured
SegmentLoaderConfig anyway. This makes it possible to use SegmentLoaderFactory
without setting druid.segmentCache.locations to some dummy value.

Another goal of this PR is to make it possible for IngestSegmentFirehoseFactory
to list data segments outside of connect() --- specifically, to make it a
FiniteFirehoseFactory which can query the coordinator in order to calculate its
splits. See #7048.

This also adds missing datasource name URL-encoding to an API used by
CoordinatorBasedSegmentHandoffNotifier.
---
 .../indexing/kafka/KafkaIndexTaskTest.java         |   5 +-
 .../kinesis/KinesisIndexTaskTest.java              |   5 +-
 .../indexing/common/SegmentLoaderFactory.java      |  20 +-
 .../indexing/common/task/CompactionTask.java       | 113 ++++++--
 .../druid/indexing/common/task/IndexTask.java      |  23 --
 .../batch/parallel/ParallelIndexSubTask.java       |   6 -
 .../IngestSegmentFirehoseFactory.java              |  81 ++++--
 .../indexing/common/TaskToolboxTest.java           |  11 +-
 ...penderatorDriverRealtimeIndexTaskTest.java      |   5 +-
 .../common/task/CompactionTaskRunTest.java         |  37 ++-
 .../common/task/CompactionTaskTest.java            | 143 ++++++++--
 .../common/task/RealtimeIndexTaskTest.java         |   5 +-
 .../IngestSegmentFirehoseFactoryTest.java          | 257 +++++-------------
 ...estSegmentFirehoseFactoryTimelineTest.java      |  94 ++-----
 .../SingleTaskBackgroundRunnerTest.java            |   3 +-
 .../indexing/overlord/TaskLifecycleTest.java       |   5 +-
 .../worker/WorkerTaskManagerTest.java              |   3 +-
 .../worker/WorkerTaskMonitorTest.java              |  21 +-
 .../client/coordinator/CoordinatorClient.java      |  49 +++-
 .../SegmentLoaderLocalCacheManager.java            |   8 +-
 .../java/org/apache/druid/cli/CliPeon.java         |   8 -
 21 files changed, 458 insertions(+), 444 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1a5512889d94..892349f755fe 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -139,7 +139,6 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@@ -2559,9 +2558,7 @@ public List getLocations()
         this::makeTimeseriesAndScanConglomerate,
         Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index f0ef143e5eec..52941166c002 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -135,7 +135,6 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -2765,9 +2764,7 @@ public List getLocations() this::makeTimeseriesOnlyConglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java index b15d3dfc4bdd..83fa9dbb06b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java @@ -19,7 +19,10 @@ package org.apache.druid.indexing.common; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; @@ -29,23 +32,30 @@ import java.util.Collections; /** + * */ public class SegmentLoaderFactory { - private final SegmentLoaderLocalCacheManager loader; + private final IndexIO indexIO; + private final ObjectMapper jsonMapper; @Inject public SegmentLoaderFactory( - SegmentLoaderLocalCacheManager loader + IndexIO indexIO, + @Json ObjectMapper mapper ) { - this.loader = loader; + this.indexIO = indexIO; + this.jsonMapper = mapper; } public SegmentLoader manufacturate(File storageDir) { - return loader.withConfig( - new SegmentLoaderConfig().withLocations(Collections.singletonList(new StorageLocationConfig().setPath(storageDir))) + return new SegmentLoaderLocalCacheManager( + indexIO, + new SegmentLoaderConfig().withLocations( + Collections.singletonList(new StorageLocationConfig().setPath(storageDir))), + jsonMapper ); } } diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 740d78d36395..c897de3fc0f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -29,6 +29,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -40,6 +41,8 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentListUsedAction; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -132,6 +135,15 @@ public class CompactionTask extends AbstractTask @JsonIgnore private final RowIngestionMetersFactory rowIngestionMetersFactory; + @JsonIgnore + private final CoordinatorClient coordinatorClient; + + @JsonIgnore + private final SegmentLoaderFactory segmentLoaderFactory; + + @JsonIgnore + private final RetryPolicyFactory retryPolicyFactory; + @JsonIgnore private List indexTaskSpecs; @@ -153,7 +165,10 @@ public CompactionTask( @JacksonInject ObjectMapper jsonMapper, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject ChatHandlerProvider chatHandlerProvider, - @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, + @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject RetryPolicyFactory retryPolicyFactory ) { super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context); @@ -186,6 +201,9 @@ public CompactionTask( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.coordinatorClient = coordinatorClient; + this.segmentLoaderFactory = segmentLoaderFactory; + this.retryPolicyFactory = retryPolicyFactory; } @JsonProperty @@ -278,20 +296,23 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception metricsSpec, keepSegmentGranularity, segmentGranularity, - jsonMapper + jsonMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ).stream() - .map(spec -> new IndexTask( - getId(), - getGroupId(), - getTaskResource(), - getDataSource(), - spec, - getContext(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory - )) - .collect(Collectors.toList()); + .map(spec -> new IndexTask( + getId(), + getGroupId(), + getTaskResource(), + getDataSource(), + spec, + getContext(), + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory + )) + .collect(Collectors.toList()); } if (indexTaskSpecs.isEmpty()) { @@ -338,7 +359,10 @@ static List createIngestionSchema( @Nullable final AggregatorFactory[] metricsSpec, @Nullable final Boolean keepSegmentGranularity, 
@Nullable final Granularity segmentGranularity, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final CoordinatorClient coordinatorClient, + final SegmentLoaderFactory segmentLoaderFactory, + final RetryPolicyFactory retryPolicyFactory ) throws IOException, SegmentLoadingException { Pair, List>> pair = prepareSegments( @@ -379,7 +403,14 @@ static List createIngestionSchema( return Collections.singletonList( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, segmentProvider.interval), + createIoConfig( + toolbox, + dataSchema, + segmentProvider.interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -411,7 +442,14 @@ static List createIngestionSchema( specs.add( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, interval), + createIoConfig( + toolbox, + dataSchema, + interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -438,7 +476,14 @@ static List createIngestionSchema( return Collections.singletonList( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, segmentProvider.interval), + createIoConfig( + toolbox, + dataSchema, + segmentProvider.interval, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ), compactionTuningConfig ) ); @@ -446,7 +491,14 @@ static List createIngestionSchema( } } - private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval) + private static IndexIOConfig createIoConfig( + TaskToolbox toolbox, + DataSchema dataSchema, + Interval interval, + CoordinatorClient coordinatorClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory + ) { return new IndexIOConfig( new IngestSegmentFirehoseFactory( @@ -456,7 +508,10 @@ private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema data // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), - toolbox.getIndexIO() + toolbox.getIndexIO(), + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ), false ); @@ -811,7 +866,7 @@ IndexTuningConfig computeTuningConfig(List> qu * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment}, * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together. * {@link #hasPartitionConfig} checks one of those configs is set. - * + *

* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig * returns true. If targetCompactionSizeBytes is not set, this returns null or * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of @@ -860,6 +915,9 @@ public static class Builder private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final CoordinatorClient coordinatorClient; + private final SegmentLoaderFactory segmentLoaderFactory; + private final RetryPolicyFactory retryPolicyFactory; @Nullable private Interval interval; @@ -885,7 +943,10 @@ public Builder( ObjectMapper jsonMapper, AuthorizerMapper authorizerMapper, ChatHandlerProvider chatHandlerProvider, - RowIngestionMetersFactory rowIngestionMetersFactory + RowIngestionMetersFactory rowIngestionMetersFactory, + CoordinatorClient coordinatorClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory ) { this.dataSource = dataSource; @@ -893,6 +954,9 @@ public Builder( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.coordinatorClient = coordinatorClient; + this.segmentLoaderFactory = segmentLoaderFactory; + this.retryPolicyFactory = retryPolicyFactory; } public Builder interval(Interval interval) @@ -968,7 +1032,10 @@ public CompactionTask build() jsonMapper, authorizerMapper, chatHandlerProvider, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6d5a0d8e56ad..d0e083a2b6ed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -84,7 +83,6 @@ import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; @@ -419,8 +417,6 @@ public TaskStatus run(final TaskToolbox toolbox) final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - setFirehoseFactoryToolbox(firehoseFactory, toolbox); - final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. 
FileUtils.forceMkdir(firehoseTempDir); @@ -489,25 +485,6 @@ public TaskStatus run(final TaskToolbox toolbox) } } - // pass toolbox to any IngestSegmentFirehoseFactory - private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox) - { - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - return; - } - - if (firehoseFactory instanceof CombiningFirehoseFactory) { - for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) { - if (delegateFactory instanceof IngestSegmentFirehoseFactory) { - ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox); - } else if (delegateFactory instanceof CombiningFirehoseFactory) { - setFirehoseFactoryToolbox(delegateFactory, toolbox); - } - } - } - } - private Map getTaskCompletionReports() { return TaskReport.buildTaskReports( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 8004243bb3aa..435de05892ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -185,11 +184,6 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception { final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { - // pass toolbox to Firehose - ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); - } - final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); // Firehose temporary directory is automatically removed when this IndexTask completes. 
FileUtils.forceMkdir(firehoseTempDir); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 8087582ecb55..bae2946bbdc9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -30,16 +30,20 @@ import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.SegmentListUsedAction; +import org.apache.druid.indexing.common.RetryPolicy; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; @@ -48,14 +52,17 @@ import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -68,7 +75,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory dimensions; private final List metrics; private final IndexIO indexIO; - private TaskToolbox taskToolbox; + private final CoordinatorClient coordinatorClient; + private final SegmentLoaderFactory segmentLoaderFactory; + private final RetryPolicyFactory retryPolicyFactory; @JsonCreator public IngestSegmentFirehoseFactory( @@ -77,7 +86,10 @@ public IngestSegmentFirehoseFactory( @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List dimensions, @JsonProperty("metrics") List metrics, - @JacksonInject IndexIO indexIO + @JacksonInject IndexIO indexIO, + @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject RetryPolicyFactory retryPolicyFactory ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -88,6 +100,9 @@ public IngestSegmentFirehoseFactory( this.dimensions = dimensions; this.metrics = metrics; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); + this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); + this.segmentLoaderFactory = 
Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); + this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); } @JsonProperty @@ -120,23 +135,46 @@ public List getMetrics() return metrics; } - public void setTaskToolbox(TaskToolbox taskToolbox) - { - this.taskToolbox = taskToolbox; - } - @Override public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException { log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); - Preconditions.checkNotNull(taskToolbox, "taskToolbox is not set"); - try { - final List usedSegments = taskToolbox - .getTaskActionClient() - .submit(new SegmentListUsedAction(dataSource, interval, null)); - final Map segmentFileMap = taskToolbox.fetchSegments(usedSegments); + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. + final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + List usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + log.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + log.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + Map segmentFileMap = Maps.newLinkedHashMap(); + for (DataSegment segment : usedSegments) { + segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + } + final List> timeLineSegments = VersionedIntervalTimeline .forSegments(usedSegments) .lookup(interval); @@ -201,11 +239,18 @@ public WindowedStorageAdapter apply(final PartitionChunk input) final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } - catch (IOException | SegmentLoadingException e) { + catch (SegmentLoadingException e) { throw Throwables.propagate(e); } } + private long jitter(long input) + { + final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; + long retval = input + (long) jitter; + return retval < 0 ? 
0 : retval; + } + @VisibleForTesting static List getUniqueDimensions( List> timelineSegments, @@ -260,7 +305,7 @@ static List getUniqueMetrics(List orderedMetrics = uniqueMetrics.inverse(); return IntStream.range(0, orderedMetrics.size()) - .mapToObj(orderedMetrics::get) - .collect(Collectors.toList()); + .mapToObj(orderedMetrics::get) + .collect(Collectors.toList()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index d9acfd6bd833..0966d1b84c98 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -76,6 +76,7 @@ public class TaskToolboxTest private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); private ObjectMapper ObjectMapper = new ObjectMapper(); + private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class); private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); private Task task = EasyMock.createMock(Task.class); private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class); @@ -107,7 +108,7 @@ public void setUp() throws IOException () -> mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, - new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), + mockSegmentLoaderFactory, ObjectMapper, mockIndexIO, mockCache, @@ -162,13 +163,13 @@ public void testGetObjectMapper() public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); + EasyMock + .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject())) + .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); EasyMock .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - EasyMock - .expect(mockSegmentLoaderLocalCacheManager.withConfig(EasyMock.anyObject())) - .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); - EasyMock.replay(mockSegmentLoaderLocalCacheManager); + EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index b57397ab8cd6..b809e7a5ee85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -119,7 +119,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import 
org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; @@ -1607,9 +1606,7 @@ public List getLocations() () -> conglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index bbf28708ff89..6ad0ec4b1465 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.LocalTaskActionClient; @@ -52,6 +56,7 @@ import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -96,12 +101,24 @@ public class CompactionTaskRunTest extends IngestionTestBase ); private RowIngestionMetersFactory rowIngestionMetersFactory; + private CoordinatorClient coordinatorClient; + private SegmentLoaderFactory segmentLoaderFactory; private ExecutorService exec; + private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); public CompactionTaskRunTest() { TestUtils testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); + coordinatorClient = new CoordinatorClient(null, null) + { + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals); + } + }; + segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); } @Before @@ -126,7 +143,10 @@ public void testRun() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask = builder @@ -156,7 +176,10 @@ public void testRunCompactionTwiceWithoutKeepSegmentGranularity() throws Excepti getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + 
rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask1 = builder @@ -200,7 +223,10 @@ public void testRunCompactionTwiceWithKeepSegmentGranularity() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask compactionTask1 = builder @@ -248,7 +274,10 @@ public void testWithSegmentGranularity() throws Exception getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); // day segmentGranularity diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f759cc7c1c62..5117c1a31c88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -41,6 +42,9 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.SegmentListUsedAction; @@ -153,12 +157,15 @@ public class CompactionTaskTest private static List AGGREGATORS; private static List SEGMENTS; private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + private static Map segmentMap = new HashMap<>(); + private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap); private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - private static Map segmentMap; + private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); private final boolean keepSegmentGranularity; private TaskToolbox toolbox; + private SegmentLoaderFactory segmentLoaderFactory; @BeforeClass public static void setupClass() @@ -202,7 +209,6 @@ public static void setupClass() AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); - segmentMap = new HashMap<>(SEGMENT_INTERVALS.size()); for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); segmentMap.put( @@ -243,6 +249,8 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); 
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); + binder.bind(CoordinatorClient.class).toInstance(coordinatorClient); + binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); } ) ) @@ -307,19 +315,21 @@ private static IndexTuningConfig createTuningConfig() @Before public void setup() { + final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap); toolbox = new TestTaskToolbox( new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())), - new TestIndexIO(objectMapper, segmentMap), + testIndexIO, segmentMap ); + segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper); } @Parameters(name = "keepSegmentGranularity={0}") public static Collection parameters() { return ImmutableList.of( - new Object[] {false}, - new Object[] {true} + new Object[]{false}, + new Object[]{true} ); } @@ -336,7 +346,10 @@ public void testSerdeWithInterval() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .interval(COMPACTION_INTERVAL) @@ -357,7 +370,10 @@ public void testSerdeWithSegments() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .segments(SEGMENTS) @@ -378,7 +394,10 @@ public void testSerdeWithDimensions() throws IOException objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder @@ -426,7 +445,10 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -440,7 +462,13 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept ) ); Assert.assertEquals(6, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + Granularities.MONTH + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( @@ -491,7 +519,10 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -564,7 +595,10 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -637,7 +671,10 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, 
keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -710,7 +747,10 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); if (keepSegmentGranularity) { @@ -760,7 +800,10 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, customMetricsSpec, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( @@ -805,7 +848,10 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( keepSegmentGranularity @@ -819,7 +865,13 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se ) ); Assert.assertEquals(6, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + Granularities.MONTH + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( @@ -850,7 +902,10 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -871,7 +926,10 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -886,7 +944,10 @@ public void testEmptyInterval() objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder @@ -934,7 +995,10 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg null, keepSegmentGranularity, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); } @@ -949,7 +1013,10 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException null, null, new PeriodGranularity(Period.months(3), null, null), - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -982,7 +1049,10 @@ public void testSegmentGranularityWithFalseKeepSegmentGranularity() throws IOExc null, false, new PeriodGranularity(Period.months(3), null, null), - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1015,7 +1085,10 @@ public void 
testNullSegmentGranularityAndNullKeepSegmentGranularity() throws IOE null, null, null, - objectMapper + objectMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( true @@ -1048,7 +1121,10 @@ public void testUseKeepSegmentGranularityAndSegmentGranularityTogether() objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory ); final CompactionTask task = builder .interval(COMPACTION_INTERVAL) @@ -1222,6 +1298,23 @@ private static void assertIngestionSchema( } } + private static class TestCoordinatorClient extends CoordinatorClient + { + private final Map segmentMap; + + TestCoordinatorClient(Map segmentMap) + { + super(null, null); + this.segmentMap = segmentMap; + } + + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return new ArrayList<>(segmentMap.keySet()); + } + } + private static class TestTaskToolbox extends TaskToolbox { private final Map segmentFileMap; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 2c3d3c3d8730..2db92726b4cf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -107,7 +107,6 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -1076,9 +1075,7 @@ public List getLocations() () -> conglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper()) - ), + new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create(1024), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 3329419cf337..7f44ad6a528e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -29,6 +29,9 @@ import com.google.common.io.Files; import com.google.inject.Binder; import com.google.inject.Module; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -39,21 +42,16 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import 
org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; -import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; -import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; -import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -61,7 +59,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -72,16 +69,9 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.loading.DataSegmentArchiver; -import org.apache.druid.segment.loading.DataSegmentKiller; -import org.apache.druid.segment.loading.DataSegmentMover; -import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalLoadSpec; -import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; -import org.apache.druid.segment.loading.StorageLocationConfig; -import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; +import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -103,15 +93,12 @@ import java.io.File; import java.io.IOException; -import java.net.URI; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -170,157 +157,21 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER_V9.persist(index, persistDir, indexSpec, null); - final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) + final CoordinatorClient cc = new 
CoordinatorClient(null, null) { - private final Set published = new HashSet<>(); - @Override - public List getUsedSegmentsForInterval(String dataSource, Interval interval) + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) { return ImmutableList.copyOf(segmentSet); } - - @Override - public List getUsedSegmentsForIntervals(String dataSource, List interval) - { - return ImmutableList.copyOf(segmentSet); - } - - @Override - public List getUnusedSegmentsForInterval(String dataSource, Interval interval) - { - return ImmutableList.of(); - } - - @Override - public Set announceHistoricalSegments(Set segments) - { - Set added = new HashSet<>(); - for (final DataSegment segment : segments) { - if (published.add(segment)) { - added.add(segment); - } - } - - return ImmutableSet.copyOf(added); - } - - @Override - public void deleteSegments(Set segments) - { - // do nothing - } }; - final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory( - TASK_STORAGE, - new TaskActionToolbox( - TASK_LOCKBOX, - TASK_STORAGE, - mdc, - newMockEmitter(), - EasyMock.createMock(SupervisorManager.class) - ), - new TaskAuditLogConfig(false) - ); + SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); - SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public List getLocations() - { - return new ArrayList<>(); - } - }; - final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), - tac, - newMockEmitter(), - new DataSegmentPusher() - { - @Deprecated - @Override - public String getPathForHadoop(String dataSource) - { - return getPathForHadoop(); - } - - @Override - public String getPathForHadoop() - { - throw new UnsupportedOperationException(); - } - - @Override - public DataSegment push(File file, DataSegment segment, boolean useUniquePath) - { - return segment; - } + final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER); + final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); - @Override - public Map makeLoadSpec(URI uri) - { - throw new UnsupportedOperationException(); - } - }, - new DataSegmentKiller() - { - @Override - public void kill(DataSegment segments) - { - - } - - @Override - public void killAll() - { - throw new UnsupportedOperationException("not implemented"); - } - }, - new DataSegmentMover() - { - @Override - public DataSegment move(DataSegment dataSegment, Map targetLoadSpec) - { - return dataSegment; - } - }, - new DataSegmentArchiver() - { - @Override - public DataSegment archive(DataSegment segment) - { - return segment; - } - - @Override - public DataSegment restore(DataSegment segment) - { - return segment; - } - }, - null, // segment announcer - null, - notifierFactory, - null, // query runner factory conglomerate corporation unionized collective - null, // query executor service - null, // monitor scheduler - new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER) - ), - MAPPER, - INDEX_IO, - null, - null, - null, - INDEX_MERGER_V9, - null, - null, - null, - null, - new NoopTestTaskFileWriter() - ); Collection values = new ArrayList<>(); for (InputRowParser parser : Arrays.asList( ROW_PARSER, @@ -342,27 +193,35 @@ public DataSegment restore(DataSegment segment) null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME) )) { - 
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( - TASK.getDataSource(), - Intervals.ETERNITY, - new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), - dim_names, - metric_names, - INDEX_IO - ); - factory.setTaskToolbox(taskToolboxFactory.build(TASK)); - values.add( - new Object[]{ - StringUtils.format( - "DimNames[%s]MetricNames[%s]ParserDimNames[%s]", - dim_names == null ? "null" : "dims", - metric_names == null ? "null" : "metrics", - parser == ROW_PARSER ? "dims" : "null" - ), - factory, - parser - } - ); + for (Boolean wrapInCombining : Arrays.asList(false, true)) { + final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( + TASK.getDataSource(), + Intervals.ETERNITY, + new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), + dim_names, + metric_names, + INDEX_IO, + cc, + slf, + retryPolicyFactory + ); + final FirehoseFactory factory = wrapInCombining + ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory)) + : isfFactory; + values.add( + new Object[]{ + StringUtils.format( + "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]", + dim_names == null ? "null" : "dims", + metric_names == null ? "null" : "metrics", + parser == ROW_PARSER ? "dims" : "null", + wrapInCombining + ), + factory, + parser + } + ); + } } } } @@ -407,7 +266,7 @@ public void configure(Binder binder) public IngestSegmentFirehoseFactoryTest( String testName, - IngestSegmentFirehoseFactory factory, + FirehoseFactory factory, InputRowParser rowParser ) { @@ -436,7 +295,7 @@ public IngestSegmentFirehoseFactoryTest( private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile(); private static final List segmentSet = new ArrayList<>(MAX_SHARD_NUMBER); - private final IngestSegmentFirehoseFactory factory; + private final FirehoseFactory factory; private final InputRowParser rowParser; private static final InputRowParser> ROW_PARSER = new MapInputRowParser( @@ -518,15 +377,20 @@ private static void recursivelyDelete(final File dir) @Test public void sanityTest() { - Assert.assertEquals(TASK.getDataSource(), factory.getDataSource()); - if (factory.getDimensions() != null) { - Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray()); + if (factory instanceof CombiningFirehoseFactory) { + // This method tests IngestSegmentFirehoseFactory-specific methods. 
+ return; + } + final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory; + Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource()); + if (isfFactory.getDimensions() != null) { + Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray()); } - Assert.assertEquals(Intervals.ETERNITY, factory.getInterval()); - if (factory.getMetrics() != null) { + Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval()); + if (isfFactory.getMetrics() != null) { Assert.assertEquals( ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), - ImmutableSet.copyOf(factory.getMetrics()) + ImmutableSet.copyOf(isfFactory.getMetrics()) ); } } @@ -536,15 +400,17 @@ public void simpleFirehoseReadingTest() throws IOException { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(rowParser, null)) { + try (final Firehose firehose = factory.connect(rowParser, null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME)); - Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001); + Assert.assertEquals( + METRIC_FLOAT_VALUE, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); ++rowcount; } } @@ -563,9 +429,8 @@ public void testTransformSpec() throws IOException ) ); int skipped = 0; - try (final IngestSegmentFirehose firehose = - (IngestSegmentFirehose) - factory.connect(transformSpec.decorate(rowParser), null)) { + try (final Firehose firehose = + factory.connect(transformSpec.decorate(rowParser), null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); if (row == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 70e877bad45b..2cc09481e527 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterables; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -34,20 +35,10 @@ import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; -import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; -import 
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.indexing.common.actions.TaskAction;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
-import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
-import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -59,12 +50,8 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.TransformSpec;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -283,81 +270,34 @@ public static Collection<Object[]> constructorFeeder()
     final List<Object[]> constructors = new ArrayList<>();
     for (final TestCase testCase : testCases) {
-      final TaskActionClient taskActionClient = new TaskActionClient()
-      {
-        @Override
-        public <RetType> RetType submit(TaskAction<RetType> taskAction)
-        {
-          if (taskAction instanceof SegmentListUsedAction) {
-            // Expect the interval we asked for
-            final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
-            if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) {
-              return (RetType) ImmutableList.copyOf(testCase.segments);
-            } else {
-              throw new IllegalArgumentException("WTF");
-            }
-          } else if (taskAction instanceof LockAcquireAction) {
-            return (RetType) new TaskLock(TaskLockType.EXCLUSIVE, null, DATA_SOURCE, Intervals.of("2000/2001"), "v1", 0);
-          } else {
-            throw new UnsupportedOperationException();
-          }
-        }
-      };
       SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
       EasyMock.replay(notifierFactory);
-      SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
+      final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+      final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
+      final CoordinatorClient cc = new CoordinatorClient(null, null)
       {
        @Override
-        public List<StorageLocationConfig> getLocations()
+        public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
        {
-          return new ArrayList<>();
+          // Expect the interval we asked for
+          if (intervals.equals(ImmutableList.of(testCase.interval))) {
+            return ImmutableList.copyOf(testCase.segments);
+          } else {
+            throw new IllegalArgumentException("WTF");
+          }
        }
      };
-      final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
-          new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
-          new TaskActionClientFactory()
-          {
-            @Override
-            public TaskActionClient create(Task task)
-            {
-              return taskActionClient;
-            }
-          },
-          new NoopServiceEmitter(),
-          null, // segment pusher
-          null, // segment killer
-          null, // segment mover
-          null, // segment archiver
-          null, // segment announcer,
-          null,
-          notifierFactory,
-          null, // query runner factory conglomerate corporation unionized collective
-          null, // query executor service
-          null, // monitor scheduler
-          new SegmentLoaderFactory(
-              new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
-          ),
-          MAPPER,
-          INDEX_IO,
-          null,
-          null,
-          null,
-          INDEX_MERGER_V9,
-          null,
-          null,
-          null,
-          null,
-          new NoopTestTaskFileWriter()
-      );
 
       final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
           DATA_SOURCE,
           testCase.interval,
           new TrueDimFilter(),
           Arrays.asList(DIMENSIONS),
           Arrays.asList(METRICS),
-          INDEX_IO
+          INDEX_IO,
+          cc,
+          slf,
+          retryPolicyFactory
       );
-      factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)));
 
       constructors.add(
           new Object[]{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index c8f9380714ba..49315d3973d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -37,7 +37,6 @@
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.NoopDataSegmentMover;
 import org.apache.druid.segment.loading.NoopDataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -94,7 +93,7 @@ public void setup() throws IOException
         null,
         null,
         null,
-        new SegmentLoaderFactory(EasyMock.createMock(SegmentLoaderLocalCacheManager.class)),
+        new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
         utils.getTestObjectMapper(),
         utils.getTestIndexIO(),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 265150c60564..455c3f02d965 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -106,7 +106,6 @@
 import org.apache.druid.segment.loading.LocalDataSegmentKiller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentTest;
@@ -611,9 +610,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
         Execs.directExecutor(), // query executor service
         monitorScheduler, // monitor scheduler
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, new DefaultObjectMapper()),
         MAPPER,
         INDEX_IO,
         MapCache.create(0),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 749d44af805e..b86b654847a4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -41,7 +41,6 @@
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
@@ -120,7 +119,7 @@ public List<StorageLocationConfig> getLocations()
         null,
         null,
         null,
-        new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager(null, loaderConfig, jsonMapper)),
+        new SegmentLoaderFactory(null, jsonMapper),
         jsonMapper,
         indexIO,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 20eb5fc3d675..4afdd3cf9d5e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -46,9 +46,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.IndexerZkConfig;
@@ -62,10 +59,10 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
+ *
  */
 public class WorkerTaskMonitorTest
 {
@@ -169,20 +166,8 @@ private WorkerTaskMonitor createTaskMonitor()
         new TaskToolboxFactory(
             taskConfig,
             taskActionClientFactory,
-            null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
-                new SegmentLoaderLocalCacheManager(
-                    null,
-                    new SegmentLoaderConfig()
-                    {
-                      @Override
-                      public List<StorageLocationConfig> getLocations()
-                      {
-                        return new ArrayList<>();
-                      }
-                    },
-                    jsonMapper
-                )
-            ),
+            null, null, null, null, null, null, null, notifierFactory, null, null, null,
+            new SegmentLoaderFactory(null, jsonMapper),
             jsonMapper,
             indexIO,
             null,
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 2a197cfa0d8b..418854059389 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -28,11 +28,13 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.response.FullResponseHolder;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
 import java.util.List;
 
 public class CoordinatorClient
@@ -95,13 +97,15 @@ public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET,
-                                        StringUtils.format(
-                                            "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
-                                            dataSource,
-                                            interval.toString().replace('/', '_'),
-                                            incompleteOk
-                                        ))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
+                  StringUtils.urlEncode(dataSource),
+                  interval.toString().replace('/', '_'),
+                  incompleteOk
+              )
+          )
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -121,4 +125,35 @@ public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
       throw new RuntimeException(e);
     }
   }
+
+  public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
+  {
+    try {
+      FullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(
+              HttpMethod.POST,
+              StringUtils.format(
+                  "/druid/coordinator/v1/metadata/datasources/%s/segments?full",
+                  StringUtils.urlEncode(dataSource)
+              )
+          ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(intervals))
+      );
+
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while fetching database segment data source segments status[%s] content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      return jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<DataSegment>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 68e8a20b62b5..92987354a74b 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -58,6 +58,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
   };
 
+  // Note that we only create this via injection in historical and realtime nodes. Peons create these
+  // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
+  // directories rather than statically configured directories.
   @Inject
   public SegmentLoaderLocalCacheManager(
       IndexIO indexIO,
@@ -79,11 +82,6 @@ public SegmentLoaderLocalCacheManager(
     }
   }
 
-  public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
-  {
-    return new SegmentLoaderLocalCacheManager(indexIO, config, jsonMapper);
-  }
-
   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 71dc3050c285..e4334f4cb8eb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -91,7 +91,6 @@
 import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.apache.druid.segment.loading.OmniDataSegmentMover;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
@@ -109,7 +108,6 @@
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -255,12 +253,6 @@ public void configure(Binder binder)
             .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
             .in(LazySingleton.class);
 
-        // Override the default SegmentLoaderConfig because we don't actually care about the
-        // configuration based locations. This will override them anyway. This is also stopping
-        // configuration of other parameters, but I don't think that's actually a problem.
-        // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
-        binder.bind(SegmentLoaderConfig.class)
-              .toInstance(new SegmentLoaderConfig().withLocations(Collections.emptyList()));
        binder.bind(CoordinatorClient.class).in(LazySingleton.class);
 
        binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
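
The wiring this patch enables is easiest to see from the caller's side. The sketch below is illustrative only and is not code from the patch: the class name, method names, and the choice of Intervals.ETERNITY are invented for the example, and a real peon obtains CoordinatorClient and SegmentLoaderFactory through Guice as CliPeon binds them.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableList;
    import org.apache.druid.client.coordinator.CoordinatorClient;
    import org.apache.druid.indexing.common.SegmentLoaderFactory;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.segment.IndexIO;
    import org.apache.druid.segment.loading.SegmentLoader;
    import org.apache.druid.timeline.DataSegment;

    import java.io.File;
    import java.util.List;

    // Hypothetical helper (not part of this patch) showing the dependencies that used to be pulled
    // out of a TaskToolbox now being passed in directly, as IngestSegmentFirehoseFactory expects.
    public class IngestSegmentWiringSketch
    {
      private final CoordinatorClient coordinatorClient;
      private final SegmentLoaderFactory segmentLoaderFactory;

      public IngestSegmentWiringSketch(CoordinatorClient coordinatorClient, IndexIO indexIO, ObjectMapper jsonMapper)
      {
        this.coordinatorClient = coordinatorClient;
        // Same constructor the updated tests use; the tests pass null for IndexIO.
        this.segmentLoaderFactory = new SegmentLoaderFactory(indexIO, jsonMapper);
      }

      public List<DataSegment> listUsedSegments(String dataSource)
      {
        // Query the coordinator's metadata endpoint directly instead of issuing a
        // SegmentListUsedAction through the task-action client.
        return coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, ImmutableList.of(Intervals.ETERNITY));
      }

      public SegmentLoader taskLocalLoader(File taskTempDir)
      {
        // Builds a loader rooted at a task-specific directory rather than a statically configured
        // cache location.
        return segmentLoaderFactory.manufacturate(taskTempDir);
      }
    }

These are the same two collaborators the updated tests stub out with new SegmentLoaderFactory(null, jsonMapper) and an anonymous CoordinatorClient override of getDatabaseSegmentDataSourceSegments.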