From c6909ea059d4fbb4d20ae12fc68101b0b54f9f4a Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 27 May 2020 18:37:53 -0700 Subject: [PATCH 01/14] Load broadcast datasources on broker and tasks --- .../indexing/common/task/AbstractTask.java | 6 + .../AppenderatorDriverRealtimeIndexTask.java | 6 + .../common/task/RealtimeIndexTask.java | 6 + .../druid/indexing/common/task/Task.java | 5 + .../indexing/overlord/ForkingTaskRunner.java | 5 + .../SeekableStreamIndexTask.java | 6 + .../druid/client/CachingClusteredClient.java | 4 +- .../org/apache/druid/client/DruidServer.java | 14 ++- .../SegmentLoaderLocalCacheManager.java | 1 - ...egmentServerAnnouncerLifecycleHandler.java | 104 ------------------ ...oordinatorBasedSegmentHandoffNotifier.java | 2 +- .../coordination/DruidServerMetadata.java | 12 +- .../SegmentChangeRequestLoad.java | 9 +- .../druid/server/coordination/ServerType.java | 8 ++ .../coordination/UnprunedDataSegment.java | 75 +++++++++++++ .../server/coordinator/BalancerStrategy.java | 4 +- .../CachingCostBalancerStrategyFactory.java | 6 +- .../coordinator/CostBalancerStrategy.java | 8 +- .../server/coordinator/DruidCluster.java | 46 ++++++-- .../server/coordinator/DruidCoordinator.java | 2 +- .../DruidCoordinatorRuntimeParams.java | 23 +++- .../coordinator/RandomBalancerStrategy.java | 5 +- .../coordinator/ReservoirSegmentSampler.java | 16 ++- .../server/coordinator/ServerHolder.java | 5 + .../coordinator/duty/BalanceSegments.java | 5 +- .../server/coordinator/duty/RunRules.java | 11 +- .../rules/BroadcastDistributionRule.java | 52 +++++---- .../ForeverBroadcastDistributionRule.java | 21 +--- .../IntervalBroadcastDistributionRule.java | 28 +---- .../PeriodBroadcastDistributionRule.java | 29 +---- .../server/http/DataSourcesResource.java | 2 +- ...inatorBasedSegmentHandoffNotifierTest.java | 2 +- .../coordinator/BalanceSegmentsTest.java | 45 ++++++-- .../coordinator/DruidClusterBuilder.java | 9 +- .../ReservoirSegmentSamplerTest.java | 7 +- .../BroadcastDistributionRuleSerdeTest.java | 19 ++-- .../rules/BroadcastDistributionRuleTest.java | 18 ++- .../java/org/apache/druid/cli/CliBroker.java | 12 ++ .../java/org/apache/druid/cli/CliIndexer.java | 15 +-- .../java/org/apache/druid/cli/CliPeon.java | 15 +++ .../druid/sql/calcite/schema/DruidSchema.java | 6 +- 41 files changed, 410 insertions(+), 264 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java create mode 100644 server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 40745bfe32f5..728f3dedfd0d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -145,6 +145,12 @@ public QueryRunner getQueryRunner(Query query) return null; } + @Override + public boolean supportsQueries() + { + return false; + } + @Override public String getClasspathPrefix() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 1f9865d69d74..bbe900c4c685 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -245,6 +245,12 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(appenderator, responseContext); } + @Override + public boolean supportsQueries() + { + return true; + } + @Override public boolean isReady(TaskActionClient taskActionClient) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index ed2ddd2c604e..055a3fea3be7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -196,6 +196,12 @@ public QueryRunner getQueryRunner(Query query) return plumber.getQueryRunner(query); } + @Override + public boolean supportsQueries() + { + return true; + } + @Override public boolean isReady(TaskActionClient taskActionClient) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index c069c73cec70..4f18c81bc7c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -146,6 +146,11 @@ default int getPriority() */ QueryRunner getQueryRunner(Query query); + /** + * @return true if this Task type is queryable, such as streaming ingestion tasks + */ + boolean supportsQueries(); + /** * Returns an extra classpath that should be prepended to the default classpath when running this task. If no * extra classpath should be prepended, this should return null or the empty string. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 2f1abc1d7f53..bbf639466333 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -327,6 +327,11 @@ public TaskStatus call() command.add(nodeType); } + if (task.supportsQueries()) { + command.add("--loadBroadcastSegments"); + command.add("true"); + } + if (!taskFile.exists()) { jsonMapper.writeValue(taskFile, task); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 9dd4ece02c9b..029e5f9b8210 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -191,6 +191,12 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } + @Override + public boolean supportsQueries() + { + return true; + } + public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { return appenderatorsManager.createRealtimeAppenderatorForTask( diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index ae4dc1689ccf..c382be3493e0 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -452,7 +452,7 @@ private String computeCurrentEtag(final Set segments, @Nu Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; for (SegmentServerSelector p : segments) { - if (!p.getServer().pick().getServer().segmentReplicatable()) { + if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) { hasOnlyHistoricalSegments = false; break; } @@ -633,7 +633,7 @@ private void addSequencesFromServer( if (isBySegment) { serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); - } else if (!server.segmentReplicatable() || !populateCache) { + } else if (!server.isSegmentReplicationTarget() || !populateCache) { serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else { serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java index ddcba54f1c3c..6c52866d0586 100644 --- a/server/src/main/java/org/apache/druid/client/DruidServer.java +++ b/server/src/main/java/org/apache/druid/client/DruidServer.java @@ -137,9 +137,19 @@ public String getTier() return metadata.getTier(); } - public boolean segmentReplicatable() + public boolean isSegmentReplicationTarget() { - return metadata.segmentReplicatable(); + return metadata.isSegmentReplicationTarget(); + } + + public boolean isSegmentBroadcastTarget() + { + return metadata.isSegmentBroadcastTarget(); + } + + public boolean isSegmentReplicationOrBroadcastTarget() + { + return metadata.isSegmentReplicationTarget() || metadata.isSegmentBroadcastTarget(); } @JsonProperty diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 398ad679b484..b2ac7e8f35b0 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -89,7 +89,6 @@ public SegmentLoaderLocalCacheManager( this.indexIO = indexIO; this.config = config; this.jsonMapper = mapper; - this.locations = new ArrayList<>(); for (StorageLocationConfig locationConfig : config.getLocations()) { locations.add( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java b/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java deleted file mode 100644 index e874a30c86d9..000000000000 --- a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.realtime; - -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import org.apache.druid.concurrent.LifecycleLock; -import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; - -import java.io.IOException; - -/** - * Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop. - * - * Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals, - * but without segment cache management. - */ -@ManageLifecycle -public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler -{ - private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class); - - private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer; - - private final LifecycleLock lifecycleLock = new LifecycleLock(); - - @Inject - public CliIndexerDataSegmentServerAnnouncerLifecycleHandler( - DataSegmentServerAnnouncer dataSegmentServerAnnouncer - ) - { - this.dataSegmentServerAnnouncer = dataSegmentServerAnnouncer; - } - - @LifecycleStart - public void start() throws IOException - { - if (!lifecycleLock.canStart()) { - throw new RuntimeException("Lifecycle lock could not start"); - } - - try { - if (lifecycleLock.isStarted()) { - return; - } - - LOG.info("Starting..."); - try { - dataSegmentServerAnnouncer.announce(); - } - catch (Exception e) { - Throwables.propagateIfPossible(e, IOException.class); - throw new RuntimeException(e); - } - LOG.info("Started."); - lifecycleLock.started(); - } - finally { - lifecycleLock.exitStart(); - } - } - - @LifecycleStop - public void stop() - { - if (!lifecycleLock.canStop()) { - throw new RuntimeException("Lifecycle lock could not stop"); - } - - if (!lifecycleLock.isStarted()) { - return; - } - - LOG.info("Stopping..."); - try { - dataSegmentServerAnnouncer.unannounce(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - LOG.info("Stopped."); - } -} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java index bbee720a88ce..2e97258a52da 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -142,7 +142,7 @@ static boolean isHandOffComplete(List serverView, Segm && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 - && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) { + && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) { return true; } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java index e3673bbc9cae..3fda41b08dab 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java @@ -107,11 +107,21 @@ public int getPriority() return priority; } - public boolean segmentReplicatable() + public boolean isSegmentReplicationTarget() { return type.isSegmentReplicationTarget(); } + public boolean isSegmentBroadcastTarget() + { + return type.isSegmentBroadcastTarget(); + } + + public boolean isSegmentReplicationOrBroadcastTarget() + { + return isSegmentReplicationTarget() || isSegmentBroadcastTarget(); + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index 097e02523032..3f0010e9cb52 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -35,9 +35,16 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest { private final DataSegment segment; + public SegmentChangeRequestLoad( + DataSegment segment + ) + { + this.segment = segment; + } + @JsonCreator public SegmentChangeRequestLoad( - @JsonUnwrapped DataSegment segment + @JsonUnwrapped UnprunedDataSegment segment ) { this.segment = segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java index 42fb65a3fdfb..0b860a1b0afe 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java @@ -63,6 +63,14 @@ public boolean isSegmentReplicationTarget() { return false; } + }, + + BROKER { + @Override + public boolean isSegmentReplicationTarget() + { + return false; + } }; /** diff --git a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java new file mode 100644 index 000000000000..2c11ada82cb4 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.druid.jackson.CommaListJoinDeserializer; +import org.apache.druid.timeline.CompactionState; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; + +public class UnprunedDataSegment extends DataSegment +{ + @JsonCreator + public UnprunedDataSegment( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("version") String version, + // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution + @JsonProperty("loadSpec") @Nullable Map loadSpec, + @JsonProperty("dimensions") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List dimensions, + @JsonProperty("metrics") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List metrics, + @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, + @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState, + @JsonProperty("binaryVersion") Integer binaryVersion, + @JsonProperty("size") long size, + @JacksonInject PruneSpecsHolder pruneSpecsHolder + ) + { + super( + dataSource, + interval, + version, + loadSpec, + dimensions, + metrics, + shardSpec, + lastCompactionState, + binaryVersion, + size, + PruneSpecsHolder.DEFAULT + ); + + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index d9fea81e3a95..cc72b00efdc4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; /** * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where @@ -56,11 +57,12 @@ public interface BalancerStrategy /** * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy. * @param serverHolders set of historicals to consider for moving segments + * @param broadcastDatasources * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). */ @Nullable - BalancerSegmentHolder pickSegmentToMove(List serverHolders); + BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources); /** * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java index 4a1989df24df..1741087e8c5c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategyFactory.java @@ -71,7 +71,7 @@ public CachingCostBalancerStrategyFactory( @Override public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { - if (server.segmentReplicatable()) { + if (server.isSegmentReplicationTarget()) { clusterCostCacheBuilder.addSegment(server.getName(), segment); } return ServerView.CallbackAction.CONTINUE; @@ -80,7 +80,7 @@ public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSe @Override public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { - if (server.segmentReplicatable()) { + if (server.isSegmentReplicationTarget()) { clusterCostCacheBuilder.removeSegment(server.getName(), segment); } return ServerView.CallbackAction.CONTINUE; @@ -98,7 +98,7 @@ public ServerView.CallbackAction segmentViewInitialized() serverInventoryView.registerServerRemovedCallback( executor, server -> { - if (server.segmentReplicatable()) { + if (server.isSegmentReplicationTarget()) { clusterCostCacheBuilder.removeServer(server.getName()); } return ServerView.CallbackAction.CONTINUE; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index 4fd4164f3001..5d656d643f99 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -35,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -211,9 +212,12 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable @Override - public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) + public BalancerSegmentHolder pickSegmentToMove( + final List serverHolders, + Set broadcastDatasources + ) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 318f663609b0..7724b077e644 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -47,24 +47,28 @@ public class DruidCluster @VisibleForTesting static DruidCluster createDruidClusterFromBuilderInTest( @Nullable Set realtimes, - Map> historicals + Map> historicals, + @Nullable Set brokers ) { - return new DruidCluster(realtimes, historicals); + return new DruidCluster(realtimes, historicals, brokers); } private final Set realtimes; private final Map> historicals; + private final Set brokers; public DruidCluster() { this.realtimes = new HashSet<>(); this.historicals = new HashMap<>(); + this.brokers = new HashSet<>(); } private DruidCluster( @Nullable Set realtimes, - Map> historicals + Map> historicals, + @Nullable Set brokers ) { this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); @@ -72,6 +76,7 @@ private DruidCluster( historicals, holders -> CollectionUtils.newTreeSet(Comparator.reverseOrder(), holders) ); + this.brokers = brokers == null ? new HashSet<>() : new HashSet<>(brokers); } public void add(ServerHolder serverHolder) @@ -87,7 +92,11 @@ public void add(ServerHolder serverHolder) addHistorical(serverHolder); break; case INDEXER_EXECUTOR: - throw new IAE("unsupported server type[%s]", serverHolder.getServer().getType()); + addRealtime(serverHolder); + break; + case BROKER: + addBroker(serverHolder); + break; default: throw new IAE("unknown server type[%s]", serverHolder.getServer().getType()); } @@ -108,6 +117,11 @@ private void addHistorical(ServerHolder serverHolder) tierServers.add(serverHolder); } + private void addBroker(ServerHolder serverHolder) + { + brokers.add(serverHolder); + } + public Set getRealtimes() { return realtimes; @@ -118,6 +132,12 @@ public Map> getHistoricals() return historicals; } + + public Set getBrokers() + { + return brokers; + } + public Iterable getTierNames() { return historicals.keySet(); @@ -131,10 +151,12 @@ public NavigableSet getHistoricalsByTier(String tier) public Collection getAllServers() { final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum(); + final int brokerSize = brokers.size(); final int realtimeSize = realtimes.size(); final List allServers = new ArrayList<>(historicalSize + realtimeSize); historicals.values().forEach(allServers::addAll); + allServers.addAll(brokers); allServers.addAll(realtimes); return allServers; } @@ -146,7 +168,7 @@ public Iterable> getSortedHistoricalsByTier() public boolean isEmpty() { - return historicals.isEmpty() && realtimes.isEmpty(); + return historicals.isEmpty() && realtimes.isEmpty() && brokers.isEmpty(); } public boolean hasHistoricals() @@ -159,9 +181,19 @@ public boolean hasRealtimes() return !realtimes.isEmpty(); } + public boolean hasBrokers() + { + return !brokers.isEmpty(); + } + public boolean hasTier(String tier) { - NavigableSet servers = historicals.get(tier); - return (servers != null) && !servers.isEmpty(); + NavigableSet historicalServers = historicals.get(tier); + boolean historicalsHasTier = (historicalServers != null) && !historicalServers.isEmpty(); + if (historicalsHasTier) { + return true; + } + + return false; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index f8c3f43c76f5..36a414e780d9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -761,7 +761,7 @@ List prepareCurrentServers() List currentServers = serverInventoryView .getInventory() .stream() - .filter(DruidServer::segmentReplicatable) + .filter(DruidServer::isSegmentReplicationOrBroadcastTarget) .map(DruidServer::toImmutableDruidServer) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 7bae11e61db0..3337b8e7647a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -34,7 +34,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; @@ -70,6 +72,7 @@ private static TreeSet createUsedSegmentsSet(Iterable private final CoordinatorStats stats; private final DateTime balancerReferenceTimestamp; private final BalancerStrategy balancerStrategy; + private final Set broadcastDatasources; private DruidCoordinatorRuntimeParams( long startTimeNanos, @@ -85,7 +88,8 @@ private DruidCoordinatorRuntimeParams( CoordinatorCompactionConfig coordinatorCompactionConfig, CoordinatorStats stats, DateTime balancerReferenceTimestamp, - BalancerStrategy balancerStrategy + BalancerStrategy balancerStrategy, + Set broadcastDatasources ) { this.startTimeNanos = startTimeNanos; @@ -102,6 +106,7 @@ private DruidCoordinatorRuntimeParams( this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; this.balancerStrategy = balancerStrategy; + this.broadcastDatasources = broadcastDatasources; } public long getStartTimeNanos() @@ -180,6 +185,11 @@ public BalancerStrategy getBalancerStrategy() return balancerStrategy; } + public Set getBroadcastDatasources() + { + return broadcastDatasources; + } + public boolean coordinatorIsLeadingEnoughTimeToMarkAsUnusedOvershadowedSegements() { long nanosElapsedSinceCoordinatorStart = System.nanoTime() - getStartTimeNanos(); @@ -256,6 +266,7 @@ public static class Builder private CoordinatorStats stats; private DateTime balancerReferenceTimestamp; private BalancerStrategy balancerStrategy; + private Set broadcastDatasources; private Builder() { @@ -272,6 +283,7 @@ private Builder() this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty(); this.balancerReferenceTimestamp = DateTimes.nowUtc(); + this.broadcastDatasources = new HashSet<>(); } Builder( @@ -324,7 +336,8 @@ public DruidCoordinatorRuntimeParams build() coordinatorCompactionConfig, stats, balancerReferenceTimestamp, - balancerStrategy + balancerStrategy, + broadcastDatasources ); } @@ -436,5 +449,11 @@ public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) this.balancerStrategy = balancerStrategy; return this; } + + public Builder withBroadcastDatasources(Set broadcastDatasources) + { + this.broadcastDatasources = broadcastDatasources; + return this; + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 8b0b30698175..72fdedf6e453 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; public class RandomBalancerStrategy implements BalancerStrategy @@ -51,9 +52,9 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders) + public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index c2c4a7ad10c4..7181d52e152a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -22,19 +22,33 @@ import org.apache.druid.timeline.DataSegment; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; final class ReservoirSegmentSampler { - static BalancerSegmentHolder getRandomBalancerSegmentHolder(final List serverHolders) + static BalancerSegmentHolder getRandomBalancerSegmentHolder( + final List serverHolders, + Set broadcastDatasources + ) { ServerHolder fromServerHolder = null; DataSegment proposalSegment = null; int numSoFar = 0; for (ServerHolder server : serverHolders) { + if (!server.getServer().getType().isSegmentReplicationTarget()) { + // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do + continue; + } + for (DataSegment segment : server.getServer().iterateAllSegments()) { + if (broadcastDatasources.contains(segment.getDataSource())) { + // we don't need to rebalance segments that were assigned via broadcast rules + continue; + } + int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1); // w.p. 1 / (numSoFar+1), swap out the server and segment if (randNum == numSoFar) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index ba96566a4dfd..26fa9a54c7ff 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -122,6 +122,11 @@ public boolean isLoadingSegment(DataSegment segment) return peon.getSegmentsToLoad().contains(segment); } + public boolean isDroppingSegment(DataSegment segment) + { + return peon.getSegmentsToDrop().contains(segment); + } + public int getNumberOfSegmentsInQueue() { return peon.getNumberOfSegmentsInQueue(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index d42ca635bd62..a1c5237ddd15 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -187,7 +187,10 @@ private Pair balanceServers( //noinspection ForLoopThatDoesntUseLoopVariable for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { - final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom); + final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( + toMoveFrom, + params.getBroadcastDatasources() + ); if (segmentToMoveHolder == null) { log.info("All servers to move segments from are empty, ending run."); break; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 5288bb35858f..125fcafbc21a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -28,11 +28,13 @@ import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ReplicationThrottler; +import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -101,6 +103,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES); int missingRules = 0; + final Set broadcastDatasources = new HashSet<>(); for (DataSegment segment : params.getUsedSegments()) { if (overshadowed.contains(segment.getId())) { // Skipping overshadowed segments @@ -112,6 +115,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (rule.appliesTo(segment, now)) { stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); foundMatchingRule = true; + if (rule instanceof BroadcastDistributionRule) { + broadcastDatasources.add(segment.getDataSource()); + } break; } } @@ -131,6 +137,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .emit(); } - return params.buildFromExisting().withCoordinatorStats(stats).build(); + return params.buildFromExisting() + .withCoordinatorStats(stats) + .withBroadcastDatasources(broadcastDatasources) + .build(); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index 658171236adc..35ff39ea6505 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.rules; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -27,8 +28,8 @@ import org.apache.druid.timeline.DataSegment; import java.util.HashSet; -import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public abstract class BroadcastDistributionRule implements Rule { @@ -37,30 +38,35 @@ public abstract class BroadcastDistributionRule implements Rule @Override public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) { - // Find servers which holds the segments of co-located data source - final Set loadServerHolders = new HashSet<>(); final Set dropServerHolders = new HashSet<>(); - final List colocatedDataSources = getColocatedDataSources(); - if (colocatedDataSources == null || colocatedDataSources.isEmpty()) { - loadServerHolders.addAll(params.getDruidCluster().getAllServers()); - } else { - params.getDruidCluster().getAllServers().forEach( - eachHolder -> { - if (!eachHolder.isDecommissioning() - && colocatedDataSources.stream() - .anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) { - loadServerHolders.add(eachHolder); - } else if (eachHolder.isServingSegment(segment)) { - if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) { - dropServerHolders.add(eachHolder); - } - } - } - ); - } - final CoordinatorStats stats = new CoordinatorStats(); + // Find servers where we need to load the broadcast segments + final Set loadServerHolders = + params.getDruidCluster().getAllServers() + .stream() + .filter( + (serverHolder) -> { + ServerType serverType = serverHolder.getServer().getType(); + if (!serverType.isSegmentBroadcastTarget()) { + return false; + } + + final boolean isServingSegment = + serverHolder.isServingSegment(segment); + + if (serverHolder.isDecommissioning()) { + if (isServingSegment && !serverHolder.isDroppingSegment(segment)) { + dropServerHolders.add(serverHolder); + } + return false; + } + return !isServingSegment && !serverHolder.isLoadingSegment(segment); + } + ) + .collect(Collectors.toSet()); + + final CoordinatorStats stats = new CoordinatorStats(); return stats.accumulate(assign(loadServerHolders, segment)) .accumulate(drop(dropServerHolders, segment)); } @@ -110,6 +116,4 @@ private CoordinatorStats drop( return stats; } - - public abstract List getColocatedDataSources(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java index d095f1100aea..ef5094cbea4a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverBroadcastDistributionRule.java @@ -25,21 +25,16 @@ import org.joda.time.DateTime; import org.joda.time.Interval; -import java.util.List; import java.util.Objects; public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule { static final String TYPE = "broadcastForever"; - private final List colocatedDataSources; - @JsonCreator - public ForeverBroadcastDistributionRule( - @JsonProperty("colocatedDataSources") List colocatedDataSources - ) + public ForeverBroadcastDistributionRule() { - this.colocatedDataSources = colocatedDataSources; + } @Override @@ -49,13 +44,6 @@ public String getType() return TYPE; } - @Override - @JsonProperty - public List getColocatedDataSources() - { - return colocatedDataSources; - } - @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { @@ -79,13 +67,12 @@ public boolean equals(Object o) return false; } - ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o; - return Objects.equals(colocatedDataSources, that.colocatedDataSources); + return true; } @Override public int hashCode() { - return Objects.hash(getType(), colocatedDataSources); + return Objects.hash(getType()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java index c40dff7268aa..b1bf29eedd20 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalBroadcastDistributionRule.java @@ -25,23 +25,19 @@ import org.joda.time.DateTime; import org.joda.time.Interval; -import java.util.List; import java.util.Objects; public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule { static final String TYPE = "broadcastByInterval"; private final Interval interval; - private final List colocatedDataSources; @JsonCreator public IntervalBroadcastDistributionRule( - @JsonProperty("interval") Interval interval, - @JsonProperty("colocatedDataSources") List colocatedDataSources + @JsonProperty("interval") Interval interval ) { this.interval = interval; - this.colocatedDataSources = colocatedDataSources; } @Override @@ -51,13 +47,6 @@ public String getType() return TYPE; } - @Override - @JsonProperty - public List getColocatedDataSources() - { - return colocatedDataSources; - } - @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { @@ -79,26 +68,19 @@ public Interval getInterval() @Override public boolean equals(Object o) { - if (o == this) { + if (this == o) { return true; } - - if (o == null || o.getClass() != getClass()) { + if (o == null || getClass() != o.getClass()) { return false; } - IntervalBroadcastDistributionRule that = (IntervalBroadcastDistributionRule) o; - - if (!Objects.equals(interval, that.interval)) { - return false; - } - - return Objects.equals(colocatedDataSources, that.colocatedDataSources); + return Objects.equals(getInterval(), that.getInterval()); } @Override public int hashCode() { - return Objects.hash(getType(), interval, colocatedDataSources); + return Objects.hash(getInterval()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java index 97c6e11cfba2..d48353d3e50a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java @@ -26,7 +26,6 @@ import org.joda.time.Interval; import org.joda.time.Period; -import java.util.List; import java.util.Objects; public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule @@ -36,18 +35,15 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule private final Period period; private final boolean includeFuture; - private final List colocatedDataSources; @JsonCreator public PeriodBroadcastDistributionRule( @JsonProperty("period") Period period, - @JsonProperty("includeFuture") Boolean includeFuture, - @JsonProperty("colocatedDataSources") List colocatedDataSources + @JsonProperty("includeFuture") Boolean includeFuture ) { this.period = period; this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture; - this.colocatedDataSources = colocatedDataSources; } @Override @@ -57,13 +53,6 @@ public String getType() return TYPE; } - @Override - @JsonProperty - public List getColocatedDataSources() - { - return colocatedDataSources; - } - @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { @@ -94,25 +83,17 @@ public boolean equals(Object o) if (this == o) { return true; } - - if (o == null || o.getClass() != getClass()) { + if (o == null || getClass() != o.getClass()) { return false; } - PeriodBroadcastDistributionRule that = (PeriodBroadcastDistributionRule) o; - - if (!Objects.equals(period, that.period)) { - return false; - } - if (includeFuture != that.includeFuture) { - return false; - } - return Objects.equals(colocatedDataSources, that.colocatedDataSources); + return isIncludeFuture() == that.isIncludeFuture() && + Objects.equals(getPeriod(), that.getPeriod()); } @Override public int hashCode() { - return Objects.hash(getType(), period, colocatedDataSources); + return Objects.hash(getPeriod(), isIncludeFuture()); } } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 040297521b80..b6d310f1ba73 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -769,7 +769,7 @@ static boolean isSegmentLoaded(Iterable servedSegments && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 && Iterables.any( - segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable + segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget )) { return true; } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java index 47157347a413..f5534bd46a3b 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -174,7 +174,7 @@ public void testHandoffChecksForAssignableServer() ) ); - Assert.assertFalse( + Assert.assertTrue( CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( Collections.singletonList( new ImmutableSegmentLoadInfo( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 084a119ebe76..f37c92cb1056 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -43,6 +43,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -66,9 +67,11 @@ public class BalanceSegmentsTest private DataSegment segment2; private DataSegment segment3; private DataSegment segment4; + private DataSegment segment5; private List segments; private ListeningExecutorService balancerStrategyExecutor; private BalancerStrategy balancerStrategy; + private Set broadcastDatasources; @Before public void setUp() @@ -82,6 +85,7 @@ public void setUp() segment2 = EasyMock.createMock(DataSegment.class); segment3 = EasyMock.createMock(DataSegment.class); segment4 = EasyMock.createMock(DataSegment.class); + segment5 = EasyMock.createMock(DataSegment.class); DateTime start1 = DateTimes.of("2012-01-01"); DateTime start2 = DateTimes.of("2012-02-01"); @@ -130,12 +134,24 @@ public void setUp() 0, 8L ); + segment5 = new DataSegment( + "datasourceBroadcast", + new Interval(start2, start2.plusHours(1)), + version.toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 8L + ); segments = new ArrayList<>(); segments.add(segment1); segments.add(segment2); segments.add(segment3); segments.add(segment4); + segments.add(segment5); peon1 = new LoadQueuePeonTester(); peon2 = new LoadQueuePeonTester(); @@ -147,6 +163,8 @@ public void setUp() balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor); + + broadcastDatasources = Collections.singleton("datasourceBroadcast"); } @After @@ -187,10 +205,11 @@ public void testMoveToEmptyServerBalancer() ImmutableList.of(peon1, peon2) ) .withBalancerStrategy(predefinedPickOrderStrategy) + .withBroadcastDatasources(broadcastDatasources) .build(); params = new BalanceSegmentsTester(coordinator).run(params); - Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + Assert.assertEquals(3, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); } /** @@ -213,10 +232,10 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)))) + EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources)) .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) .andReturn(new BalancerSegmentHolder(druidServer2, segment4)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)); @@ -237,6 +256,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() .build() // ceil(3 * 0.6) = 2 segments from decommissioning servers ) .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) .build(); params = new BalanceSegmentsTester(coordinator).run(params); @@ -280,7 +300,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)) .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) @@ -303,6 +323,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi .build() ) .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) .build(); params = new BalanceSegmentsTester(coordinator).run(params); @@ -328,7 +349,7 @@ public void testMoveToDecommissioningServer() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -343,6 +364,7 @@ public void testMoveToDecommissioningServer() ImmutableList.of(false, true) ) .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) .build(); params = new BalanceSegmentsTester(coordinator).run(params); @@ -362,7 +384,7 @@ public void testMoveFromDecommissioningServer() ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -377,6 +399,7 @@ public void testMoveFromDecommissioningServer() ) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build()) .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) .build(); params = new BalanceSegmentsTester(coordinator).run(params); @@ -412,6 +435,7 @@ public void testMoveMaxLoadQueueServerBalancer() ImmutableList.of(peon1, peon2) ) .withBalancerStrategy(predefinedPickOrderStrategy) + .withBroadcastDatasources(broadcastDatasources) .withDynamicConfigs( CoordinatorDynamicConfig .builder() @@ -451,6 +475,7 @@ public void testMoveSameSegmentTwice() ImmutableList.of(peon1, peon2) ) .withBalancerStrategy(predefinedPickOrderStrategy) + .withBroadcastDatasources(broadcastDatasources) .withDynamicConfigs( CoordinatorDynamicConfig.builder().withMaxSegmentsToMove( 2 @@ -542,6 +567,7 @@ private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( ) .withUsedSegmentsInTest(segments) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) + .withBroadcastDatasources(broadcastDatasources) .withBalancerStrategy(balancerStrategy); } @@ -611,7 +637,7 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders) + public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) { return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); } @@ -635,9 +661,9 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM // either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3]) BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)))) + EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources)) .andReturn(new BalancerSegmentHolder(druidServer2, segment2)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) @@ -656,6 +682,7 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM .build() ) .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) .build(); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java index 772b7aec1401..5fb100073eae 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterBuilder.java @@ -35,6 +35,7 @@ public static DruidClusterBuilder newBuilder() private @Nullable Set realtimes = null; private final Map> historicals = new HashMap<>(); + private @Nullable Set brokers = null; private DruidClusterBuilder() { @@ -46,6 +47,12 @@ public DruidClusterBuilder withRealtimes(ServerHolder... realtimes) return this; } + public DruidClusterBuilder withBrokers(ServerHolder... brokers) + { + this.brokers = new HashSet<>(Arrays.asList(brokers)); + return this; + } + public DruidClusterBuilder addTier(String tierName, ServerHolder... historicals) { if (this.historicals.putIfAbsent(tierName, Arrays.asList(historicals)) != null) { @@ -56,6 +63,6 @@ public DruidClusterBuilder addTier(String tierName, ServerHolder... historicals) public DruidCluster build() { - return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals); + return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals, brokers); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 73e829ce0e0c..8aef2f2e10fb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -23,6 +23,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -136,6 +137,7 @@ public void setUp() @Test public void getRandomBalancerSegmentHolderTest() { + EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); @@ -143,6 +145,7 @@ public void getRandomBalancerSegmentHolderTest() EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer1); + EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce(); @@ -151,6 +154,7 @@ public void getRandomBalancerSegmentHolderTest() EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer2); + EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes(); EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce(); @@ -159,6 +163,7 @@ public void getRandomBalancerSegmentHolderTest() EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer3); + EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce(); EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes(); EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce(); @@ -186,7 +191,7 @@ public void getRandomBalancerSegmentHolderTest() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < 5000; i++) { - segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1); + segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()).getSegment(), 1); } for (DataSegment segment : segments) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java index e3b51a51b02d..0dfe0eab0ac1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; @@ -44,15 +43,15 @@ public class BroadcastDistributionRuleSerdeTest public static List constructorFeeder() { return Lists.newArrayList( - new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of("large_source1", "large_source2"))}, - new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of())}, - new Object[]{new ForeverBroadcastDistributionRule(null)}, - new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of("large_source"))}, - new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of())}, - new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), null)}, - new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of("large_source"))}, - new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of())}, - new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, null)} + new Object[]{new ForeverBroadcastDistributionRule()}, + new Object[]{new ForeverBroadcastDistributionRule()}, + new Object[]{new ForeverBroadcastDistributionRule()}, + new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))}, + new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))}, + new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}, + new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)} ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index 70ec3ebec052..c2d4fd3d0004 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.rules; -import com.google.common.collect.ImmutableList; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -269,7 +268,7 @@ public void setUp() public void testBroadcastToSingleDataSource() { final ForeverBroadcastDistributionRule rule = - new ForeverBroadcastDistributionRule(ImmutableList.of("large_source")); + new ForeverBroadcastDistributionRule(); CoordinatorStats stats = rule.run( null, @@ -285,7 +284,7 @@ public void testBroadcastToSingleDataSource() smallSegment ); - Assert.assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT)); + Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT)); Assert.assertFalse(stats.hasPerTierStats()); Assert.assertTrue( @@ -295,10 +294,10 @@ public void testBroadcastToSingleDataSource() Assert.assertTrue( holdersOfLargeSegments2.stream() - .noneMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) ); - Assert.assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment)); + Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment)); } private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams( @@ -331,7 +330,7 @@ private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams( public void testBroadcastDecommissioning() { final ForeverBroadcastDistributionRule rule = - new ForeverBroadcastDistributionRule(ImmutableList.of("large_source")); + new ForeverBroadcastDistributionRule(); CoordinatorStats stats = rule.run( null, @@ -356,7 +355,6 @@ public void testBroadcastDecommissioning() public void testBroadcastToMultipleDataSources() { final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule( - ImmutableList.of("large_source", "large_source2") ); CoordinatorStats stats = rule.run( @@ -392,7 +390,7 @@ public void testBroadcastToMultipleDataSources() @Test public void testBroadcastToAllServers() { - final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(null); + final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); CoordinatorStats stats = rule.run( null, @@ -408,14 +406,14 @@ public void testBroadcastToAllServers() smallSegment ); - Assert.assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT)); + Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT)); Assert.assertFalse(stats.hasPerTierStats()); Assert.assertTrue( druidCluster .getAllServers() .stream() - .allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment)) + .allMatch(holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment)) ); } } diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 5badcb0e0e3a..ae954dcab317 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -42,9 +42,11 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; import org.apache.druid.guice.SegmentWranglerModule; +import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.RetryQueryRunnerConfig; @@ -52,7 +54,11 @@ import org.apache.druid.server.BrokerQueryResource; import org.apache.druid.server.ClientInfoResource; import org.apache.druid.server.ClientQuerySegmentWalker; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.ZkCoordinator; import org.apache.druid.server.http.BrokerResource; +import org.apache.druid.server.http.HistoricalResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; @@ -123,6 +129,12 @@ protected List getModules() Jerseys.addResource(binder, HttpServerInventoryViewResource.class); LifecycleModule.register(binder, Server.class); + binder.bind(SegmentManager.class).in(LazySingleton.class); + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER)); + Jerseys.addResource(binder, HistoricalResource.class); + + LifecycleModule.register(binder, ZkCoordinator.class); bindNodeRoleAndAnnouncer( binder, diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 7483fec93a93..edb7205e53be 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -25,7 +25,6 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; -import com.google.inject.util.Providers; import io.airlift.airline.Command; import org.apache.druid.client.DruidServer; import org.apache.druid.discovery.DataNodeService; @@ -43,6 +42,7 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; import org.apache.druid.guice.QueryablePeonModule; @@ -60,12 +60,13 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.lookup.LookupModule; -import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; import org.apache.druid.server.DruidNode; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.ZkCoordinator; +import org.apache.druid.server.http.HistoricalResource; import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -138,14 +139,14 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); Jerseys.addResource(binder, SegmentListerResource.class); - - LifecycleModule.register(binder, CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class); - Jerseys.addResource(binder, ShuffleResource.class); LifecycleModule.register(binder, Server.class, RemoteChatHandler.class); - binder.bind(SegmentLoadDropHandler.class).toProvider(Providers.of(null)); + binder.bind(SegmentManager.class).in(LazySingleton.class); + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + Jerseys.addResource(binder, HistoricalResource.class); + LifecycleModule.register(binder, ZkCoordinator.class); bindNodeRoleAndAnnouncer( binder, diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1160eb9240c1..1928578415c0 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -109,8 +109,11 @@ import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.ZkCoordinator; +import org.apache.druid.server.http.HistoricalResource; import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -154,6 +157,10 @@ public class CliPeon extends GuiceRunnable @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK") public String serverType = "indexer-executor"; + + @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") + public String loadBroadcastSegments = "false"; + private static final Logger log = new Logger(CliPeon.class); @Inject @@ -174,6 +181,7 @@ protected List getModules() new JoinableFactoryModule(), new Module() { + @SuppressForbidden(reason = "System#out, System#err") @Override public void configure(Binder binder) { @@ -218,6 +226,13 @@ public void configure(Binder binder) Jerseys.addResource(binder, SegmentListerResource.class); binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); + + if ("true".equals(loadBroadcastSegments)) { + binder.bind(SegmentManager.class).in(LazySingleton.class); + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + Jerseys.addResource(binder, HistoricalResource.class); + LifecycleModule.register(binder, ZkCoordinator.class); + } } @Provides diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 9f087eaf9ccd..54e7e5a8f961 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -355,7 +355,7 @@ void addSegment(final DruidServerMetadata server, final DataSegment segment) AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; if (segmentMetadata == null) { // segmentReplicatable is used to determine if segments are served by historical or realtime servers - long isRealtime = server.segmentReplicatable() ? 0 : 1; + long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; segmentMetadata = AvailableSegmentMetadata.builder( segment, isRealtime, @@ -366,7 +366,7 @@ void addSegment(final DruidServerMetadata server, final DataSegment segment) // Unknown segment. setAvailableSegmentMetadata(segment.getId(), segmentMetadata); segmentsNeedingRefresh.add(segment.getId()); - if (!server.segmentReplicatable()) { + if (!server.isSegmentReplicationTarget()) { log.debug("Added new mutable segment[%s].", segment.getId()); mutableSegments.add(segment.getId()); } else { @@ -384,7 +384,7 @@ void addSegment(final DruidServerMetadata server, final DataSegment segment) .withRealtime(recomputeIsRealtime(servers)) .build(); knownSegments.put(segment.getId(), metadataWithNumReplicas); - if (server.segmentReplicatable()) { + if (server.isSegmentReplicationTarget()) { // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, // even if it's also available on non-replicatable (realtime) servers. mutableSegments.remove(segment.getId()); From 5b145d3bcd59f25b9576e8b4d069be8ed9990b9f Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 2 Jun 2020 18:35:52 -0700 Subject: [PATCH 02/14] Add javadocs --- .../druid/indexing/overlord/ForkingTaskRunner.java | 2 ++ .../server/coordination/SegmentChangeRequestLoad.java | 11 ++++++++--- .../server/coordination/UnprunedDataSegment.java | 6 ++++++ .../druid/server/coordinator/BalancerStrategy.java | 7 ++++++- .../src/main/java/org/apache/druid/cli/CliPeon.java | 4 ++++ 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index bbf639466333..10f5e6b5e9c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -327,6 +327,8 @@ public TaskStatus call() command.add(nodeType); } + // If the task type is queryable, we need to load broadcast segments on the peon, used for + // join queries if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index 3f0010e9cb52..a6db9bd0135f 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -35,21 +35,26 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest { private final DataSegment segment; + /** + * To avoid pruning of the loadSpec on the broker, needed when the broker is loading broadcast segments, + * we deserialize into an {@link UnprunedDataSegment}, which never removes the loadSpec. + */ + @JsonCreator public SegmentChangeRequestLoad( - DataSegment segment + @JsonUnwrapped UnprunedDataSegment segment ) { this.segment = segment; } - @JsonCreator public SegmentChangeRequestLoad( - @JsonUnwrapped UnprunedDataSegment segment + DataSegment segment ) { this.segment = segment; } + @Override public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java index 2c11ada82cb4..f5860d055670 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java +++ b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java @@ -33,6 +33,12 @@ import java.util.List; import java.util.Map; +/** + * A deserialization aid used by {@link SegmentChangeRequestLoad}. The broker prunes the loadSpec from segments + * for efficiency reasons, but the broker does need the loadSpec when it loads broadcast segments. + * + * This class always uses the non-pruning default {@link org.apache.druid.timeline.DataSegment.PruneSpecsHolder}. + */ public class UnprunedDataSegment extends DataSegment { @JsonCreator diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index cc72b00efdc4..889c167c8ef5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -57,7 +57,12 @@ public interface BalancerStrategy /** * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy. * @param serverHolders set of historicals to consider for moving segments - * @param broadcastDatasources + * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. + * Balancing strategies should avoid rebalancing segments for such datasources, since + * they should be loaded on all servers anyway. + * NOTE: this should really be handled on a per-segment basis, to properly support + * the interval or period-based broadcast rules. For simplicity of the initial + * implementation, only forever broadcast rules are supported. * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). */ diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1928578415c0..ea5061b33211 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -158,6 +158,10 @@ public class CliPeon extends GuiceRunnable public String serverType = "indexer-executor"; + /** + * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for + * queryable tasks, such as streaming ingestion tasks. + */ @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") public String loadBroadcastSegments = "false"; From a07a561f31112536f543df68729c5d7f39097d8d Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 2 Jun 2020 18:42:49 -0700 Subject: [PATCH 03/14] Support HTTP segment management --- .../druid/discovery/DruidNodeDiscoveryProvider.java | 3 ++- .../main/java/org/apache/druid/cli/CliBroker.java | 5 ++++- .../src/main/java/org/apache/druid/cli/CliPeon.java | 13 ------------- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java index 733202981839..898ce7c75956 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java @@ -44,7 +44,8 @@ public abstract class DruidNodeDiscoveryProvider private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER), - DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER), + DataNodeService.DISCOVERY_SERVICE_KEY, + ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER, NodeRole.BROKER), WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER) ); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index ae954dcab317..18d2813fa411 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -33,6 +33,7 @@ import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig; import org.apache.druid.client.selector.ServerSelectorStrategy; import org.apache.druid.client.selector.TierSelectorStrategy; +import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.CacheModule; @@ -59,6 +60,7 @@ import org.apache.druid.server.coordination.ZkCoordinator; import org.apache.druid.server.http.BrokerResource; import org.apache.druid.server.http.HistoricalResource; +import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; @@ -133,6 +135,7 @@ protected List getModules() binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER)); Jerseys.addResource(binder, HistoricalResource.class); + Jerseys.addResource(binder, SegmentListerResource.class); LifecycleModule.register(binder, ZkCoordinator.class); @@ -140,7 +143,7 @@ protected List getModules() binder, DiscoverySideEffectsProvider .builder(NodeRole.BROKER) - .serviceClasses(ImmutableList.of(LookupNodeService.class)) + .serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class)) .useLegacyAnnouncer(true) .build() ); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index ea5061b33211..59ef48aee96a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -61,7 +61,6 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.Self; -import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; @@ -110,7 +109,6 @@ import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ZkCoordinator; import org.apache.druid.server.http.HistoricalResource; @@ -120,7 +118,6 @@ import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.eclipse.jetty.server.Server; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.List; @@ -266,16 +263,6 @@ public String getTaskIDFromTask(final Task task) { return task.getId(); } - - @Provides - public SegmentListerResource getSegmentListerResource( - @Json ObjectMapper jsonMapper, - @Smile ObjectMapper smileMapper, - @Nullable BatchDataSegmentAnnouncer announcer - ) - { - return new SegmentListerResource(jsonMapper, smileMapper, announcer, null); - } }, new QueryablePeonModule(), new IndexingServiceFirehoseModule(), From d3252d364415596f5ac550dc5304966cd6abe8c2 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 2 Jun 2020 20:23:26 -0700 Subject: [PATCH 04/14] Fix indexer maxSize --- services/src/main/java/org/apache/druid/cli/CliIndexer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index edb7205e53be..d4a8df18606b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -27,6 +27,7 @@ import com.google.inject.name.Names; import io.airlift.airline.Command; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.DruidServerConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; @@ -187,11 +188,11 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) @Provides @LazySingleton - public DataNodeService getDataNodeService() + public DataNodeService getDataNodeService(DruidServerConfig serverConfig) { return new DataNodeService( DruidServer.DEFAULT_TIER, - 0L, + serverConfig.getMaxSize(), ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_PRIORITY ); From 808d249ffae8a43158874e737d61fb159ee3a2a9 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 2 Jun 2020 20:31:22 -0700 Subject: [PATCH 05/14] inspection fix --- .../java/org/apache/druid/server/coordinator/DruidCluster.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 7724b077e644..8fb4ccb056d4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -151,7 +151,6 @@ public NavigableSet getHistoricalsByTier(String tier) public Collection getAllServers() { final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum(); - final int brokerSize = brokers.size(); final int realtimeSize = realtimes.size(); final List allServers = new ArrayList<>(historicalSize + realtimeSize); From 9eacac68a8c2f840f7735533bb5ba727c6e137fb Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 3 Jun 2020 19:05:54 -0700 Subject: [PATCH 06/14] Make segment cache optional on non-historicals --- .../indexing/common/SegmentLoaderFactory.java | 6 +++- .../segment/loading/SegmentLoaderConfig.java | 5 ++- .../coordination/SegmentLoadDropHandler.java | 31 +++++++++++++++---- .../SegmentLoadDropHandlerTest.java | 7 +++-- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java index 17b8dc131648..54e5d31b16ad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.guice.annotations.Json; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentLoader; @@ -38,15 +39,18 @@ public class SegmentLoaderFactory { private final IndexIO indexIO; private final ObjectMapper jsonMapper; + private final ServerTypeConfig serverTypeConfig; @Inject public SegmentLoaderFactory( IndexIO indexIO, - @Json ObjectMapper mapper + @Json ObjectMapper mapper, + ServerTypeConfig serverTypeConfig ) { this.indexIO = indexIO; this.jsonMapper = mapper; + this.serverTypeConfig = serverTypeConfig; } public SegmentLoader manufacturate(File storageDir) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index c6c57233738c..39b3bde3129d 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -23,9 +23,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.druid.utils.JvmUtils; -import org.hibernate.validator.constraints.NotEmpty; import java.io.File; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,8 +34,7 @@ public class SegmentLoaderConfig { @JsonProperty - @NotEmpty - private List locations = null; + private List locations = Collections.emptyList(); @JsonProperty("lazyLoadOnStart") private boolean lazyLoadOnStart = false; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 51aefc1b8d67..8cb8b0bd7cea 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -34,6 +34,8 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -98,13 +100,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler // Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes. private final LinkedHashSet waitingFutures = new LinkedHashSet<>(); + private final ServerTypeConfig serverTypeConfig; + @Inject public SegmentLoadDropHandler( ObjectMapper jsonMapper, SegmentLoaderConfig config, DataSegmentAnnouncer announcer, DataSegmentServerAnnouncer serverAnnouncer, - SegmentManager segmentManager + SegmentManager segmentManager, + ServerTypeConfig serverTypeConfig ) { this( @@ -116,7 +121,8 @@ public SegmentLoadDropHandler( Executors.newScheduledThreadPool( config.getNumLoadingThreads(), Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s") - ) + ), + serverTypeConfig ); } @@ -127,7 +133,8 @@ public SegmentLoadDropHandler( DataSegmentAnnouncer announcer, DataSegmentServerAnnouncer serverAnnouncer, SegmentManager segmentManager, - ScheduledExecutorService exec + ScheduledExecutorService exec, + ServerTypeConfig serverTypeConfig ) { this.jsonMapper = jsonMapper; @@ -138,7 +145,15 @@ public SegmentLoadDropHandler( this.exec = exec; this.segmentsToDelete = new ConcurrentSkipListSet<>(); + this.serverTypeConfig = serverTypeConfig; + if (config.getLocations().isEmpty()) { + if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) { + throw new IAE("Segment cache locations must be set on historicals."); + } else { + log.info("Not starting SegmentLoadDropHandler with empty segment cache locations."); + } + } requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); } @@ -152,8 +167,10 @@ public void start() throws IOException log.info("Starting..."); try { - loadLocalCache(); - serverAnnouncer.announce(); + if (!config.getLocations().isEmpty()) { + loadLocalCache(); + serverAnnouncer.announce(); + } } catch (Exception e) { Throwables.propagateIfPossible(e, IOException.class); @@ -174,7 +191,9 @@ public void stop() log.info("Stopping..."); try { - serverAnnouncer.unannounce(); + if (!config.getLocations().isEmpty()) { + serverAnnouncer.unannounce(); + } } catch (Exception e) { throw new RuntimeException(e); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index f5455fbbfb29..a4d07754bf84 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -182,7 +183,8 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) } }; } - }.create(5, "SegmentLoadDropHandlerTest-[%d]") + }.create(5, "SegmentLoadDropHandlerTest-[%d]"), + new ServerTypeConfig(ServerType.HISTORICAL) ); } @@ -388,7 +390,8 @@ public int getAnnounceIntervalMillis() return 50; } }, - announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager + announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL) ); Set segments = new HashSet<>(); From eebca052d6da21027a3983856398a8a101bfc0fc Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 3 Jun 2020 20:31:17 -0700 Subject: [PATCH 07/14] Fix build --- .../apache/druid/indexing/common/SegmentLoaderFactory.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java index 54e5d31b16ad..17b8dc131648 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; -import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.guice.annotations.Json; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentLoader; @@ -39,18 +38,15 @@ public class SegmentLoaderFactory { private final IndexIO indexIO; private final ObjectMapper jsonMapper; - private final ServerTypeConfig serverTypeConfig; @Inject public SegmentLoaderFactory( IndexIO indexIO, - @Json ObjectMapper mapper, - ServerTypeConfig serverTypeConfig + @Json ObjectMapper mapper ) { this.indexIO = indexIO; this.jsonMapper = mapper; - this.serverTypeConfig = serverTypeConfig; } public SegmentLoader manufacturate(File storageDir) From ca1affea1f8e76d715729ddf6ee690c652a18eab Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 3 Jun 2020 23:27:00 -0700 Subject: [PATCH 08/14] Fix inspections, some coverage, failed tests --- .../druid/indexing/kafka/KafkaIndexTask.java | 6 ++ .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../indexing/kinesis/KinesisIndexTask.java | 6 ++ .../kinesis/KinesisIndexTaskTest.java | 2 +- .../SeekableStreamIndexTask.java | 6 -- ...penderatorDriverRealtimeIndexTaskTest.java | 1 + .../indexing/common/task/IndexTaskTest.java | 2 + .../common/task/RealtimeIndexTaskTest.java | 6 ++ server/pom.xml | 4 -- .../coordination/SegmentLoadDropHandler.java | 3 - .../coordination/UnprunedDataSegment.java | 2 +- .../SegmentLoadDropHandlerTest.java | 24 +++++++ .../coordination/ZkCoordinatorTest.java | 67 ++++++++++++++++++- 13 files changed, 113 insertions(+), 17 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 9f7f9977efb2..6bf40866b5ea 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -191,4 +191,10 @@ public String getType() { return TYPE; } + + @Override + public boolean supportsQueries() + { + return true; + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index eaa2f99a1a54..5e959a7e88c2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -344,6 +344,7 @@ public void testRunAfterDataInserted() throws Exception INPUT_FORMAT ) ); + Assert.assertTrue(task.supportsQueries()); final ListenableFuture future = runTask(task); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index bfd375830497..9eee3bf6274d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -137,6 +137,12 @@ public String getType() return TYPE; } + @Override + public boolean supportsQueries() + { + return true; + } + @VisibleForTesting AWSCredentialsConfig getAwsCredentialsConfig() { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 4daa2fdc5d9c..ab32a7344fee 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -346,8 +346,8 @@ public void testRunAfterDataInserted() throws Exception null, false ) - ); + Assert.assertTrue(task.supportsQueries()); final ListenableFuture future = runTask(task); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 029e5f9b8210..9dd4ece02c9b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -191,12 +191,6 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } - @Override - public boolean supportsQueries() - { - return true; - } - public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { return appenderatorsManager.createRealtimeAppenderatorForTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index de3fa29a489b..513a590be9fd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -337,6 +337,7 @@ public void testBasics() throws Exception { expectPublishedSegments(1); final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null); + Assert.assertTrue(task.supportsQueries()); final ListenableFuture statusFuture = runTask(task); // Wait for firehose to show up, it starts off null. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 2cb48e67ba3f..be621e4865bf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -229,6 +229,8 @@ public void testDeterminePartitions() throws Exception appenderatorsManager ); + Assert.assertFalse(indexTask.supportsQueries()); + final List segments = runTask(indexTask).rhs; Assert.assertEquals(2, segments.size()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 8cc21d02e19a..12ea2145d582 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -189,6 +189,12 @@ public void testDefaultResource() Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup()); } + @Test(timeout = 60_000L) + public void testSupportsQueries() + { + final RealtimeIndexTask task = makeRealtimeTask(null); + Assert.assertTrue(task.supportsQueries()); + } @Test(timeout = 60_000L, expected = ExecutionException.class) public void testHandoffTimeout() throws Exception diff --git a/server/pom.xml b/server/pom.xml index 5d68a91dfd89..ddce092c1cd4 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -291,10 +291,6 @@ javax.validation validation-api - - org.hibernate - hibernate-validator - com.google.errorprone error_prone_annotations diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 8cb8b0bd7cea..87a19365e6f5 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -100,8 +100,6 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler // Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes. private final LinkedHashSet waitingFutures = new LinkedHashSet<>(); - private final ServerTypeConfig serverTypeConfig; - @Inject public SegmentLoadDropHandler( ObjectMapper jsonMapper, @@ -145,7 +143,6 @@ public SegmentLoadDropHandler( this.exec = exec; this.segmentsToDelete = new ConcurrentSkipListSet<>(); - this.serverTypeConfig = serverTypeConfig; if (config.getLocations().isEmpty()) { if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java index f5860d055670..619f983c8b92 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java +++ b/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java @@ -37,7 +37,7 @@ * A deserialization aid used by {@link SegmentChangeRequestLoad}. The broker prunes the loadSpec from segments * for efficiency reasons, but the broker does need the loadSpec when it loads broadcast segments. * - * This class always uses the non-pruning default {@link org.apache.druid.timeline.DataSegment.PruneSpecsHolder}. + * This class always uses the non-pruning default {@link PruneSpecsHolder}. */ public class UnprunedDataSegment extends DataSegment { diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index a4d07754bf84..d7344a0a8399 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.loading.CacheTestSegmentLoader; import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -46,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -76,6 +78,8 @@ public class SegmentLoadDropHandlerTest private SegmentManager segmentManager; private List scheduledRunnable; + private List locations; + @Before public void setUp() { @@ -91,6 +95,14 @@ public void setUp() throw new RuntimeException(e); } + locations = Collections.singletonList( + new StorageLocationConfig( + infoDir, + 100L, + 100d + ) + ); + scheduledRunnable = new ArrayList<>(); segmentLoader = new CacheTestSegmentLoader(); @@ -155,6 +167,12 @@ public int getAnnounceIntervalMillis() return 50; } + @Override + public List getLocations() + { + return locations; + } + @Override public int getDropSegmentDelayMillis() { @@ -384,6 +402,12 @@ public int getNumLoadingThreads() return 5; } + @Override + public List getLocations() + { + return locations; + } + @Override public int getAnnounceIntervalMillis() { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java index 9bdd84ed70fb..7753753fed02 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java @@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap; import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.CuratorTestBase; +import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -39,7 +42,11 @@ import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -47,6 +54,8 @@ */ public class ZkCoordinatorTest extends CuratorTestBase { + private static final Logger log = new Logger(ZkCoordinatorTest.class); + private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER; private final DruidServerMetadata me = new DruidServerMetadata( "dummyServer", @@ -67,9 +76,32 @@ public String getBase() }; private ZkCoordinator zkCoordinator; + private File infoDir; + private List locations; + @Before public void setUp() throws Exception { + try { + infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest"); + infoDir.mkdirs(); + for (File file : infoDir.listFiles()) { + file.delete(); + } + log.info("Creating tmp test files in [%s]", infoDir); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + locations = Collections.singletonList( + new StorageLocationConfig( + infoDir, + 100L, + 100d + ) + ); + setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); @@ -102,11 +134,42 @@ public void testLoadDrop() throws Exception SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( ServerTestHelper.MAPPER, - new SegmentLoaderConfig(), + new SegmentLoaderConfig() { + @Override + public File getInfoDir() + { + return infoDir; + } + + @Override + public int getNumLoadingThreads() + { + return 5; + } + + @Override + public int getAnnounceIntervalMillis() + { + return 50; + } + + @Override + public List getLocations() + { + return locations; + } + + @Override + public int getDropSegmentDelayMillis() + { + return 0; + } + }, EasyMock.createNiceMock(DataSegmentAnnouncer.class), EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), EasyMock.createNiceMock(SegmentManager.class), - EasyMock.createNiceMock(ScheduledExecutorService.class) + EasyMock.createNiceMock(ScheduledExecutorService.class), + new ServerTypeConfig(ServerType.HISTORICAL) ) { @Override From ef2feb424f2ade2dd42babe45c497cbf90048ebd Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 3 Jun 2020 23:51:26 -0700 Subject: [PATCH 09/14] More tests --- .../SegmentLoadDropHandlerTest.java | 166 +++++++++++++----- 1 file changed, 118 insertions(+), 48 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index d7344a0a8399..1e879d1ddcf8 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -41,7 +42,9 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; @@ -70,6 +73,7 @@ public class SegmentLoadDropHandlerTest private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); private SegmentLoadDropHandler segmentLoadDropHandler; + private DataSegmentAnnouncer announcer; private File infoDir; private AtomicInteger announceCount; @@ -77,9 +81,14 @@ public class SegmentLoadDropHandlerTest private CacheTestSegmentLoader segmentLoader; private SegmentManager segmentManager; private List scheduledRunnable; - + private SegmentLoaderConfig segmentLoaderConfig; + private SegmentLoaderConfig segmentLoaderConfigNoLocations; + private ScheduledExecutorFactory scheduledExecutorFactory; private List locations; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Before public void setUp() { @@ -145,63 +154,90 @@ public void unannounceSegments(Iterable segments) } }; - segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return infoDir; - } - @Override - public int getNumLoadingThreads() - { - return 5; - } + segmentLoaderConfig = new SegmentLoaderConfig() + { + @Override + public File getInfoDir() + { + return infoDir; + } - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } + @Override + public int getNumLoadingThreads() + { + return 5; + } - @Override - public List getLocations() - { - return locations; - } + @Override + public int getAnnounceIntervalMillis() + { + return 50; + } + + @Override + public List getLocations() + { + return locations; + } + @Override + public int getDropSegmentDelayMillis() + { + return 0; + } + }; + + segmentLoaderConfigNoLocations = new SegmentLoaderConfig() + { + @Override + public int getNumLoadingThreads() + { + return 5; + } + + @Override + public int getAnnounceIntervalMillis() + { + return 50; + } + + + @Override + public int getDropSegmentDelayMillis() + { + return 0; + } + }; + + scheduledExecutorFactory = new ScheduledExecutorFactory() + { + @Override + public ScheduledExecutorService create(int corePoolSize, String nameFormat) + { + /* + Override normal behavoir by adding the runnable to a list so that you can make sure + all the shceduled runnables are executed by explicitly calling run() on each item in the list + */ + return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) + { @Override - public int getDropSegmentDelayMillis() + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return 0; + scheduledRunnable.add(command); + return null; } - }, + }; + } + }; + + segmentLoadDropHandler = new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfig, announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager, - new ScheduledExecutorFactory() - { - @Override - public ScheduledExecutorService create(int corePoolSize, String nameFormat) - { - /* - Override normal behavoir by adding the runnable to a list so that you can make sure - all the shceduled runnables are executed by explicitly calling run() on each item in the list - */ - return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) - { - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) - { - scheduledRunnable.add(command); - return null; - } - }; - } - }.create(5, "SegmentLoadDropHandlerTest-[%d]"), + scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), new ServerTypeConfig(ServerType.HISTORICAL) ); } @@ -240,6 +276,40 @@ Because another addSegment() call is executed, which removes the segment from se segmentLoadDropHandler.stop(); } + @Test + public void testSegmentLoading1BrokerWithNoLocations() throws Exception + { + SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfigNoLocations, + announcer, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + segmentManager, + scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"), + new ServerTypeConfig(ServerType.BROKER) + ); + + segmentLoadDropHandlerBrokerWithNoLocations.start(); + segmentLoadDropHandler.stop(); + } + + @Test + public void testSegmentLoading1HistoricalWithNoLocations() throws Exception + { + expectedException.expect(IAE.class); + expectedException.expectMessage("Segment cache locations must be set on historicals."); + + new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfigNoLocations, + announcer, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + segmentManager, + scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), + new ServerTypeConfig(ServerType.HISTORICAL) + ); + } + /** * Steps: * 1. addSegment() succesfully loads the segment and annouces it From 4ce47af9843f966cb2f15e3f423f1f2c78e98ca5 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 3 Jun 2020 23:56:47 -0700 Subject: [PATCH 10/14] Add CliIndexer to MainTest --- services/src/test/java/org/apache/druid/cli/MainTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/src/test/java/org/apache/druid/cli/MainTest.java b/services/src/test/java/org/apache/druid/cli/MainTest.java index 3e960f69e442..a1fbed581e5d 100644 --- a/services/src/test/java/org/apache/druid/cli/MainTest.java +++ b/services/src/test/java/org/apache/druid/cli/MainTest.java @@ -50,7 +50,9 @@ public static Iterable constructorFeeder() //new Object[]{new CliInternalHadoopIndexer()}, new Object[]{new CliMiddleManager()}, - new Object[]{new CliRouter()} + new Object[]{new CliRouter()}, + + new Object[]{new CliIndexer()} ); } From 6b439d89818c75275ee810bc09ae99617ab52eb5 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 4 Jun 2020 03:42:09 -0700 Subject: [PATCH 11/14] Fix inspection --- .../druid/server/coordination/SegmentLoadDropHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 1e879d1ddcf8..c5192c248c59 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -294,7 +294,7 @@ public void testSegmentLoading1BrokerWithNoLocations() throws Exception } @Test - public void testSegmentLoading1HistoricalWithNoLocations() throws Exception + public void testSegmentLoading1HistoricalWithNoLocations() { expectedException.expect(IAE.class); expectedException.expectMessage("Segment cache locations must be set on historicals."); From 8d2de37b3999600e6150a7d56c8f895996ea6aab Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 5 Jun 2020 13:13:57 -0700 Subject: [PATCH 12/14] Rename UnprunedDataSegment to LoadableDataSegment --- .../{UnprunedDataSegment.java => LoadableDataSegment.java} | 4 ++-- .../druid/server/coordination/SegmentChangeRequestLoad.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename server/src/main/java/org/apache/druid/server/coordination/{UnprunedDataSegment.java => LoadableDataSegment.java} (97%) diff --git a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java similarity index 97% rename from server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java rename to server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java index 619f983c8b92..4f4f7a5b1d19 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/UnprunedDataSegment.java +++ b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java @@ -39,10 +39,10 @@ * * This class always uses the non-pruning default {@link PruneSpecsHolder}. */ -public class UnprunedDataSegment extends DataSegment +public class LoadableDataSegment extends DataSegment { @JsonCreator - public UnprunedDataSegment( + public LoadableDataSegment( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index a6db9bd0135f..130c7b50d80c 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -37,11 +37,11 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest /** * To avoid pruning of the loadSpec on the broker, needed when the broker is loading broadcast segments, - * we deserialize into an {@link UnprunedDataSegment}, which never removes the loadSpec. + * we deserialize into an {@link LoadableDataSegment}, which never removes the loadSpec. */ @JsonCreator public SegmentChangeRequestLoad( - @JsonUnwrapped UnprunedDataSegment segment + @JsonUnwrapped LoadableDataSegment segment ) { this.segment = segment; From 5f2e73ee215d37f6cbf254fe12a29f9477f20177 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 8 Jun 2020 17:48:13 -0700 Subject: [PATCH 13/14] Address PR comments --- .../druid/server/coordinator/duty/RunRules.java | 3 +++ .../coordination/SegmentLoadDropHandlerTest.java | 10 +++++----- .../druid/server/coordination/ZkCoordinatorTest.java | 11 ++++++----- .../apache/druid/sql/calcite/CalciteQueryTest.java | 2 +- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 125fcafbc21a..3dc7b4d2f918 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -115,6 +115,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (rule.appliesTo(segment, now)) { stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); foundMatchingRule = true; + + // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules + // executes before BalanceSegments if (rule instanceof BroadcastDistributionRule) { broadcastDatasources.add(segment.getDataSource()); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index c5192c248c59..6d8ef0a8cc4e 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -45,6 +45,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -89,15 +90,14 @@ public class SegmentLoadDropHandlerTest @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Before public void setUp() { try { - infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest"); - infoDir.mkdirs(); - for (File file : infoDir.listFiles()) { - file.delete(); - } + infoDir = temporaryFolder.newFolder(); log.info("Creating tmp test files in [%s]", infoDir); } catch (IOException e) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java index 7753753fed02..c30a37cb01ad 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java @@ -40,7 +40,9 @@ import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -79,15 +81,14 @@ public String getBase() private File infoDir; private List locations; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Before public void setUp() throws Exception { try { - infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest"); - infoDir.mkdirs(); - for (File file : infoDir.listFiles()) { - file.delete(); - } + infoDir = temporaryFolder.newFolder(); log.info("Creating tmp test files in [%s]", infoDir); } catch (IOException e) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 94edd222dc67..d0aa643e6100 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -11489,7 +11489,7 @@ public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map= '2001-01-02'" + ")" + ", def as" From 2d005ed185100daf89c04097b9631a22d70ad947 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 8 Jun 2020 17:50:00 -0700 Subject: [PATCH 14/14] Fix --- .../java/org/apache/druid/sql/calcite/CalciteQueryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index d0aa643e6100..09d47f2ec92b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -11484,12 +11484,12 @@ public void testTimeExtractWithTooFewArguments() throws Exception @Test @Parameters(source = QueryContextForJoinProvider.class) - public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map queryContext) throws Exception + public void testNestedGroupByOnInlineDataSourceWithFilter(Map queryContext) throws Exception { try { testQuery( "with abc as" - + "(Run" + + "(" + " SELECT dim1, m2 from druid.foo where \"__time\" >= '2001-01-02'" + ")" + ", def as"