diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6cf7a2f5181b..2373a59ec926 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1106,7 +1106,7 @@ private DataSegmentTimelineView makeDataSegmentTimelineView()
   {
     return (dataSource, intervals) -> {
       final Collection<DataSegment> dataSegments =
-          context.coordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(dataSource, intervals);
+          FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);
 
       if (dataSegments.isEmpty()) {
         return Optional.empty();
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
index 18d60f6e4daa..c4a9d1ae401f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.exec;
 
 import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
@@ -28,7 +29,6 @@
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.rpc.CoordinatorServiceClient;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
@@ -52,13 +52,13 @@
  */
 public class TaskDataSegmentProvider implements DataSegmentProvider
 {
-  private final CoordinatorServiceClient coordinatorClient;
+  private final CoordinatorClient coordinatorClient;
   private final SegmentCacheManager segmentCacheManager;
   private final IndexIO indexIO;
   private final ConcurrentHashMap<SegmentId, Holder> holders;
 
   public TaskDataSegmentProvider(
-      CoordinatorServiceClient coordinatorClient,
+      CoordinatorClient coordinatorClient,
       SegmentCacheManager segmentCacheManager,
       IndexIO indexIO
   )
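The `fetchUsedSegments` plus `FutureUtils.getUnchecked(..., true)` pairing above is the calling convention the rest of the patch converges on. A minimal standalone sketch of that shape (the class name, method name, and "wikipedia" data source are placeholders, not part of the patch):

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;

public class ExampleUsage
{
  // fetchUsedSegments returns a ListenableFuture; getUnchecked blocks for the result
  // and rethrows failures unchecked. The second argument (true) cancels the future
  // if the waiting thread is interrupted, instead of leaving the RPC running.
  public static Collection<DataSegment> fetchBlocking(final CoordinatorClient client)
  {
    return FutureUtils.getUnchecked(
        client.fetchUsedSegments("wikipedia", Collections.singletonList(Intervals.ETERNITY)),
        true
    );
  }
}
```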
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQServiceClientModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQServiceClientModule.java
deleted file mode 100644
index 436fa7bebbef..000000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQServiceClientModule.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.guice;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Binder;
-import com.google.inject.Provides;
-import org.apache.druid.client.coordinator.Coordinator;
-import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
-import org.apache.druid.guice.annotations.Json;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.msq.rpc.CoordinatorServiceClient;
-import org.apache.druid.msq.rpc.CoordinatorServiceClientImpl;
-import org.apache.druid.rpc.DiscoveryServiceLocator;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocator;
-import org.apache.druid.rpc.StandardRetryPolicy;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Module for providing {@link CoordinatorServiceClient}.
- */
-public class MSQServiceClientModule implements DruidModule
-{
-  private static final int COORDINATOR_ATTEMPTS = 6;
-
-  @Override
-  public List<? extends Module> getJacksonModules()
-  {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void configure(Binder binder)
-  {
-    // Nothing to do.
-  }
-
-  @Provides
-  @ManageLifecycle
-  @Coordinator
-  public ServiceLocator makeCoordinatorServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider)
-  {
-    return new DiscoveryServiceLocator(discoveryProvider, NodeRole.COORDINATOR);
-  }
-
-  @Provides
-  public CoordinatorServiceClient makeCoordinatorServiceClient(
-      @Json final ObjectMapper jsonMapper,
-      @EscalatedGlobal final ServiceClientFactory clientFactory,
-      @Coordinator final ServiceLocator serviceLocator
-  )
-  {
-    return new CoordinatorServiceClientImpl(
-        clientFactory.makeClient(
-            NodeRole.COORDINATOR.getJsonName(),
-            serviceLocator,
-            StandardRetryPolicy.builder().maxAttempts(COORDINATOR_ATTEMPTS).build()
-        ),
-        jsonMapper
-    );
-  }
-}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 10855094bde2..43d067dd6c90 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -43,7 +43,6 @@
 import org.apache.druid.msq.indexing.client.WorkerChatHandler;
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.rpc.CoordinatorServiceClient;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocations;
 import org.apache.druid.rpc.ServiceLocator;
@@ -95,8 +94,6 @@ public IndexerWorkerContext(
   public static IndexerWorkerContext createProductionInstance(final TaskToolbox toolbox, final Injector injector)
   {
     final IndexIO indexIO = injector.getInstance(IndexIO.class);
-    final CoordinatorServiceClient coordinatorServiceClient =
-        injector.getInstance(CoordinatorServiceClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
     final SegmentCacheManager segmentCacheManager =
         injector.getInstance(SegmentCacheManagerFactory.class)
                 .manufacturate(new File(toolbox.getIndexingTmpDir(), "segment-fetch"));
@@ -107,7 +104,7 @@ public static IndexerWorkerContext createProductionInstance(final TaskToolbox to
         toolbox,
         injector,
         indexIO,
-        new TaskDataSegmentProvider(coordinatorServiceClient, segmentCacheManager, indexIO),
+        new TaskDataSegmentProvider(toolbox.getCoordinatorClient(), segmentCacheManager, indexIO),
         serviceClientFactory
     );
   }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClientImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClientImpl.java
deleted file mode 100644
index b8281578de63..000000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClientImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.rpc;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
-import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
-import org.apache.druid.rpc.RequestBuilder;
-import org.apache.druid.rpc.ServiceClient;
-import org.apache.druid.rpc.ServiceRetryPolicy;
-import org.apache.druid.timeline.DataSegment;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
-import java.io.IOException;
-
-/**
- * Production implementation of {@link CoordinatorServiceClient}.
- */
-public class CoordinatorServiceClientImpl implements CoordinatorServiceClient
-{
-  private final ServiceClient client;
-  private final ObjectMapper jsonMapper;
-
-  public CoordinatorServiceClientImpl(final ServiceClient client, final ObjectMapper jsonMapper)
-  {
-    this.client = Preconditions.checkNotNull(client, "client");
-    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
-  }
-
-  @Override
-  public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
-  {
-    final String path = StringUtils.format(
-        "/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
-        StringUtils.urlEncode(dataSource),
-        StringUtils.urlEncode(segmentId)
-    );
-
-    return FutureUtils.transform(
-        client.asyncRequest(
-            new RequestBuilder(HttpMethod.GET, path),
-            new BytesFullResponseHandler()
-        ),
-        holder -> deserialize(holder, new TypeReference<DataSegment>() {})
-    );
-  }
-
-  @Override
-  public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
-  {
-    return new CoordinatorServiceClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
-  }
-
-  /**
-   * Deserialize a {@link BytesFullResponseHolder} as JSON.
-   *
-   * It would be reasonable to move this to {@link BytesFullResponseHolder} itself, or some shared utility class.
-   */
-  private <T> T deserialize(final BytesFullResponseHolder bytesHolder, final TypeReference<T> typeReference)
-  {
-    try {
-      return jsonMapper.readValue(bytesHolder.getContent(), typeReference);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/extensions-core/multi-stage-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/multi-stage-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 647356f9da6f..cabd131fb758 100644
--- a/extensions-core/multi-stage-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ b/extensions-core/multi-stage-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -16,6 +16,5 @@
 org.apache.druid.msq.guice.MSQExternalDataSourceModule
 org.apache.druid.msq.guice.MSQIndexingModule
 org.apache.druid.msq.guice.MSQDurableStorageModule
-org.apache.druid.msq.guice.MSQServiceClientModule
 org.apache.druid.msq.guice.MSQSqlModule
 org.apache.druid.msq.guice.SqlTaskModule
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
index b1a97b3da005..b6dd751e7dd8 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
@@ -29,6 +29,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
@@ -41,8 +42,6 @@
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.msq.counters.ChannelCounters;
-import org.apache.druid.msq.rpc.CoordinatorServiceClient;
-import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.Metadata;
@@ -149,7 +148,7 @@ public void setUp() throws Exception
     );
 
     provider = new TaskDataSegmentProvider(
-        new TestCoordinatorServiceClientImpl(),
+        new TestCoordinatorClientImpl(),
         cacheManager,
         indexIO
     );
@@ -229,7 +228,7 @@ public void testConcurrency()
     Assert.assertArrayEquals(new String[]{}, cacheDir.list());
   }
 
-  private class TestCoordinatorServiceClientImpl implements CoordinatorServiceClient
+  private class TestCoordinatorClientImpl extends NoopCoordinatorClient
   {
     @Override
     public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
@@ -242,12 +241,6 @@ public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String
       return Futures.immediateFailedFuture(new ISE("No such segment[%s] for dataSource[%s]", segmentId, dataSource));
     }
-
-    @Override
-    public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
-    {
-      return this;
-    }
   }
 
   @JsonTypeName("test")
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 58ee01f00872..7c37723ecc80 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -98,16 +98,18 @@ public MSQTestControllerContext(
     this.injector = injector;
     this.taskActionClient = taskActionClient;
     coordinatorClient = Mockito.mock(CoordinatorClient.class);
-    Mockito.when(coordinatorClient.fetchUsedSegmentsInDataSourceForIntervals(
+    Mockito.when(coordinatorClient.fetchUsedSegments(
                      ArgumentMatchers.anyString(),
                      ArgumentMatchers.anyList()
                  )
     ).thenAnswer(invocation ->
-                     (injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
-                              .getSegments()
-                              .stream()
-                              .filter(dataSegment -> dataSegment.getDataSource().equals(invocation.getArguments()[0]))
-                              .collect(Collectors.toList())
+                     Futures.immediateFuture(
+                         injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
+                                 .getSegments()
+                                 .stream()
+                                 .filter(dataSegment -> dataSegment.getDataSource()
+                                                                   .equals(invocation.getArguments()[0]))
+                                 .collect(Collectors.toList())
                      )
     );
     this.workerMemoryParameters = workerMemoryParameters;
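Because `fetchUsedSegments` is now asynchronous, test doubles have to wrap their results in `Futures.immediateFuture(...)`, as the Mockito stub above does. The same stubbing pattern, reduced to a self-contained sketch (the class and method names here are illustrative, not from the patch):

```java
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;

import com.google.common.util.concurrent.Futures;
import java.util.Collections;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.timeline.DataSegment;
import org.mockito.Mockito;

public class CoordinatorClientMockSketch
{
  public static CoordinatorClient mockWithNoSegments()
  {
    final CoordinatorClient client = Mockito.mock(CoordinatorClient.class);
    // Async API: the stub must return a ListenableFuture, not a bare collection.
    Mockito.when(client.fetchUsedSegments(anyString(), anyList()))
           .thenReturn(Futures.immediateFuture(Collections.<DataSegment>emptyList()));
    return client;
  }
}
```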
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index d95e5453ffc5..288d89919b98 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -45,6 +45,7 @@
 import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9Factory;
@@ -245,8 +246,11 @@ public TaskToolbox build(TaskConfig config, Task task)
         .chatHandlerProvider(chatHandlerProvider)
         .rowIngestionMetersFactory(rowIngestionMetersFactory)
         .appenderatorsManager(appenderatorsManager)
-        .overlordClient(overlordClient)
-        .coordinatorClient(coordinatorClient)
+        // Most tasks are written in such a way that if an Overlord or Coordinator RPC fails, the task fails.
+        // Set the retry policy to "about an hour", so tasks are resilient to brief Coordinator/Overlord problems.
+        // Calls will still eventually fail if problems persist.
+        .overlordClient(overlordClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()))
+        .coordinatorClient(coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()))
         .supervisorTaskClientProvider(supervisorTaskClientProvider)
         .shuffleClient(shuffleClient)
         .taskLogPusher(taskLogPusher)
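The retry comment above is the behavioral core of this change for tasks: per-call retry loops move into the client itself. A sketch of the composition, assuming (as the deleted CoordinatorServiceClientImpl suggests) that `withRetryPolicy` returns a reconfigured copy rather than mutating the receiver:

```java
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.rpc.StandardRetryPolicy;

public class RetryPolicySketch
{
  // Different callers can hold differently-configured views of the same client:
  // tasks get "about an hour" of retries; a caller that must never give up could
  // use StandardRetryPolicy.unlimited(), as IndexerWorkerContext previously did.
  public static CoordinatorClient forTasks(final CoordinatorClient base)
  {
    return base.withRetryPolicy(StandardRetryPolicy.aboutAnHour());
  }
}
```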
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 5b3ea95ca4d5..82bc43eb020d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -166,9 +166,6 @@ public class CompactionTask extends AbstractBatchIndexTask
   @JsonIgnore
   private final SegmentCacheManagerFactory segmentCacheManagerFactory;
 
-  @JsonIgnore
-  private final RetryPolicyFactory retryPolicyFactory;
-
   @JsonIgnore
   private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder(
       (taskObject, config) -> {
@@ -193,8 +190,7 @@ public CompactionTask(
       @JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
       @JsonProperty("context") @Nullable final Map<String, Object> context,
-      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
-      @JacksonInject RetryPolicyFactory retryPolicyFactory
+      @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory
   )
   {
     super(
@@ -248,7 +244,6 @@ public CompactionTask(
     this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
     this.segmentCacheManagerFactory = segmentCacheManagerFactory;
-    this.retryPolicyFactory = retryPolicyFactory;
   }
 
   @VisibleForTesting
@@ -470,9 +465,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         metricsSpec,
         granularitySpec,
         toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory,
-        retryPolicyFactory,
-        ioConfig.isDropExisting()
+        segmentCacheManagerFactory
     );
     final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
         .range(0, ingestionSpecs.size())
@@ -579,9 +572,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
-      final SegmentCacheManagerFactory segmentCacheManagerFactory,
-      final RetryPolicyFactory retryPolicyFactory,
-      final boolean dropExisting
+      final SegmentCacheManagerFactory segmentCacheManagerFactory
   ) throws IOException
   {
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = retrieveRelevantTimelineHolders(
@@ -657,7 +648,6 @@
                   interval,
                   coordinatorClient,
                   segmentCacheManagerFactory,
-                  retryPolicyFactory,
                   ioConfig
               ),
               compactionTuningConfig
@@ -696,7 +686,6 @@
               segmentProvider.interval,
               coordinatorClient,
               segmentCacheManagerFactory,
-              retryPolicyFactory,
               ioConfig
           ),
           compactionTuningConfig
@@ -711,7 +700,6 @@ private static ParallelIndexIOConfig createIoConfig(
       Interval interval,
       CoordinatorClient coordinatorClient,
       SegmentCacheManagerFactory segmentCacheManagerFactory,
-      RetryPolicyFactory retryPolicyFactory,
       CompactionIOConfig compactionIOConfig
   )
   {
@@ -744,7 +732,6 @@ private static ParallelIndexIOConfig createIoConfig(
             toolbox.getIndexIO(),
             coordinatorClient,
             segmentCacheManagerFactory,
-            retryPolicyFactory,
             toolbox.getConfig()
         ),
         null,
@@ -1322,8 +1309,7 @@ public CompactionTask build()
           granularitySpec,
           tuningConfig,
           context,
-          segmentCacheManagerFactory,
-          retryPolicyFactory
+          segmentCacheManagerFactory
       );
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 4b65b31ae7d4..216b343babb9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -31,6 +31,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFileAttribute;
@@ -44,8 +45,6 @@
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexing.common.RetryPolicy;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
@@ -63,7 +62,6 @@
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.apache.druid.utils.Streams;
-import org.joda.time.Duration;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
@@ -81,7 +79,6 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Stream;
 
 /**
@@ -138,7 +135,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
   private final SegmentCacheManagerFactory segmentCacheManagerFactory;
-  private final RetryPolicyFactory retryPolicyFactory;
   private final TaskConfig taskConfig;
 
   /**
@@ -164,7 +160,6 @@ public DruidInputSource(
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
       @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
-      @JacksonInject RetryPolicyFactory retryPolicyFactory,
       @JacksonInject TaskConfig taskConfig
   )
   {
@@ -181,7 +176,6 @@ public DruidInputSource(
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
     this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory");
-    this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
     this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
   }
 
@@ -313,7 +307,7 @@ private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
     if (interval == null) {
       return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds);
     } else {
-      return getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval);
+      return getTimelineForInterval(coordinatorClient, dataSource, interval);
     }
   }
@@ -329,7 +323,6 @@ public Stream<InputSplit<List<WindowedSegmentId>>> createSplits(
     return Streams.sequentialStreamFrom(
         createSplits(
             coordinatorClient,
-            retryPolicyFactory,
             dataSource,
             interval,
             splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec
@@ -349,7 +342,6 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp
     return Iterators.size(
         createSplits(
             coordinatorClient,
-            retryPolicyFactory,
             dataSource,
             interval,
             splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec
@@ -373,7 +365,6 @@ public SplittableInputSource<List<WindowedSegmentId>> withSplit(InputSplit<List
   public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
       CoordinatorClient coordinatorClient,
-      RetryPolicyFactory retryPolicyFactory,
       String dataSource,
       Interval interval,
       SplitHintSpec splitHintSpec
@@ -442,7 +432,6 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval(
         coordinatorClient,
-        retryPolicyFactory,
         dataSource,
         interval
     );
@@ -486,42 +475,17 @@ private static SortedMap createWindowedSegmentIdFromTim
   public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval(
       CoordinatorClient coordinatorClient,
-      RetryPolicyFactory retryPolicyFactory,
       String dataSource,
       Interval interval
   )
   {
-    Preconditions.checkNotNull(interval);
-
-    // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
-    // as TaskActionClient.
-    final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
-    Collection<DataSegment> usedSegments;
-    while (true) {
-      try {
-        usedSegments = coordinatorClient.fetchUsedSegmentsInDataSourceForIntervals(
+    final Collection<DataSegment> usedSegments = FutureUtils.getUnchecked(
+        coordinatorClient.fetchUsedSegments(
             dataSource,
-            Collections.singletonList(interval)
-        );
-        break;
-      }
-      catch (Throwable e) {
-        LOG.warn(e, "Exception getting database segments");
-        final Duration delay = retryPolicy.getAndIncrementRetryDelay();
-        if (delay == null) {
-          throw e;
-        } else {
-          final long sleepTime = jitter(delay.getMillis());
-          LOG.info("Will try again in [%s].", new Duration(sleepTime).toString());
-          try {
-            Thread.sleep(sleepTime);
-          }
-          catch (InterruptedException e2) {
-            throw new RuntimeException(e2);
-          }
-        }
-      }
-    }
+            Collections.singletonList(Preconditions.checkNotNull(interval, "interval"))
+        ),
+        true
+    );
 
     return SegmentTimeline.forSegments(usedSegments).lookup(interval);
   }
@@ -536,9 +500,9 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegm
         Comparators.intervalsByStartThenEnd()
     );
     for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) {
-      final DataSegment segment = coordinatorClient.fetchUsedSegment(
-          dataSource,
-          windowedSegmentId.getSegmentId()
+      final DataSegment segment = FutureUtils.getUnchecked(
+          coordinatorClient.fetchUsedSegment(dataSource, windowedSegmentId.getSegmentId()),
+          true
       );
       for (Interval interval : windowedSegmentId.getIntervals()) {
         final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(interval);
@@ -578,11 +542,4 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegm
 
     return new ArrayList<>(timeline.values());
   }
-
-  private static long jitter(long input)
-  {
-    final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0;
-    long retval = input + (long) jitter;
-    return retval < 0 ? 0 : retval;
-  }
 }
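With retries handled inside the service client, the hand-rolled retry-and-jitter loop above could be deleted outright; what remains at the call site is a blocking future resolution plus a timeline lookup. The shape of the new code path, condensed into a standalone sketch (names mirror the patch; this is illustrative, not a new API):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;

public class TimelineLookupSketch
{
  // Equivalent shape to the new getTimelineForInterval: no retry loop here,
  // because retries are configured once on the client (e.g. aboutAnHour()).
  public static List<TimelineObjectHolder<String, DataSegment>> lookup(
      final CoordinatorClient client,
      final String dataSource,
      final Interval interval
  )
  {
    final Collection<DataSegment> used = FutureUtils.getUnchecked(
        client.fetchUsedSegments(dataSource, Collections.singletonList(interval)),
        true
    );
    // Build a versioned timeline and return only the holders overlapping the interval.
    return SegmentTimeline.forSegments(used).lookup(interval);
  }
}
```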
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 67dec0488bc4..aaeee9fcd543 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -23,6 +23,7 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -146,7 +147,7 @@ public void setUp() throws IOException
         new DropwizardRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 548d5b6a05b7..7ea82b8f57b0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -31,6 +31,7 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Firehose;
@@ -1639,7 +1640,7 @@ public void close()
         testUtils.getRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index e1dff0e42989..5c58818529bf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
@@ -73,9 +74,9 @@
 public class ClientCompactionTaskQuerySerdeTest
 {
-  private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
-      .getRowIngestionMetersFactory();
-  private static final CoordinatorClient COORDINATOR_CLIENT = new CoordinatorClient(null, null);
+  private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY =
+      new TestUtils().getRowIngestionMetersFactory();
+  private static final CoordinatorClient COORDINATOR_CLIENT = new NoopCoordinatorClient();
   private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
 
   @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index fc55114519a3..677b067f5cfe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -749,7 +749,7 @@ public void testCompactRangeAndDynamicPartitionedSegments()
   }
 
   @Test
-  public void testDruidInputSourceCreateSplitsWithIndividualSplits()
+  public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Exception
   {
     allowSegmentFetchesByCompactionTask = true;
     runIndexTask(null, true);
@@ -757,7 +756,6 @@
     List<InputSplit<List<WindowedSegmentId>>> splits = Lists.newArrayList(
         DruidInputSource.createSplits(
             getCoordinatorClient(),
-            RETRY_POLICY_FACTORY,
             DATA_SOURCE,
             INTERVAL_TO_INDEX,
             new SegmentsSplitHintSpec(null, 1)
         )
     );
 
     List<DataSegment> segments = new ArrayList<>(
-        getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
+        getCoordinatorClient().fetchUsedSegments(
             DATA_SOURCE,
             ImmutableList.of(INTERVAL_TO_INDEX)
-        )
+        ).get()
     );
 
     Set segmentIdsFromSplits = new HashSet<>();
@@ -782,15 +781,15 @@
   }
 
   @Test
-  public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
+  public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet() throws Exception
   {
     allowSegmentFetchesByCompactionTask = true;
     runIndexTask(null, true);
 
-    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
+    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
         DATA_SOURCE,
         ImmutableList.of(INTERVAL_TO_INDEX)
-    );
+    ).get();
     Assert.assertEquals(3, usedSegments.size());
     for (DataSegment segment : usedSegments) {
       Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
@@ -810,10 +809,10 @@ public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
 
-    usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
+    usedSegments = getCoordinatorClient().fetchUsedSegments(
         DATA_SOURCE,
         ImmutableList.of(INTERVAL_TO_INDEX)
-    );
+    ).get();
     // All the HOUR segments got covered by tombstones even if we do not have all MINUTES segments fully covering the 3 HOURS interval.
     // In fact, we only have 3 minutes of data out of the 3 hours interval.
     Assert.assertEquals(180, usedSegments.size());
@@ -828,15 +827,15 @@
   }
 
   @Test
-  public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
+  public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet() throws Exception
   {
     allowSegmentFetchesByCompactionTask = true;
     runIndexTask(null, true);
 
-    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
+    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
        DATA_SOURCE,
        ImmutableList.of(INTERVAL_TO_INDEX)
-    );
+    ).get();
     Assert.assertEquals(3, usedSegments.size());
     for (DataSegment segment : usedSegments) {
       Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
@@ -855,10 +854,10 @@ public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
 
-    usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
+    usedSegments = getCoordinatorClient().fetchUsedSegments(
         DATA_SOURCE,
         ImmutableList.of(INTERVAL_TO_INDEX)
-    );
+    ).get();
     // All the HOUR segments did not get dropped, since the MINUTES segments did not fully cover the 3 HOURS interval.
     Assert.assertEquals(6, usedSegments.size());
     int hourSegmentCount = 0;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 9a345f122348..275e76706612 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -25,7 +25,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.NoopOverlordClient;
@@ -111,7 +114,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -178,15 +180,19 @@ public CompactionTaskRunTest(LockGranularity lockGranularity)
   {
     testUtils = new TestUtils();
     overlordClient = new NoopOverlordClient();
-    coordinatorClient = new CoordinatorClient(null, null)
+    coordinatorClient = new NoopCoordinatorClient()
     {
       @Override
-      public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
+      public ListenableFuture<List<DataSegment>> fetchUsedSegments(
          String dataSource,
          List<Interval> intervals
       )
       {
-        return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
+        return Futures.immediateFuture(
+            ImmutableList.copyOf(
+                getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE)
+            )
+        );
       }
     };
     segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 2f6d53e1a9a2..76ea03a8176e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -36,7 +36,10 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.common.guava.SettableSupplier;
@@ -148,7 +151,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -939,9 +941,7 @@ public void testCreateIngestionSchema() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1014,9 +1014,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1090,9 +1088,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1166,9 +1162,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1232,9 +1226,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
 
     ingestionSpecs.sort(
@@ -1278,9 +1270,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException
         customMetricsSpec,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1317,9 +1307,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1362,9 +1350,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
   }
@@ -1388,9 +1374,7 @@ public void testMissingMetadata() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
   }
@@ -1426,9 +1410,7 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException
         null,
         new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -1466,9 +1448,7 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException
         null,
         new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1508,9 +1488,7 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio
             null
         ),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -1548,9 +1526,7 @@ public void testNullGranularitySpec() throws IOException
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1587,9 +1563,7 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1626,9 +1600,7 @@ public void testGranularitySpecWithNotNullRollup()
         null,
         new ClientCompactionTaskGranularitySpec(null, null, true),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
 
     Assert.assertEquals(6, ingestionSpecs.size());
@@ -1652,9 +1624,7 @@ public void testGranularitySpecWithNullRollup()
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
-        BatchIOConfig.DEFAULT_DROP_EXISTING
+        segmentCacheManagerFactory
     );
     Assert.assertEquals(6, ingestionSpecs.size());
     for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
@@ -1888,23 +1858,22 @@ private void assertIngestionSchema(
     }
   }
 
-  private static class TestCoordinatorClient extends CoordinatorClient
+  private static class TestCoordinatorClient extends NoopCoordinatorClient
   {
     private final Map<DataSegment, File> segmentMap;
 
     TestCoordinatorClient(Map<DataSegment, File> segmentMap)
     {
-      super(null, null);
       this.segmentMap = segmentMap;
     }
 
     @Override
-    public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
+    public ListenableFuture<List<DataSegment>> fetchUsedSegments(
         String dataSource,
         List<Interval> intervals
     )
     {
-      return ImmutableSet.copyOf(segmentMap.keySet());
+      return Futures.immediateFuture(ImmutableList.copyOf(segmentMap.keySet()));
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index cd9163fad9ad..9881561d61f1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -30,6 +30,7 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.FirehoseFactory;
@@ -1011,7 +1012,7 @@ public void close()
         testUtils.getRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 963757ebdedc..94d286ec2b24 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -33,6 +33,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.data.input.InputFormat;
@@ -116,7 +117,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1012,34 +1012,30 @@ private ParallelIndexSupervisorTask findSupervisorTask(TaskContainer taskContain
     }
   }
 
-  class LocalCoordinatorClient extends CoordinatorClient
+  class LocalCoordinatorClient extends NoopCoordinatorClient
   {
-    private final ExecutorService exec;
+    private final ListeningExecutorService exec;
 
     LocalCoordinatorClient(ExecutorService exec)
     {
-      super(null, null);
-      this.exec = exec;
+      this.exec = MoreExecutors.listeningDecorator(exec);
     }
 
     @Override
-    public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
+    public ListenableFuture<List<DataSegment>> fetchUsedSegments(
        String dataSource,
        List<Interval> intervals
     )
     {
-      try {
-        return exec.submit(
-            () -> getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE)
-        ).get();
-      }
-      catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
+      return exec.submit(
+          () -> ImmutableList.copyOf(
+              getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE)
+          )
+      );
     }
 
     @Override
-    public DataSegment fetchUsedSegment(String dataSource, String segmentId)
+    public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
     {
       ImmutableDruidDataSource druidDataSource;
       try {
@@ -1057,7 +1053,7 @@ public DataSegment fetchUsedSegment(String dataSource, String segmentId)
       for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) {
         DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
         if (segment != null) {
-          return segment;
+          return Futures.immediateFuture(segment);
         }
       }
       throw new ISE("Can't find segment for id[%s]", segmentId);
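The interesting move in LocalCoordinatorClient above is `MoreExecutors.listeningDecorator`: rather than calling `Future.get()` and rethrowing checked exceptions, the test wraps its executor so `submit` returns a `ListenableFuture` that can be handed back directly. A self-contained sketch of that pattern (the class name and string values are placeholders):

```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ListeningExecutorSketch
{
  // listeningDecorator wraps a plain ExecutorService so that submit(...) returns
  // ListenableFuture instead of Future, matching the new async client signatures.
  public static ListenableFuture<List<String>> submitExample()
  {
    final ExecutorService plain = Executors.newSingleThreadExecutor();
    final ListeningExecutorService exec = MoreExecutors.listeningDecorator(plain);
    return exec.submit(() -> ImmutableList.of("segment-1", "segment-2")); // placeholder values
  }
}
```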
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
index 9f01ba85f010..6c6db80c1c72 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -99,7 +99,6 @@ public void testSerdeUsingIntervals() throws Exception
             indexIO,
             coordinatorClient,
             segmentCacheManagerFactory,
-            retryPolicyFactory,
             taskConfig
         ),
         inputSource
@@ -133,7 +132,6 @@ public void testSerdeUsingIntervalsAndLegacyDimensionsMetrics() throws Exception
             indexIO,
             coordinatorClient,
             segmentCacheManagerFactory,
-            retryPolicyFactory,
             taskConfig
         ),
         inputSource
@@ -173,7 +171,6 @@ public void testSerdeUsingSegments() throws Exception
             indexIO,
             coordinatorClient,
             segmentCacheManagerFactory,
-            retryPolicyFactory,
             taskConfig
         ),
         inputSource
@@ -256,7 +253,6 @@ public void testReaderColumnsFilterWithMetricGiven()
         indexIO,
         coordinatorClient,
         segmentCacheManagerFactory,
-        retryPolicyFactory,
         taskConfig
     );
     InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
@@ -291,7 +287,6 @@ public void testReaderColumnsFilterWithNoMetricGiven()
         indexIO,
         coordinatorClient,
         segmentCacheManagerFactory,
-        retryPolicyFactory,
         taskConfig
     );
     InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
@@ -315,7 +310,6 @@ public void testGetTypes()
         indexIO,
         coordinatorClient,
         segmentCacheManagerFactory,
-        retryPolicyFactory,
         taskConfig
     );
     Assert.assertEquals(ImmutableSet.of(DruidInputSource.TYPE_KEY), druidInputSource.getTypes());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index a185b583c29d..e683c828ddc6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
@@ -129,7 +130,7 @@ public void setup() throws IOException
         utils.getRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 6b5927bf1791..9b9d6d362249 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,7 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.Firehose;
@@ -693,7 +694,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
         TEST_UTILS.getRowIngestionMetersFactory(),
         appenderatorsManager,
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 77491f7d4bac..5c6afdbb61b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -25,6 +25,8 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
@@ -150,8 +152,8 @@ public static class Builder
     private ChatHandlerProvider chatHandlerProvider;
     private RowIngestionMetersFactory rowIngestionMetersFactory;
     private AppenderatorsManager appenderatorsManager;
-    private OverlordClient overlordClient;
-    private CoordinatorClient coordinatorClient;
+    private OverlordClient overlordClient = new NoopOverlordClient();
+    private CoordinatorClient coordinatorClient = new NoopCoordinatorClient();
     private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider;
     private ShuffleClient shuffleClient;
     private TaskLogPusher taskLogPusher;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 79bb286f7ad8..6be23407a418 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -34,6 +34,7 @@
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputFormat;
@@ -694,7 +695,7 @@ public void close()
         testUtils.getRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         new NoopOverlordClient(),
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 2e2a0ee5fdc6..d36bf1c04bab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -99,7 +100,7 @@ public WorkerTaskManagerTest(boolean restoreTasksOnRestart)
     this.restoreTasksOnRestart = restoreTasksOnRestart;
   }
 
-  @Parameterized.Parameters(name = "restoreTasksOnRestart = {0}, useMultipleBaseTaskDirPaths = {1}")
+  @Parameterized.Parameters(name = "restoreTasksOnRestart = {0}")
   public static Collection<Object[]> getParameters()
   {
     Object[][] parameters = new Object[][]{{false}, {true}};
@@ -160,7 +161,7 @@ private WorkerTaskManager createWorkerTaskManager()
         testUtils.getRowIngestionMetersFactory(),
         new TestAppenderatorsManager(),
         overlordClient,
-        null,
+        new NoopCoordinatorClient(),
         null,
         null,
         null,
@@ -200,6 +201,8 @@ public void tearDown() throws Exception
   @Test(timeout = 60_000L)
   public void testTaskRun() throws Exception
   {
+    EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
     Task task1 = createNoopTask("task1-assigned-via-assign-dir");
     Task task2 = createNoopTask("task2-completed-already");
     Task task3 = createNoopTask("task3-assigned-explicitly");
@@ -450,6 +453,9 @@ private NoopTask createNoopTask(String id)
    */
   private Task setUpCompletedTasksCleanupTest() throws Exception
   {
+    EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
     final Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
 
     // Scheduled scheduleCompletedTasksCleanup will not run, because initialDelay is 1 minute, which is longer than
@@ -468,6 +474,7 @@ private Task setUpCompletedTasksCleanupTest() throws Exception
     Assert.assertNotNull(announcement);
     Assert.assertEquals(TaskState.SUCCESS, announcement.getStatus());
 
+    EasyMock.reset(overlordClient);
     return task;
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 5cedfb4312a1..aecfe29ab1fa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -26,6 +26,7 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingCluster;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
 import org.apache.druid.indexer.TaskState;
@@ -202,7 +203,7 @@ private WorkerTaskMonitor createTaskMonitor()
         testUtils.getRowIngestionMetersFactory(),
TestAppenderatorsManager(), new NoopOverlordClient(), - null, + new NoopCoordinatorClient(), null, null, null, diff --git a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/CliCustomNodeRole.java b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/CliCustomNodeRole.java index 97e5e70e2227..087cb24bec5d 100644 --- a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/CliCustomNodeRole.java +++ b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/CliCustomNodeRole.java @@ -30,7 +30,6 @@ import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; import org.apache.druid.cli.ServerRunnable; -import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.LazySingleton; @@ -90,8 +89,6 @@ protected List<? extends Module> getModules() binder.bindConstant().annotatedWith(Names.named("servicePort")).to(CliCustomNodeRole.PORT); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(CliCustomNodeRole.TLS_PORT); - binder.bind(CoordinatorClient.class).in(LazySingleton.class); - binder.bind(JettyServerInitializer.class).to(CustomJettyServiceInitializer.class).in(LazySingleton.class); LifecycleModule.register(binder, Server.class); diff --git a/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java b/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java index 5542bbea95e0..e0e1605b3026 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java @@ -29,7 +29,6 @@ import com.google.inject.Module; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; -import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.LazySingleton; @@ -89,8 +88,6 @@ protected List<? extends Module> getModules() binder.bindConstant().annotatedWith(Names.named("servicePort")).to(CliCustomNodeRole.PORT); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(CliCustomNodeRole.TLS_PORT); - binder.bind(CoordinatorClient.class).in(LazySingleton.class); - binder.bind(JettyServerInitializer.class).to(CustomJettyServiceInitializer.class).in(LazySingleton.class); LifecycleModule.register(binder, Server.class); diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 029663ae8215..c497dcb68940 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -19,176 +19,33 @@ package org.apache.druid.client.coordinator; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import org.apache.druid.client.ImmutableSegmentLoadInfo; -import org.apache.druid.discovery.DruidLeaderClient; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.timeline.DataSegment; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Interval; -import javax.annotation.Nullable; -import javax.ws.rs.core.MediaType; -import java.util.Collection; import java.util.List; -public class CoordinatorClient +public interface CoordinatorClient { - private final DruidLeaderClient druidLeaderClient; - private final ObjectMapper jsonMapper; - - @Inject - public CoordinatorClient( - ObjectMapper jsonMapper, - @Coordinator DruidLeaderClient druidLeaderClient - ) - { - this.jsonMapper = jsonMapper; - this.druidLeaderClient = druidLeaderClient; - } - /** - * Checks the given segment is handed off or not. - * It can return null if the HTTP call returns 404 which can happen during rolling update. + * Checks if the given segment is handed off or not. */ - @Nullable - public Boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor) - { - try { - StringFullResponseHolder response = druidLeaderClient.go( - druidLeaderClient.makeRequest( - HttpMethod.GET, - StringUtils.format( - "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", - StringUtils.urlEncode(dataSource), - descriptor.getInterval(), - descriptor.getPartitionNumber(), - descriptor.getVersion() - ) - ) - ); - - if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { - return null; - } - - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while fetching serverView status[%s] content[%s]", - response.getStatus(), - response.getContent() - ); - } - return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>() - { - }); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk) - { - try { - StringFullResponseHolder response = druidLeaderClient.go( - druidLeaderClient.makeRequest( - HttpMethod.GET, - StringUtils.format( - "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", - StringUtils.urlEncode(dataSource), - interval.toString().replace('/', '_'), - incompleteOk - ) - ) - ); - - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while fetching serverView status[%s] content[%s]", - response.getStatus(), - response.getContent() - ); - } - return jsonMapper.readValue( - response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>() - { - } - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(String dataSource, List<Interval> intervals) - { - try { - StringFullResponseHolder response = druidLeaderClient.go( - druidLeaderClient.makeRequest( - HttpMethod.POST, - StringUtils.format( - "/druid/coordinator/v1/metadata/datasources/%s/segments?full", - StringUtils.urlEncode(dataSource) - ) - ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(intervals)) - ); + ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor); - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while fetching used segments in a data source for intervals: status[%s] content[%s]", - response.getStatus(), - response.getContent() - ); - } - return jsonMapper.readValue( - response.getContent(), new TypeReference<Collection<DataSegment>>() - { - } - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + /** + * Fetches segment metadata for the given
dataSource and segmentId. + */ + ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId); - public DataSegment fetchUsedSegment(String dataSource, String segmentId) - { - try { - StringFullResponseHolder response = druidLeaderClient.go( - druidLeaderClient.makeRequest( - HttpMethod.GET, - StringUtils.format( - "/druid/coordinator/v1/metadata/datasources/%s/segments/%s", - StringUtils.urlEncode(dataSource), - StringUtils.urlEncode(segmentId) - ) - ) - ); + /** + * Fetches segment metadata for the given dataSource and intervals. + */ + ListenableFuture<List<DataSegment>> fetchUsedSegments(String dataSource, List<Interval> intervals); - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while fetching database segment[%s] in dataSource[%s] with status[%s] content[%s]", - segmentId, - dataSource, - response.getStatus(), - response.getContent() - ); - } - return jsonMapper.readValue( - response.getContent(), new TypeReference<DataSegment>() - { - } - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + /** + * Returns a new instance backed by a ServiceClient which follows the provided retryPolicy. + */ + CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy); } diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java new file mode 100644 index 000000000000..e023d9827bd4 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.client.coordinator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceRetryPolicy; +import org.apache.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Interval; + +import java.util.List; + +public class CoordinatorClientImpl implements CoordinatorClient +{ + private final ServiceClient client; + private final ObjectMapper jsonMapper; + + public CoordinatorClientImpl( + final ServiceClient client, + final ObjectMapper jsonMapper + ) + { + this.client = client; + this.jsonMapper = jsonMapper; + } + + @Override + public ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + { + final String path = StringUtils.format( + "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", + StringUtils.urlEncode(dataSource), + StringUtils.urlEncode(descriptor.getInterval().toString()), + descriptor.getPartitionNumber(), + StringUtils.urlEncode(descriptor.getVersion()) + ); + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, path), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), Boolean.class) + ); + } + + @Override + public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId) + { + final String path = StringUtils.format( + "/druid/coordinator/v1/metadata/datasources/%s/segments/%s", + StringUtils.urlEncode(dataSource), + StringUtils.urlEncode(segmentId) + ); + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, path), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), DataSegment.class) + ); + } + + @Override + public ListenableFuture<List<DataSegment>> fetchUsedSegments(String dataSource, List<Interval> intervals) + { + final String path = StringUtils.format( + "/druid/coordinator/v1/metadata/datasources/%s/segments?full", + StringUtils.urlEncode(dataSource) + ); + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, path) + .jsonContent(jsonMapper, intervals), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<List<DataSegment>>() {}) + ); + } + + @Override + public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy) + { + return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper); + } +} diff --git a/server/src/main/java/org/apache/druid/rpc/StandardRetryPolicy.java b/server/src/main/java/org/apache/druid/rpc/StandardRetryPolicy.java index 787bcae1e694..ceb2a41c8076 100644 --- a/server/src/main/java/org/apache/druid/rpc/StandardRetryPolicy.java +++ b/server/src/main/java/org/apache/druid/rpc/StandardRetryPolicy.java @@ -37,7 +37,15 @@ public class StandardRetryPolicy implements ServiceRetryPolicy private static final long DEFAULT_MIN_WAIT_MS = 100; private static final long DEFAULT_MAX_WAIT_MS
= 30_000; + /** + * Number of attempts that leads to about an hour of total waiting, assuming wait time is determined + * by the function in {@link ServiceClientImpl#computeBackoffMs(ServiceRetryPolicy, long)}. + */ + private static final int MAX_ATTEMPTS_ABOUT_AN_HOUR = 125; + private static final StandardRetryPolicy DEFAULT_UNLIMITED_POLICY = new Builder().maxAttempts(UNLIMITED).build(); + private static final StandardRetryPolicy DEFAULT_ABOUT_AN_HOUR_POLICY = + new Builder().maxAttempts(MAX_ATTEMPTS_ABOUT_AN_HOUR).build(); private static final StandardRetryPolicy DEFAULT_NO_RETRIES_POLICY = new Builder().maxAttempts(1).build(); private final long maxAttempts; @@ -62,11 +70,27 @@ public static Builder builder() return new Builder(); } + /** + * Standard unlimited retry policy. Never stops retrying as long as errors remain retryable. + * See {@link ServiceClient} documentation for details on what errors are retryable. + */ public static StandardRetryPolicy unlimited() { return DEFAULT_UNLIMITED_POLICY; } + /** + * Retry policy that uses up to about an hour of total wait time. Note that this is just the total waiting time + * between attempts. It does not include the time that each attempt takes to execute. + */ + public static StandardRetryPolicy aboutAnHour() + { + return DEFAULT_ABOUT_AN_HOUR_POLICY; + } + + /** + * Retry policy that never retries. + */ public static StandardRetryPolicy noRetries() { return DEFAULT_NO_RETRIES_POLICY; diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java index 7ad9f26526ee..51dd2b89d736 100644 --- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java +++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Binder; import com.google.inject.Provides; +import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.CoordinatorClientImpl; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; @@ -45,7 +48,7 @@ public class ServiceClientModule implements DruidModule { private static final int CONNECT_EXEC_THREADS = 4; - private static final int OVERLORD_ATTEMPTS = 6; + private static final int CLIENT_MAX_ATTEMPTS = 6; @Override public void configure(Binder binder) @@ -72,6 +75,7 @@ public ServiceLocator makeOverlordServiceLocator(final DruidNodeDiscoveryProvide } @Provides + @LazySingleton public OverlordClient makeOverlordClient( @Json final ObjectMapper jsonMapper, @EscalatedGlobal final ServiceClientFactory clientFactory, @@ -82,7 +86,33 @@ public OverlordClient makeOverlordClient( clientFactory.makeClient( NodeRole.OVERLORD.getJsonName(), serviceLocator, - StandardRetryPolicy.builder().maxAttempts(OVERLORD_ATTEMPTS).build() + StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build() + ), + jsonMapper + ); + } + + @Provides + @ManageLifecycle + @Coordinator + public ServiceLocator makeCoordinatorServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider) + { + return new DiscoveryServiceLocator(discoveryProvider, NodeRole.COORDINATOR); + } + + @Provides + @LazySingleton + public CoordinatorClient makeCoordinatorClient( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final 
ServiceClientFactory clientFactory, + @Coordinator final ServiceLocator serviceLocator + ) + { + return new CoordinatorClientImpl( + clientFactory.makeClient( + NodeRole.COORDINATOR.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build() ), jsonMapper ); diff --git a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java index 4e4b4debc9c6..bac33b6bb1bd 100644 --- a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java @@ -19,17 +19,15 @@ package org.apache.druid.segment.handoff; -import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.server.coordination.DruidServerMetadata; import org.joda.time.Duration; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -74,40 +72,26 @@ public void start() { scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d"); scheduledExecutor.scheduleAtFixedRate( - new Runnable() - { - @Override - public void run() - { - checkForSegmentHandoffs(); - } - }, 0L, pollDuration.getMillis(), TimeUnit.MILLISECONDS + this::checkForSegmentHandoffs, + 0L, + pollDuration.getMillis(), + TimeUnit.MILLISECONDS ); } void checkForSegmentHandoffs() { try { - Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = handOffCallbacks.entrySet() - .iterator(); + Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = handOffCallbacks.entrySet().iterator(); + while (itr.hasNext()) { Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next(); SegmentDescriptor descriptor = entry.getKey(); try { - Boolean handOffComplete = coordinatorClient.isHandOffComplete(dataSource, descriptor); - if (handOffComplete == null) { - log.warn( - "Failed to call the new coordinator API for checking segment handoff.
Falling back to the old API" - ); - final List loadedSegments = coordinatorClient.fetchServerView( - dataSource, - descriptor.getInterval(), - true - ); - handOffComplete = isHandOffComplete(loadedSegments, descriptor); - } - if (handOffComplete) { - log.debug("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor); + Boolean handOffComplete = + FutureUtils.getUnchecked(coordinatorClient.isHandoffComplete(dataSource, descriptor), true); + if (Boolean.TRUE.equals(handOffComplete)) { + log.debug("Segment handoff complete for dataSource[%s] segment[%s]", dataSource, descriptor); entry.getValue().lhs.execute(entry.getValue().rhs); itr.remove(); } @@ -123,7 +107,7 @@ void checkForSegmentHandoffs() } } if (!handOffCallbacks.isEmpty()) { - log.warn("Still waiting for Handoff for [%d] Segments", handOffCallbacks.size()); + log.info("Still waiting for handoff for [%d] segments", handOffCallbacks.size()); } } catch (Throwable t) { @@ -136,20 +120,6 @@ void checkForSegmentHandoffs() } } - static boolean isHandOffComplete(List serverView, SegmentDescriptor descriptor) - { - for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { - if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) - && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() - == descriptor.getPartitionNumber() - && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 - && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) { - return true; - } - } - return false; - } - @Override public void close() { diff --git a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java new file mode 100644 index 000000000000..b7419485b089 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client.coordinator; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.rpc.MockServiceClient; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import java.util.Collections; +import java.util.List; + +public class CoordinatorClientImplTest +{ + private ObjectMapper jsonMapper; + private MockServiceClient serviceClient; + private CoordinatorClient coordinatorClient; + + @Before + public void setup() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues( + new InjectableValues.Std(ImmutableMap.of( + DataSegment.PruneSpecsHolder.class.getName(), + DataSegment.PruneSpecsHolder.DEFAULT))); + serviceClient = new MockServiceClient(); + coordinatorClient = new CoordinatorClientImpl(serviceClient, jsonMapper); + } + + @After + public void tearDown() + { + serviceClient.verify(); + } + + @Test + public void test_isHandoffComplete() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/datasources/xyz/handoffComplete?" 
+ + "interval=2000-01-01T00%3A00%3A00.000Z%2F3000-01-01T00%3A00%3A00.000Z&" + + "partitionNumber=2&" + + "version=1" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + StringUtils.toUtf8("true") + ); + + Assert.assertEquals( + true, + coordinatorClient.isHandoffComplete( + "xyz", + new SegmentDescriptor(Intervals.of("2000/3000"), "1", 2) + ).get() + ); + } + + @Test + public void test_fetchUsedSegment() throws Exception + { + final DataSegment segment = + DataSegment.builder() + .dataSource("xyz") + .interval(Intervals.of("2000/3000")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 1)) + .size(1) + .build(); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segment) + ); + + Assert.assertEquals( + segment, + coordinatorClient.fetchUsedSegment("xyz", "def").get() + ); + } + + @Test + public void test_fetchUsedSegments() throws Exception + { + final List intervals = Collections.singletonList(Intervals.of("2000/3000")); + final DataSegment segment = + DataSegment.builder() + .dataSource("xyz") + .interval(intervals.get(0)) + .version("1") + .shardSpec(new NumberedShardSpec(0, 1)) + .size(1) + .build(); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/datasources/xyz/segments?full") + .jsonContent(jsonMapper, intervals), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(Collections.singletonList(segment)) + ); + + Assert.assertEquals( + Collections.singletonList(segment), + coordinatorClient.fetchUsedSegments("xyz", intervals).get() + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java similarity index 52% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClient.java rename to server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java index 1278a52ae08d..bcaab5c255fb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/CoordinatorServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java @@ -17,24 +17,40 @@ * under the License. */ -package org.apache.druid.msq.rpc; +package org.apache.druid.client.coordinator; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; -/** - * Interface for {@link org.apache.druid.rpc.ServiceClient}-backed communication with the Coordinator. 
- */ -public interface CoordinatorServiceClient +import java.util.List; + +public class NoopCoordinatorClient implements CoordinatorClient { - /** - * Fetches segment metadata for the given dataSource and segmentId from the Coordinator - */ - ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId); + @Override + public ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture<List<DataSegment>> fetchUsedSegments(String dataSource, List<Interval> intervals) + { + throw new UnsupportedOperationException(); + } - /** - * Returns a new CoordinatorServiceClient backed by a ServiceClient which follows the provided retryPolicy - */ - CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy retryPolicy); + @Override + public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy) + { + // Ignore retryPolicy for the test client. + return this; + } } diff --git a/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java index 31933aa4a824..fde4e13db387 100644 --- a/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -19,23 +19,17 @@ package org.apache.druid.segment.handoff; -import com.google.common.collect.Sets; -import org.apache.druid.client.ImmutableSegmentLoadInfo; +import com.google.common.util.concurrent.Futures; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; public class CoordinatorBasedSegmentHandoffNotifierTest @@ -57,8 +51,8 @@ public void testHandoffCallbackNotCalled() SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2); CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); - EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor)) - .andReturn(false) + EasyMock.expect(coordinatorClient.isHandoffComplete("test_ds", descriptor)) + .andReturn(Futures.immediateFuture(false)) .anyTimes(); EasyMock.replay(coordinatorClient); CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( @@ -88,8 +82,8 @@ public void testHandoffCallbackCalled() final AtomicBoolean callbackCalled = new AtomicBoolean(false); CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); - EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor)) - .andReturn(true) + EasyMock.expect(coordinatorClient.isHandoffComplete("test_ds", descriptor)) + .andReturn(Futures.immediateFuture(true))
.anyTimes(); EasyMock.replay(coordinatorClient); CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( @@ -111,177 +105,4 @@ public void testHandoffCallbackCalled() Assert.assertTrue(callbackCalled.get()); EasyMock.verify(coordinatorClient); } - - @Test - public void testHandoffChecksForVersion() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v2", 2) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v2", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - } - - @Test - public void testHandoffChecksForAssignableServer() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createRealtimeServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - } - - @Test - public void testHandoffChecksForPartitionNumber() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 1) - ) - ); - - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - } - - @Test - public void testHandoffChecksForInterval() - { - - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1) - ) - ); - } - - private 
DruidServerMetadata createRealtimeServerMetadata(String name) - { - return createServerMetadata(name, ServerType.REALTIME); - } - - private DruidServerMetadata createHistoricalServerMetadata(String name) - { - return createServerMetadata(name, ServerType.HISTORICAL); - } - - private DruidServerMetadata createServerMetadata(String name, ServerType type) - { - return new DruidServerMetadata( - name, - name, - null, - 10000, - type, - "tier", - 1 - ); - } - - private DataSegment createSegment(Interval interval, String version, int partitionNumber) - { - return new DataSegment( - "test_ds", - interval, - version, - null, - null, - null, - new NumberedShardSpec(partitionNumber, 100), - 0, 0 - ); - } } diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 504f95c8ac0c..aea0922efb26 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -157,7 +157,7 @@ public void configure(Binder binder) CliPeon.bindChatHandler(binder); CliPeon.bindPeonDataSegmentHandlers(binder); CliPeon.bindRealtimeCache(binder); - CliPeon.bindCoordinatorHandoffNotiferAndClient(binder); + CliPeon.bindCoordinatorHandoffNotifer(binder); binder.install(CliMiddleManager.makeWorkerManagementModule(isZkEnabled)); binder.bind(AppenderatorsManager.class) diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 3c4a1c37df03..bad007af6731 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -38,7 +38,6 @@ import com.google.inject.name.Names; import io.netty.util.SuppressForbidden; import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Binders; @@ -244,7 +243,7 @@ public void configure(Binder binder) binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class); bindRealtimeCache(binder); - bindCoordinatorHandoffNotiferAndClient(binder); + bindCoordinatorHandoffNotifer(binder); binder.bind(AppenderatorsManager.class) .to(PeonAppenderatorsManager.class) @@ -446,7 +445,7 @@ static void bindRealtimeCache(Binder binder) binder.install(new CacheModule()); } - static void bindCoordinatorHandoffNotiferAndClient(Binder binder) + static void bindCoordinatorHandoffNotifer(Binder binder) { JsonConfigProvider.bind( binder, @@ -456,8 +455,6 @@ static void bindCoordinatorHandoffNotiferAndClient(Binder binder) binder.bind(SegmentHandoffNotifierFactory.class) .to(CoordinatorBasedSegmentHandoffNotifierFactory.class) .in(LazySingleton.class); - - binder.bind(CoordinatorClient.class).in(LazySingleton.class); } static void configureIntermediaryData(Binder binder)
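
Usage note: every method on the new CoordinatorClient interface returns a ListenableFuture, so call sites that previously blocked inside the client now block explicitly, as ControllerImpl and CoordinatorBasedSegmentHandoffNotifier do above via FutureUtils.getUnchecked. Below is a minimal sketch of a caller, not part of this patch; the class name, the "wikipedia" datasource, and the eternity interval are illustrative only.

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.timeline.DataSegment;

import java.util.Collections;
import java.util.List;

public class CoordinatorClientUsageSketch
{
  public List<DataSegment> fetchAllUsedSegments(CoordinatorClient coordinatorClient)
  {
    // Swap in the new aboutAnHour() policy when the caller should ride out a
    // lengthy Coordinator outage instead of failing after the default six attempts.
    final CoordinatorClient retryingClient =
        coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour());

    final ListenableFuture<List<DataSegment>> future =
        retryingClient.fetchUsedSegments("wikipedia", Collections.singletonList(Intervals.ETERNITY));

    // Block for the result; the second argument cancels the request if this
    // thread is interrupted, matching the call sites in this patch.
    return FutureUtils.getUnchecked(future, true);
  }
}

Since withRetryPolicy returns a new instance rather than mutating the shared client, per-call-site policies like this do not affect the default CLIENT_MAX_ATTEMPTS client that ServiceClientModule injects.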
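Backoff note: MAX_ATTEMPTS_ABOUT_AN_HOUR = 125 falls out of the backoff arithmetic. Assuming ServiceClientImpl#computeBackoffMs doubles the wait from DEFAULT_MIN_WAIT_MS (100 ms) on each attempt and caps it at DEFAULT_MAX_WAIT_MS (30 s), an assumption about that method rather than something shown in this patch, the 124 waits between 125 attempts sum to roughly 58 minutes: the first nine waits contribute about 51 seconds, and the remaining 115 are capped at 30 seconds each. A quick check:

public class AboutAnHourBackoffSketch
{
  public static void main(String[] args)
  {
    long totalWaitMs = 0;
    // 125 attempts leave up to 124 waits between them.
    for (int attempt = 0; attempt < 124; attempt++) {
      // Assumed backoff: 100 ms doubled per attempt, capped at 30 s.
      // The shift amount is clamped so the long never overflows.
      totalWaitMs += Math.min(30_000L, 100L << Math.min(attempt, 20));
    }
    System.out.println(totalWaitMs / 60_000.0); // prints ~58.35 minutes
  }
}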