From f4144e7baad24bfcc7b9f6915c356c505ef079f1 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 4 Apr 2017 17:56:46 +0900 Subject: [PATCH 1/7] Add ServerType --- .../java/io/druid/client/DruidServer.java | 3 +- .../io/druid/client/ImmutableDruidServer.java | 3 +- .../BatchDataSegmentAnnouncer.java | 2 +- .../coordination/DruidServerMetadata.java | 8 ++-- .../druid/server/coordination/ServerType.java | 44 +++++++++++++++++++ .../server/coordinator/DruidCluster.java | 2 + .../helper/DruidCoordinatorLogger.java | 2 +- .../server/coordination/ServerTypeTest.java | 42 ++++++++++++++++++ .../java/io/druid/guice/RealtimeModule.java | 4 ++ 9 files changed, 102 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordination/ServerType.java create mode 100644 server/src/test/java/io/druid/server/coordination/ServerTypeTest.java diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 1a05b29422cc..ca5bb3caecdc 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.server.DruidNode; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import java.util.Collections; @@ -111,7 +112,7 @@ public long getMaxSize() return metadata.getMaxSize(); } - public String getType() + public ServerType getType() { return metadata.getType(); } diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index 551025c148ef..79f054ae89ad 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import java.util.Map; @@ -72,7 +73,7 @@ public long getMaxSize() return metadata.getMaxSize(); } - public String getType() + public ServerType getType() { return metadata.getType(); } diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 08e3bca4cbfd..889ba26258fa 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -231,7 +231,7 @@ private String makeServedSegmentPath() // server.getName() is already in the zk path return makeServedSegmentPath(UUIDUtils.generateUuid( server.getHost(), - server.getType(), + server.getType().toString(), server.getTier(), new DateTime().toString() )); diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index 9094d1009980..00441fd1892f 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -30,7 +30,7 @@ public class DruidServerMetadata private final String host; private final long maxSize; private final String tier; - private final String type; + private final ServerType type; private final int 
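/* [editor's note, not part of the patch] The field above changes from a free-form
 * String to the new ServerType enum, while the @JsonCreator constructor below keeps
 * the wire format a String and converts it via ServerType.fromString(). A minimal
 * sketch of the resulting behavior, using only signatures that appear in this patch:
 *
 *   DruidServerMetadata meta =
 *       new DruidServerMetadata("name", "host", 1000L, "historical", "tier", 0);
 *   meta.getType();                      // ServerType.HISTORICAL
 *   ServerType.fromString("Historical"); // also HISTORICAL; fromString() upper-cases
 */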
priority; @JsonCreator @@ -47,7 +47,7 @@ public DruidServerMetadata( this.host = host; this.maxSize = maxSize; this.tier = tier; - this.type = type; + this.type = ServerType.fromString(type); this.priority = priority; } @@ -76,7 +76,7 @@ public String getTier() } @JsonProperty - public String getType() + public ServerType getType() { return type; } @@ -89,7 +89,7 @@ public int getPriority() public boolean isAssignable() { - return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge"); + return type.isAssignable(); } @Override diff --git a/server/src/main/java/io/druid/server/coordination/ServerType.java b/server/src/main/java/io/druid/server/coordination/ServerType.java new file mode 100644 index 000000000000..8664b8032d8b --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/ServerType.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +public enum ServerType +{ + HISTORICAL(true), + BRIDGE(true), + REALTIME(false); + + private boolean assignable; + + boolean isAssignable() + { + return assignable; + } + + ServerType(boolean assignable) + { + this.assignable = assignable; + } + + static ServerType fromString(String type) + { + return ServerType.valueOf(type.toUpperCase()); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java index cbca241f9736..8befcf5f77a1 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java @@ -19,6 +19,7 @@ package io.druid.server.coordinator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; @@ -39,6 +40,7 @@ public DruidCluster() this.cluster = Maps.newHashMap(); } + @VisibleForTesting public DruidCluster(Map> cluster) { this.cluster = cluster; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index 55ab2f0a84d7..ede73c20c184 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -194,7 +194,7 @@ public Number transformEntry(String key, AtomicLong value) log.info( "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.", server.getName(), - server.getType(), + server.getType().toString(), server.getTier(), queuePeon.getSegmentsToLoad().size(), 
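/* [editor's note, not part of the patch] Call sites that need a String, such as the
 * log line above, now append toString(); for an enum this yields the constant name:
 *
 *   ServerType.HISTORICAL.toString() // "HISTORICAL", the value later asserted in
 *                                    // ServersResourceTest (patch 3 of this series)
 */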
queuePeon.getSegmentsToDrop().size(), diff --git a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java new file mode 100644 index 000000000000..39f65dcd52b4 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import org.junit.Assert; +import org.junit.Test; + +public class ServerTypeTest +{ + @Test + public void testAssignable() + { + Assert.assertTrue(ServerType.HISTORICAL.isAssignable()); + Assert.assertTrue(ServerType.BRIDGE.isAssignable()); + Assert.assertFalse(ServerType.REALTIME.isAssignable()); + } + + @Test + public void testFromString() + { + Assert.assertEquals(ServerType.HISTORICAL, ServerType.fromString("historical")); + Assert.assertEquals(ServerType.BRIDGE, ServerType.fromString("bridge")); + Assert.assertEquals(ServerType.REALTIME, ServerType.fromString("realtime")); + } +} \ No newline at end of file diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index aa0e357fc493..f639e0bade31 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -39,6 +39,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.coordination.ZkCoordinator; import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -107,5 +108,8 @@ public void configure(Binder binder) Jerseys.addResource(binder, QueryResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, Server.class); + + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, ZkCoordinator.class); } } From ce796e3288a17d3b482fe7a2a050f5d67ae4c973 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 4 Apr 2017 18:09:57 +0900 Subject: [PATCH 2/7] Add realtimes to DruidCluster --- .../server/coordinator/DruidCluster.java | 87 ++++++++++++++----- .../coordinator/SegmentReplicantLookup.java | 2 +- .../helper/DruidCoordinatorBalancer.java | 2 +- .../DruidCoordinatorCleanupOvershadowed.java | 2 +- .../DruidCoordinatorCleanupUnneeded.java | 2 +- .../helper/DruidCoordinatorLogger.java | 8 +- .../server/coordinator/rules/LoadRule.java | 7 +- .../server/coordination/ServerTypeTest.java | 8 +- 
.../DruidCoordinatorBalancerProfiler.java | 3 + .../DruidCoordinatorBalancerTest.java | 3 + .../DruidCoordinatorRuleRunnerTest.java | 14 +++ ...uidCoordinatorCleanupOvershadowedTest.java | 1 + .../coordinator/rules/LoadRuleTest.java | 4 + 13 files changed, 106 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java index 8befcf5f77a1..ba15a2983018 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java @@ -20,12 +20,16 @@ package io.druid.server.coordinator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidServer; +import io.druid.java.util.common.IAE; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * Contains a representation of the current state of the cluster by tier. @@ -33,63 +37,100 @@ */ public class DruidCluster { - private final Map> cluster; + private final Set realtimes; + private final Map> historicals; public DruidCluster() { - this.cluster = Maps.newHashMap(); + this.realtimes = new HashSet<>(); + this.historicals = new HashMap<>(); } @VisibleForTesting - public DruidCluster(Map> cluster) + public DruidCluster( + @Nullable Set realtimes, + Map> historicals + ) { - this.cluster = cluster; + this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); + this.historicals = historicals; } public void add(ServerHolder serverHolder) { - ImmutableDruidServer server = serverHolder.getServer(); - MinMaxPriorityQueue tierServers = cluster.get(server.getTier()); - if (tierServers == null) { - tierServers = MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(); - cluster.put(server.getTier(), tierServers); + switch (serverHolder.getServer().getType()) { + case HISTORICAL: + addHistorical(serverHolder); + break; + case REALTIME: + addRealtime(serverHolder); + break; + case BRIDGE: + addHistorical(serverHolder); + break; + default: + throw new IAE("unknown server type[%s]", serverHolder.getServer().getType()); } + } + + private void addRealtime(ServerHolder serverHolder) + { + realtimes.add(serverHolder); + } + + private void addHistorical(ServerHolder serverHolder) + { + final ImmutableDruidServer server = serverHolder.getServer(); + final MinMaxPriorityQueue tierServers = historicals.computeIfAbsent( + server.getTier(), + k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create() + ); tierServers.add(serverHolder); } - public Map> getCluster() + public Set getRealtimes() + { + return realtimes; + } + + public Map> getHistoricals() { - return cluster; + return historicals; } public Iterable getTierNames() { - return cluster.keySet(); + return historicals.keySet(); } - public MinMaxPriorityQueue getServersByTier(String tier) + public MinMaxPriorityQueue getHistoricalsByTier(String tier) { - return cluster.get(tier); + return historicals.get(tier); } - public Iterable> getSortedServersByTier() + public Iterable> getSortedHistoricalsByTier() { - return cluster.values(); + return historicals.values(); } public boolean isEmpty() { - return cluster.isEmpty(); + return historicals.isEmpty() && realtimes.isEmpty(); } - public boolean hasTier(String tier) + public 
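/* [editor's note, not part of the patch] A short usage sketch of the reworked
 * DruidCluster; the two holders are assumed to wrap a HISTORICAL and a REALTIME
 * server respectively, and "hot" is an arbitrary tier name:
 *
 *   DruidCluster cluster = new DruidCluster();
 *   cluster.add(historicalHolder); // HISTORICAL and BRIDGE land in a per-tier queue
 *   cluster.add(realtimeHolder);   // REALTIME lands in the realtimes set
 *   MinMaxPriorityQueue<ServerHolder> hot = cluster.getHistoricalsByTier("hot");
 */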
boolean hasHistoricals() { - MinMaxPriorityQueue servers = cluster.get(tier); - return (servers == null) || servers.isEmpty(); + return !historicals.isEmpty(); + } + + public boolean hasRealtimes() + { + return !realtimes.isEmpty(); } - public MinMaxPriorityQueue get(String tier) + public boolean hasTier(String tier) { - return cluster.get(tier); + MinMaxPriorityQueue servers = historicals.get(tier); + return (servers == null) || servers.isEmpty(); } } diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index 64b76b8fd070..785fce77feeb 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -38,7 +38,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster) final Table segmentsInCluster = HashBasedTable.create(); final Table loadingSegments = HashBasedTable.create(); - for (MinMaxPriorityQueue serversByType : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 67e750dd927f..d503c1d6a939 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -88,7 +88,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); for (Map.Entry> entry : - params.getDruidCluster().getCluster().entrySet()) { + params.getDruidCluster().getHistoricals().entrySet()) { String tier = entry.getKey(); if (currentlyMovingSegments.get(tier) == null) { diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java index 68a83c3a9193..2819a51ec4cd 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -55,7 +55,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) DruidCluster cluster = params.getDruidCluster(); Map> timelines = Maps.newHashMap(); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java index ee4fcc3dd799..89b4deb3175e 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java @@ -64,7 +64,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // 
This is done to prevent a race condition in which the coordinator would drop all segments if it started running // cleanup before it finished polling the metadata storage for available segments for the first time. if (!availableSegments.isEmpty()) { - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index ede73c20c184..c8b722adb8ac 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -84,7 +84,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) for (Map.Entry entry : assigned.entrySet()) { log.info( "[%s] : Assigned %s segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -99,7 +99,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) for (Map.Entry entry : dropped.entrySet()) { log.info( "[%s] : Dropped %s segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -152,7 +152,7 @@ public Number transformEntry(String key, AtomicLong value) for (Map.Entry entry : unneeded.entrySet()) { log.info( "[%s] : Removed %s unneeded segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -187,7 +187,7 @@ public Number transformEntry(String key, AtomicLong value) } } log.info("Load Queues:"); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); LoadQueuePeon queuePeon = serverHolder.getPeon(); diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index d7e8e73015dc..5f1ecdeb8719 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; - import io.druid.java.util.common.IAE; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; @@ -64,7 +63,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim final int loadedReplicantsInTier = params.getSegmentReplicantLookup() .getLoadedReplicants(segment.getIdentifier(), tier); - final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getServersByTier(tier); + final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); if (serverQueue == null) { 
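/* [editor's note, not part of the patch] The renames in the surrounding coordinator
 * helpers all follow one iteration pattern, sketched here with the new accessor names:
 *
 *   for (MinMaxPriorityQueue<ServerHolder> tierQueue : cluster.getSortedHistoricalsByTier()) {
 *     for (ServerHolder holder : tierQueue) {
 *       ImmutableDruidServer server = holder.getServer(); // historicals only
 *     }
 *   }
 */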
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); continue; @@ -176,8 +175,6 @@ private CoordinatorStats drop( } } - final ReplicationThrottler replicationManager = params.getReplicationManager(); - // Find all instances of this segment across tiers Map replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()); @@ -188,7 +185,7 @@ private CoordinatorStats drop( stats.addToTieredStat(droppedCount, tier, 0); - MinMaxPriorityQueue serverQueue = params.getDruidCluster().get(tier); + MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); if (serverQueue == null) { log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit(); continue; diff --git a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java index 39f65dcd52b4..68b665e05b42 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java @@ -39,4 +39,10 @@ public void testFromString() Assert.assertEquals(ServerType.BRIDGE, ServerType.fromString("bridge")); Assert.assertEquals(ServerType.REALTIME, ServerType.fromString("realtime")); } -} \ No newline at end of file + + @Test(expected = IllegalArgumentException.class) + public void testInvalidName() + { + ServerType.fromString("invalid"); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java index 523a7b08e3f3..afda7460561a 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java @@ -134,6 +134,7 @@ public void bigProfiler() DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -161,6 +162,7 @@ public void bigProfiler() .withSegmentReplicantLookup( SegmentReplicantLookup.make( new DruidCluster( + null, ImmutableMap.>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -216,6 +218,7 @@ public void profileRun() DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 29ca9b4d33c0..d699f2369df2 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -179,6 +179,7 @@ public void testMoveToEmptyServerBalancer() throws IOException DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -260,6 +261,7 @@ public void testRun1() throws IOException DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.>of( "normal", 
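/* [editor's note, not part of the patch] Each test below now passes null as the new
 * first constructor argument; the constructor turns that into an empty realtimes set:
 *
 *   new DruidCluster(
 *       null, // no realtime servers under test
 *       ImmutableMap.of("normal", serverHolderQueue) // a MinMaxPriorityQueue<ServerHolder>, elided
 *   );
 */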
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -354,6 +356,7 @@ public void testRun2() throws IOException DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index f82cb5eb8507..5d47d8dae14e 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -130,6 +130,7 @@ public void testRunThreeTiersOneReplicant() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -237,6 +238,7 @@ public void testRunTwoTiersTwoReplicants() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -349,6 +351,7 @@ public void testRunTwoTiersWithExistingSegments() throws Exception } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -430,6 +433,7 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -489,6 +493,7 @@ public void testRunRuleDoesNotExist() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -562,6 +567,7 @@ public void testDropRemove() throws Exception } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -642,6 +648,7 @@ public void testDropTooManyInSameTier() throws Exception } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -728,6 +735,7 @@ public void testDropTooManyInDifferentTiers() throws Exception } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -815,6 +823,7 @@ public void testDontDropInDifferentTiers() throws Exception server2.addDataSegment(segment.getIdentifier(), segment); } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -916,6 +925,7 @@ public void testDropServerActuallyServesSegment() throws Exception EasyMock.replay(anotherMockPeon); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -988,6 +998,7 @@ public void testReplicantThrottle() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1112,6 
+1123,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1231,6 +1243,7 @@ public void testDropReplicantThrottle() throws Exception } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1317,6 +1330,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( DruidServer.DEFAULT_TIER, MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java index 955d1de29969..34db007c3ae8 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java @@ -70,6 +70,7 @@ public void testRun() availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2); druidCluster = new DruidCluster( + null, ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(Arrays.asList( new ServerHolder(druidServer, mockPeon ))))); diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 9e6d77dd5599..179e6535bf53 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -157,6 +157,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) }; DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -281,6 +282,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ); server2.addDataSegment(segment.getIdentifier(), segment); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -373,6 +375,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) }; DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -481,6 +484,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) server2.addDataSegment(segment.getIdentifier(), segment); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( From 430021c490bdc756987975ccd0fa5661f2409334 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 4 Apr 2017 21:15:23 +0900 Subject: [PATCH 3/7] fix test fails --- .../client/client/BatchServerInventoryViewTest.java | 4 ++-- .../client/client/ImmutableSegmentLoadInfoTest.java | 2 +- .../java/io/druid/server/ClientInfoResourceTest.java | 2 +- .../druid/server/coordination/ZkCoordinatorTest.java | 4 ++-- .../coordination/BatchDataSegmentAnnouncerTest.java | 2 +- .../server/coordinator/CostBalancerStrategyTest.java | 2 +- 
.../DiskNormalizedCostBalancerStrategyTest.java | 2 +- .../druid/server/coordinator/DruidCoordinatorTest.java | 4 ++-- .../java/io/druid/server/http/ServersResourceTest.java | 10 +++++----- .../sql/calcite/util/TestServerInventoryView.java | 2 +- 10 files changed, 17 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index 2a0a99a8125b..b1fe39df3d5b 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -122,7 +122,7 @@ public void setUp() throws Exception "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ), @@ -435,7 +435,7 @@ public BatchDataSegmentAnnouncer call() "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ), diff --git a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java index 26f09e86b8cd..c291a2723182 100644 --- a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java +++ b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java @@ -51,7 +51,7 @@ public void testSerde() throws IOException null, NoneShardSpec.instance(), 0, 0 - ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1)) + ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "historical", "tier", 1)) ); ImmutableSegmentLoadInfo serde = mapper.readValue( diff --git a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java index af02fad8e669..fd0bdd3da721 100644 --- a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java @@ -79,7 +79,7 @@ public class ClientInfoResourceTest public void setup() { VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - DruidServer server = new DruidServer("name", "host", 1234, "type", "tier", 0); + DruidServer server = new DruidServer("name", "host", 1234, "historical", "tier", 0); addSegment(timeline, server, "1960-02-13/1961-02-14", ImmutableList.of("d5"), ImmutableList.of("m5"), "v0"); diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index bfafb38c14c1..974e9d28954d 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -81,7 +81,7 @@ public class ZkCoordinatorTest extends CuratorTestBase "dummyServer", "dummyHost", 0, - "dummyType", + "historical", "normal", 0 ); @@ -516,7 +516,7 @@ public String getBase() } ); binder.bind(DruidServerMetadata.class) - .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0)); + .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "historical", "normal", 0)); binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); binder.bind(CuratorFramework.class).toInstance(curator); binder.bind(ServerManager.class).toInstance(serverManager); diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java 
b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 359631aa8322..09b3b7a94e58 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -104,7 +104,7 @@ public void setUp() throws Exception "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ), diff --git a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java index 2b4a098a79cc..6accfee67817 100644 --- a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java @@ -61,7 +61,7 @@ public static List setupDummyCluster(int serverCount, int maxSegme serverHolderList.add( new ServerHolder( new ImmutableDruidServer( - new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1), 3000L, ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), ImmutableMap.copyOf(segments) diff --git a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java index b869c83ae1d1..85ff7cf08653 100644 --- a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java @@ -60,7 +60,7 @@ public static List setupDummyCluster(int serverCount, int maxSegme serverHolderList.add( new ServerHolder( new ImmutableDruidServer( - new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1), 3000L, ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), ImmutableMap.copyOf(segments) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index d48420d6756c..47ddc9c75701 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -217,7 +217,7 @@ public void testMoveSegment() throws Exception EasyMock.replay(metadataRuleManager); EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("from", null, 5L, null, null, 0), + new DruidServerMetadata("from", null, 5L, "historical", null, 0), 1L, null, ImmutableMap.of("dummySegment", segment) @@ -228,7 +228,7 @@ public void testMoveSegment() throws Exception druidServer2 = EasyMock.createMock(DruidServer.class); EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("to", null, 5L, null, null, 0), + new DruidServerMetadata("to", null, 5L, "historical", null, 0), 1L, null, ImmutableMap.of("dummySegment2", segment) diff --git a/server/src/test/java/io/druid/server/http/ServersResourceTest.java b/server/src/test/java/io/druid/server/http/ServersResourceTest.java index 0695bf6750bf..a8dd7d0a8f8b 100644 --- 
a/server/src/test/java/io/druid/server/http/ServersResourceTest.java +++ b/server/src/test/java/io/druid/server/http/ServersResourceTest.java @@ -41,7 +41,7 @@ public class ServersResourceTest { @Before public void setUp() { - DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "type", "tier", 0); + DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "historical", "tier", 0); DataSegment segment = DataSegment.builder() .dataSource("dataSource") .interval(new Interval("2016-03-22T14Z/2016-03-22T15Z")) @@ -65,7 +65,7 @@ public void testGetClusterServersFull() throws Exception String result = objectMapper.writeValueAsString(res.getEntity()); String expected = "[{\"host\":\"host\"," + "\"maxSize\":1234," - + "\"type\":\"type\"," + + "\"type\":\"HISTORICAL\"," + "\"tier\":\"tier\"," + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" @@ -80,7 +80,7 @@ public void testGetClusterServersSimple() throws Exception { Response res = serversResource.getClusterServers(null, "simple"); String result = objectMapper.writeValueAsString(res.getEntity()); - String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]"; + String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]"; Assert.assertEquals(expected, result); } @@ -91,7 +91,7 @@ public void testGetServerFull() throws Exception String result = objectMapper.writeValueAsString(res.getEntity()); String expected = "{\"host\":\"host\"," + "\"maxSize\":1234," - + "\"type\":\"type\"," + + "\"type\":\"HISTORICAL\"," + "\"tier\":\"tier\"," + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" @@ -106,7 +106,7 @@ public void testGetServerSimple() throws Exception { Response res = serversResource.getServer(server.getName(), "simple"); String result = objectMapper.writeValueAsString(res.getEntity()); - String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}"; + String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}"; Assert.assertEquals(expected, result); } diff --git a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java index 193cc91c18c1..d61a93853fa6 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java @@ -51,7 +51,7 @@ public TimelineLookup getTimeline(DataSource dataSource) @Override public void registerSegmentCallback(Executor exec, final SegmentCallback callback) { - final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "dummy", "dummy", 0); + final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "historical", "dummy", 0); for (final DataSegment segment : segments) { exec.execute( From 6a220e10640ba2ffb10d1944adb5249a72c5f598 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 5 Apr 2017 19:11:23 +0900 Subject: [PATCH 4/7] Add SegmentManager --- .../druid/client/CachingClusteredClient.java | 4 +- .../java/io/druid/client/DruidServer.java | 4 +- .../client/ImmutableDruidDataSource.java | 10 + .../io/druid/client/ImmutableDruidServer.java | 10 + .../segment/realtime/RealtimeManager.java | 
15 +- ...oordinatorBasedSegmentHandoffNotifier.java | 2 +- .../java/io/druid/server/SegmentManager.java | 191 ++++++++++++++++++ .../coordination/DruidServerMetadata.java | 4 +- .../server/coordination/ServerManager.java | 160 +-------------- .../druid/server/coordination/ServerType.java | 32 ++- .../server/coordination/ZkCoordinator.java | 13 +- .../server/coordinator/DruidCoordinator.java | 2 +- .../metrics/HistoricalMetricsMonitor.java | 12 +- .../segment/realtime/RealtimeManagerTest.java | 7 +- .../coordination/ServerManagerTest.java | 15 +- .../server/coordination/ServerTypeTest.java | 6 +- .../coordination/ZkCoordinatorTest.java | 26 ++- .../metrics/HistoricalMetricsMonitorTest.java | 16 +- 18 files changed, 319 insertions(+), 210 deletions(-) create mode 100644 server/src/main/java/io/druid/server/SegmentManager.java diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index b15e912fd3f3..27d8acb31dc7 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -270,7 +270,7 @@ public ShardSpec apply(PartitionChunk input) Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; for (Pair p : segments) { - if (!p.lhs.pick().getServer().isAssignable()) { + if (!p.lhs.pick().getServer().segmentReplicatable()) { hasOnlyHistoricalSegments = false; break; } @@ -427,7 +427,7 @@ private void addSequencesFromServer(ArrayList> listOfSequences) final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); final Sequence resultSeqToAdd; - if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable + if (!server.segmentReplicatable() || !populateCache || isBySegment) { // Direct server queryable if (!isBySegment) { resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); } else { diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index ca5bb3caecdc..100895a6f8ba 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -122,9 +122,9 @@ public String getTier() return metadata.getTier(); } - public boolean isAssignable() + public boolean segmentReplicatable() { - return metadata.isAssignable(); + return metadata.segmentReplicatable(); } public int getPriority() diff --git a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java index 1c7435e0e755..247d1bae0400 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java @@ -72,4 +72,14 @@ public Set getSegments() { return segmentsHolder; } + + @Override + public String toString() + { + return "ImmutableDruidDataSource{" + + "name='" + name + + "', segments='" + segmentsHolder + + "', properties='" + properties + + "'}"; + } } diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index 79f054ae89ad..8f17aa67c497 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -102,4 +102,14 @@ public Map getSegments() { return segments; } + + @Override + public 
String toString() + { + return "ImmutableDruidServer{" + + "meta='" + metadata + + "', size='" + currSize + + "', sources='" + dataSources + + "'}"; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index f13752b6006c..a00f2c5b2e27 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -50,6 +51,7 @@ import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; +import io.druid.server.SegmentManager; import org.joda.time.Interval; import java.io.Closeable; @@ -72,27 +74,34 @@ public class RealtimeManager implements QuerySegmentWalker */ private final Map> chiefs; + private final SegmentManager segmentManager; + @Inject public RealtimeManager( List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate + QueryRunnerFactoryConglomerate conglomerate, + SegmentManager segmentManager ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; this.chiefs = Maps.newHashMap(); + this.segmentManager = segmentManager; } + @VisibleForTesting RealtimeManager( List fireDepartments, QueryRunnerFactoryConglomerate conglomerate, - Map> chiefs + Map> chiefs, + SegmentManager segmentManager ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; - this.chiefs = chiefs; + this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs); + this.segmentManager = segmentManager; } @LifecycleStart diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java index f77c3d92ffa3..b2bdead3ef2f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -148,7 +148,7 @@ static boolean isHandOffComplete(List serverView, Segm @Override public boolean apply(DruidServerMetadata input) { - return input.isAssignable(); + return input.segmentReplicatable(); } } )) { diff --git a/server/src/main/java/io/druid/server/SegmentManager.java b/server/src/main/java/io/druid/server/SegmentManager.java new file mode 100644 index 000000000000..d8c0dbce3f73 --- /dev/null +++ b/server/src/main/java/io/druid/server/SegmentManager.java @@ -0,0 +1,191 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import io.druid.collections.CountingMap; +import io.druid.segment.ReferenceCountingSegment; +import io.druid.segment.Segment; +import io.druid.segment.loading.SegmentLoader; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; +import io.druid.timeline.partition.PartitionHolder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class SegmentManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentManager.class); + + private final Object lock = new Object(); + private final SegmentLoader segmentLoader; + private final Map> dataSources = new HashMap<>(); + private final CountingMap dataSourceSizes = new CountingMap<>(); + private final CountingMap dataSourceCounts = new CountingMap<>(); + + @Inject + public SegmentManager( + SegmentLoader segmentLoader + ) + { + this.segmentLoader = segmentLoader; + } + + public Map getDataSourceSizes() + { + synchronized (dataSourceSizes) { + return dataSourceSizes.snapshot(); + } + } + + public Map getDataSourceCounts() + { + synchronized (dataSourceCounts) { + return dataSourceCounts.snapshot(); + } + } + + public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException + { + return segmentLoader.isSegmentLoaded(segment); + } + + public Map> getDataSources() + { + synchronized (lock) { + return ImmutableMap.copyOf(dataSources); + } + } + + /** + * Load a single segment. 
+ * + * @param segment segment to load + * + * @return true if the segment was newly loaded, false if it was already loaded + * + * @throws SegmentLoadingException if the segment cannot be loaded + */ + public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException + { + final Segment adapter; + try { + adapter = segmentLoader.getSegment(segment); + } + catch (SegmentLoadingException e) { + try { + segmentLoader.cleanup(segment); + } + catch (SegmentLoadingException e1) { + // ignore + } + throw e; + } + + if (adapter == null) { + throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec()); + } + + synchronized (lock) { + final String dataSource = segment.getDataSource(); + final VersionedIntervalTimeline loadedIntervals = dataSources.computeIfAbsent( + dataSource, + k -> new VersionedIntervalTimeline<>(Ordering.natural()) + ); + + final PartitionHolder entry = loadedIntervals.findEntry( + segment.getInterval(), + segment.getVersion() + ); + if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { + log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); + return false; + } + + loadedIntervals.add( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter)) + ); + synchronized (dataSourceSizes) { + dataSourceSizes.add(dataSource, segment.getSize()); + } + synchronized (dataSourceCounts) { + dataSourceCounts.add(dataSource, 1L); + } + return true; + } + } + + public void dropSegment(final DataSegment segment) throws SegmentLoadingException + { + String dataSource = segment.getDataSource(); + synchronized (lock) { + VersionedIntervalTimeline loadedIntervals = dataSources.get(dataSource); + + if (loadedIntervals == null) { + log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource); + return; + } + + PartitionChunk removed = loadedIntervals.remove( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(null) + ); + ReferenceCountingSegment oldQueryable = (removed == null) ? 
null : removed.getObject(); + + if (oldQueryable != null) { + synchronized (dataSourceSizes) { + dataSourceSizes.add(dataSource, -segment.getSize()); + } + synchronized (dataSourceCounts) { + dataSourceCounts.add(dataSource, -1L); + } + + try { + log.info("Attempting to close segment %s", segment.getIdentifier()); + oldQueryable.close(); + } + catch (IOException e) { + log.makeAlert(e, "Exception closing segment") + .addData("dataSource", dataSource) + .addData("segmentId", segment.getIdentifier()) + .emit(); + } + } else { + log.info( + "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", + dataSource, + segment.getInterval(), + segment.getVersion() + ); + } + } + segmentLoader.cleanup(segment); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index 00441fd1892f..d2f7449670bc 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -87,9 +87,9 @@ public int getPriority() return priority; } - public boolean isAssignable() + public boolean segmentReplicatable() { - return type.isAssignable(); + return type.segmentReplicatable(); } @Override diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 65d94aab0d16..3613306333af 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -22,14 +22,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.collections.CountingMap; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Smile; @@ -55,10 +53,7 @@ import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; -import io.druid.segment.Segment; -import io.druid.segment.loading.SegmentLoader; -import io.druid.segment.loading.SegmentLoadingException; -import io.druid.timeline.DataSegment; +import io.druid.server.SegmentManager; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -66,10 +61,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; @@ -78,32 +70,27 @@ public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); - private final Object lock = new Object(); - private final SegmentLoader segmentLoader; private final QueryRunnerFactoryConglomerate conglomerate; private final ServiceEmitter emitter; private final ExecutorService exec; private final ExecutorService 
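/* [editor's note, not part of the patch] ServerManager now delegates segment
 * bookkeeping to the SegmentManager introduced above. The intended call pattern,
 * using only methods from this patch (segmentLoader and dataSegment are elided):
 *
 *   SegmentManager segmentManager = new SegmentManager(segmentLoader);
 *   if (segmentManager.loadSegment(dataSegment)) {
 *     // newly loaded; the per-dataSource size and count maps were updated
 *   }
 *   segmentManager.dropSegment(dataSegment); // closes the segment and cleans up
 *   segmentManager.getDataSources();         // the timelines ServerManager queries
 */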
-  private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
-  private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
-  private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
   private final Cache cache;
   private final ObjectMapper objectMapper;
   private final CacheConfig cacheConfig;
+  private final SegmentManager segmentManager;
 
   @Inject
   public ServerManager(
-      SegmentLoader segmentLoader,
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
       @Processing ExecutorService exec,
       @BackgroundCaching ExecutorService cachingExec,
      @Smile ObjectMapper objectMapper,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      SegmentManager segmentManager
   )
   {
-    this.segmentLoader = segmentLoader;
     this.conglomerate = conglomerate;
     this.emitter = emitter;
@@ -112,137 +99,8 @@ public ServerManager(
     this.cache = cache;
     this.objectMapper = objectMapper;
 
-    this.dataSources = new HashMap<>();
     this.cacheConfig = cacheConfig;
-  }
-
-  public Map<String, Long> getDataSourceSizes()
-  {
-    synchronized (dataSourceSizes) {
-      return dataSourceSizes.snapshot();
-    }
-  }
-
-  public Map<String, Long> getDataSourceCounts()
-  {
-    synchronized (dataSourceCounts) {
-      return dataSourceCounts.snapshot();
-    }
-  }
-
-  public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
-  {
-    return segmentLoader.isSegmentLoaded(segment);
-  }
-
-  /**
-   * Load a single segment.
-   *
-   * @param segment segment to load
-   *
-   * @return true if the segment was newly loaded, false if it was already loaded
-   *
-   * @throws SegmentLoadingException if the segment cannot be loaded
-   */
-  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
-  {
-    final Segment adapter;
-    try {
-      adapter = segmentLoader.getSegment(segment);
-    }
-    catch (SegmentLoadingException e) {
-      try {
-        segmentLoader.cleanup(segment);
-      }
-      catch (SegmentLoadingException e1) {
-        // ignore
-      }
-      throw e;
-    }
-
-    if (adapter == null) {
-      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
-    }
-
-    synchronized (lock) {
-      String dataSource = segment.getDataSource();
-      VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
-
-      if (loadedIntervals == null) {
-        loadedIntervals = new VersionedIntervalTimeline<>(Ordering.natural());
-        dataSources.put(dataSource, loadedIntervals);
-      }
-
-      PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
-          segment.getInterval(),
-          segment.getVersion()
-      );
-      if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
-        log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
-        return false;
-      }
-
-      loadedIntervals.add(
-          segment.getInterval(),
-          segment.getVersion(),
-          segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
-      );
-      synchronized (dataSourceSizes) {
-        dataSourceSizes.add(dataSource, segment.getSize());
-      }
-      synchronized (dataSourceCounts) {
-        dataSourceCounts.add(dataSource, 1L);
-      }
-      return true;
-    }
-  }
-
-  public void dropSegment(final DataSegment segment) throws SegmentLoadingException
-  {
-    String dataSource = segment.getDataSource();
-    synchronized (lock) {
-      VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
-
-      if (loadedIntervals == null) {
-        log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
-        return;
-      }
-
-      PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
-          segment.getInterval(),
-          segment.getVersion(),
-          segment.getShardSpec().createChunk((ReferenceCountingSegment) null)
-      );
-      ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
-
-      if (oldQueryable != null) {
-        synchronized (dataSourceSizes) {
-          dataSourceSizes.add(dataSource, -segment.getSize());
-        }
-        synchronized (dataSourceCounts) {
-          dataSourceCounts.add(dataSource, -1L);
-        }
-
-        try {
-          log.info("Attempting to close segment %s", segment.getIdentifier());
-          oldQueryable.close();
-        }
-        catch (IOException e) {
-          log.makeAlert(e, "Exception closing segment")
-             .addData("dataSource", dataSource)
-             .addData("segmentId", segment.getIdentifier())
-             .emit();
-        }
-      } else {
-        log.info(
-            "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
-            dataSource,
-            segment.getInterval(),
-            segment.getVersion()
-        );
-      }
-    }
-    segmentLoader.cleanup(segment);
+    this.segmentManager = segmentManager;
   }
 
   @Override
@@ -262,7 +120,8 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
+                                                                                               .get(dataSourceName);
 
     if (timeline == null) {
       return new NoopQueryRunner<T>();
@@ -351,9 +210,8 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
-        dataSourceName
-    );
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
+                                                                                               .get(dataSourceName);
 
     if (timeline == null) {
       return new NoopQueryRunner<T>();
diff --git a/server/src/main/java/io/druid/server/coordination/ServerType.java b/server/src/main/java/io/druid/server/coordination/ServerType.java
index 8664b8032d8b..6d24d28b4aa5 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerType.java
@@ -21,20 +21,36 @@
 public enum ServerType
 {
-  HISTORICAL(true),
-  BRIDGE(true),
-  REALTIME(false);
+  HISTORICAL,
+  BRIDGE,
+  REALTIME {
+    @Override
+    public boolean segmentReplicatable()
+    {
+      return false;
+    }
+  };
 
-  private boolean assignable;
+  /**
+   * Indicates this type of node can be a target of segment replication.
-  boolean isAssignable()
+   * @return true if it is available for replication
+   *
+   * @see io.druid.server.coordinator.rules.LoadRule
+   */
+  boolean segmentReplicatable()
   {
-    return assignable;
+    return true;
   }
 
-  ServerType(boolean assignable)
+  /**
+   * Indicates this type of node can be a target of segment broadcast.
+   *
+   * @return true if it is available for broadcast.
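+   *
+   * <p>A hypothetical usage sketch (assumed, not from this patch: a caller in the
+   * same package, since the method is package-private, and an existing
+   * {@code servers} list):
+   *
+   * <pre>{@code
+   * List<DruidServerMetadata> broadcastTargets = servers
+   *     .stream()
+   *     .filter(s -> s.getType().segmentBroadcastable())
+   *     .collect(Collectors.toList());
+   * }</pre>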
+ */ + boolean segmentBroadcastable() { - this.assignable = assignable; + return true; } static ServerType fromString(String type) diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 0fd4c3f60ac2..eb2af431148c 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -34,6 +34,7 @@ import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.SegmentManager; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -72,7 +73,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final DruidServerMetadata me; private final CuratorFramework curator; private final DataSegmentAnnouncer announcer; - private final ServerManager serverManager; + private final SegmentManager segmentManager; private final ScheduledExecutorService exec; private final ConcurrentSkipListSet segmentsToDelete; @@ -88,7 +89,7 @@ public ZkCoordinator( DruidServerMetadata me, DataSegmentAnnouncer announcer, CuratorFramework curator, - ServerManager serverManager, + SegmentManager segmentManager, ScheduledExecutorFactory factory ) { @@ -98,7 +99,7 @@ public ZkCoordinator( this.me = me; this.curator = curator; this.announcer = announcer; - this.serverManager = serverManager; + this.segmentManager = segmentManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); this.segmentsToDelete = new ConcurrentSkipListSet<>(); @@ -257,7 +258,7 @@ public void loadLocalCache() log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file); try { DataSegment segment = jsonMapper.readValue(file, DataSegment.class); - if (serverManager.isSegmentCached(segment)) { + if (segmentManager.isSegmentCached(segment)) { cachedSegments.add(segment); } else { log.warn("Unable to find cache file for %s. 
Deleting lookup entry", segment.getIdentifier()); @@ -303,7 +304,7 @@ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback { final boolean loaded; try { - loaded = serverManager.loadSegment(segment); + loaded = segmentManager.loadSegment(segment); } catch (Exception e) { removeSegment(segment, callback); @@ -465,7 +466,7 @@ public void run() try { synchronized (lock) { if (segmentsToDelete.remove(segment)) { - serverManager.dropSegment(segment); + segmentManager.dropSegment(segment); File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.delete()) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 50c693ad0f14..8e46937eef93 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -745,7 +745,7 @@ public boolean apply( DruidServer input ) { - return input.isAssignable(); + return input.segmentReplicatable(); } } ).transform( diff --git a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java index cb3bb6b7f063..ec389cf53bb0 100644 --- a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java +++ b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java @@ -26,7 +26,7 @@ import io.druid.client.DruidServerConfig; import io.druid.java.util.common.collect.CountingMap; import io.druid.query.DruidMetrics; -import io.druid.server.coordination.ServerManager; +import io.druid.server.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.timeline.DataSegment; @@ -35,18 +35,18 @@ public class HistoricalMetricsMonitor extends AbstractMonitor { private final DruidServerConfig serverConfig; - private final ServerManager serverManager; + private final SegmentManager segmentManager; private final ZkCoordinator zkCoordinator; @Inject public HistoricalMetricsMonitor( DruidServerConfig serverConfig, - ServerManager serverManager, + SegmentManager segmentManager, ZkCoordinator zkCoordinator ) { this.serverConfig = serverConfig; - this.serverManager = serverManager; + this.segmentManager = segmentManager; this.zkCoordinator = zkCoordinator; } @@ -73,7 +73,7 @@ public boolean doMonitor(ServiceEmitter emitter) ); } - for (Map.Entry entry : serverManager.getDataSourceSizes().entrySet()) { + for (Map.Entry entry : segmentManager.getDataSourceSizes().entrySet()) { String dataSource = entry.getKey(); long used = entry.getValue(); @@ -88,7 +88,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.build("segment/usedPercent", usedPercent)); } - for (Map.Entry entry : serverManager.getDataSourceCounts().entrySet()) { + for (Map.Entry entry : segmentManager.getDataSourceCounts().entrySet()) { String dataSource = entry.getKey(); long count = entry.getValue(); final ServiceMetricEvent.Builder builder = diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 2484030ae135..7db4025aa28b 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -215,6 +215,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws 
IOException tuningConfig ) ), + null, + null, null ); plumber2 = new TestPlumber(new Sink( @@ -234,6 +236,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig ) ), + null, + null, null ); @@ -327,7 +331,8 @@ public void run() 1, fireChief_1 ) - ) + ), + null ); startFireChiefWithPartitionNum(fireChief_0, 0); diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 7656e7583af9..608c4a329d8d 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -63,6 +63,7 @@ import io.druid.segment.StorageAdapter; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.SegmentManager; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -94,6 +95,7 @@ public class ServerManagerTest private CountDownLatch queryWaitYieldLatch; private CountDownLatch queryNotifyLatch; private ExecutorService serverManagerExec; + private SegmentManager segmentManager; @Before public void setUp() throws IOException @@ -105,7 +107,7 @@ public void setUp() throws IOException queryNotifyLatch = new CountDownLatch(1); factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch); serverManagerExec = Executors.newFixedThreadPool(2); - serverManager = new ServerManager( + segmentManager = new SegmentManager( new SegmentLoader() { @Override @@ -134,7 +136,9 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException { } - }, + } + ); + serverManager = new ServerManager( new QueryRunnerFactoryConglomerate() { @Override @@ -148,7 +152,8 @@ public > QueryRunnerFactory findFact MoreExecutors.sameThreadExecutor(), new DefaultObjectMapper(), new LocalCacheProvider().get(), - new CacheConfig() + new CacheConfig(), + segmentManager ); loadQueryable("test", "1", new Interval("P1d/2011-04-01")); @@ -458,7 +463,7 @@ public void run() public void loadQueryable(String dataSource, String version, Interval interval) throws IOException { try { - serverManager.loadSegment( + segmentManager.loadSegment( new DataSegment( dataSource, interval, @@ -480,7 +485,7 @@ public void loadQueryable(String dataSource, String version, Interval interval) public void dropQueryable(String dataSource, String version, Interval interval) { try { - serverManager.dropSegment( + segmentManager.dropSegment( new DataSegment( dataSource, interval, diff --git a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java index 68b665e05b42..9d6bb200ee81 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java @@ -27,9 +27,9 @@ public class ServerTypeTest @Test public void testAssignable() { - Assert.assertTrue(ServerType.HISTORICAL.isAssignable()); - Assert.assertTrue(ServerType.BRIDGE.isAssignable()); - Assert.assertFalse(ServerType.REALTIME.isAssignable()); + Assert.assertTrue(ServerType.HISTORICAL.segmentReplicatable()); + Assert.assertTrue(ServerType.BRIDGE.segmentReplicatable()); + Assert.assertFalse(ServerType.REALTIME.segmentReplicatable()); } @Test diff --git 
a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 974e9d28954d..83f892e86237 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -44,6 +44,7 @@ import io.druid.segment.IndexIO; import io.druid.segment.loading.CacheTestSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.server.SegmentManager; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -93,6 +94,7 @@ public class ZkCoordinatorTest extends CuratorTestBase private AtomicInteger announceCount; private ConcurrentSkipListSet segmentsAnnouncedByMe; private CacheTestSegmentLoader segmentLoader; + private SegmentManager segmentManager; private List scheduledRunnable; @Before @@ -116,16 +118,17 @@ public void setUp() throws Exception scheduledRunnable = Lists.newArrayList(); segmentLoader = new CacheTestSegmentLoader(); + segmentManager = new SegmentManager(segmentLoader); serverManager = new ServerManager( - segmentLoader, new NoopQueryRunnerFactoryConglomerate(), new NoopServiceEmitter(), MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(), new DefaultObjectMapper(), new LocalCacheProvider().get(), - new CacheConfig() + new CacheConfig(), + segmentManager ); final ZkPathsConfig zkPaths = new ZkPathsConfig() @@ -224,7 +227,7 @@ public int getDropSegmentDelayMillis() me, announcer, curator, - serverManager, + segmentManager, new ScheduledExecutorFactory() { @Override @@ -392,12 +395,12 @@ public void testLoadCache() throws Exception } checkCache(segments); - Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); - Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(13 * COUNT, announceCount.get()); zkCoordinator.stop(); @@ -520,6 +523,7 @@ public String getBase() binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); binder.bind(CuratorFramework.class).toInstance(curator); binder.bind(ServerManager.class).toInstance(serverManager); + binder.bind(SegmentManager.class).toInstance(segmentManager); binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle())); } @@ -543,13 +547,13 @@ public String getBase() } checkCache(segments); - Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); - Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(3L, serverManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, 
serverManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(5 * COUNT, announceCount.get()); zkCoordinator.stop(); diff --git a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java index 68558cc1db9f..20d1d6c05950 100644 --- a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -27,7 +27,7 @@ import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidServerConfig; -import io.druid.server.coordination.ServerManager; +import io.druid.server.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.timeline.DataSegment; import org.easymock.Capture; @@ -47,7 +47,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport { private DruidServerConfig druidServerConfig; - private ServerManager serverManager; + private SegmentManager segmentManager; private ZkCoordinator zkCoordinator; private ServiceEmitter serviceEmitter; @@ -55,7 +55,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport public void setUp() { druidServerConfig = EasyMock.createStrictMock(DruidServerConfig.class); - serverManager = EasyMock.createStrictMock(ServerManager.class); + segmentManager = EasyMock.createStrictMock(SegmentManager.class); zkCoordinator = EasyMock.createStrictMock(ZkCoordinator.class); serviceEmitter = EasyMock.createStrictMock(ServiceEmitter.class); } @@ -84,17 +84,17 @@ public void testSimple() EasyMock.expect(zkCoordinator.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once(); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); - EasyMock.expect(serverManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size)); + EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).times(2); - EasyMock.expect(serverManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); + EasyMock.expect(segmentManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); final HistoricalMetricsMonitor monitor = new HistoricalMetricsMonitor( druidServerConfig, - serverManager, + segmentManager, zkCoordinator ); @@ -102,9 +102,9 @@ public void testSimple() serviceEmitter.emit(EasyMock.capture(eventCapture)); EasyMock.expectLastCall().times(5); - EasyMock.replay(druidServerConfig, serverManager, zkCoordinator, serviceEmitter); + EasyMock.replay(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); monitor.doMonitor(serviceEmitter); - EasyMock.verify(druidServerConfig, serverManager, zkCoordinator, serviceEmitter); + EasyMock.verify(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); final 
String host = "host"; final String service = "service"; From 232a2f0194d1382348fcd9d2f66aead605afe4e9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 5 May 2017 09:26:04 +0900 Subject: [PATCH 5/7] Fix equals and hashCode of ServerHolder --- .../io/druid/server/coordinator/ServerHolder.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java index 7f2f13693744..12b14fc3cba0 100644 --- a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java @@ -23,6 +23,8 @@ import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; +import java.util.Objects; + /** */ public class ServerHolder implements Comparable @@ -122,21 +124,20 @@ public boolean equals(Object o) ServerHolder that = (ServerHolder) o; - if (peon != null ? !peon.equals(that.peon) : that.peon != null) { + if (!this.server.getHost().equals(that.server.getHost())) { return false; } - if (server != null ? !server.equals(that.server) : that.server != null) { + + if (!this.server.getTier().equals(that.getServer().getTier())) { return false; } - return true; + return this.server.getType().equals(that.getServer().getType()); } @Override public int hashCode() { - int result = server != null ? server.hashCode() : 0; - result = 31 * result + (peon != null ? peon.hashCode() : 0); - return result; + return Objects.hash(server.getHost(), server.getTier(), server.getType()); } } From cfd61fcdacc28c78659dd96ee4c71ce33c7227cc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 16 May 2017 12:23:44 +0900 Subject: [PATCH 6/7] Address comments and add more tests --- .../segment/realtime/RealtimeManager.java | 11 +- .../coordination/DruidServerMetadata.java | 2 +- .../druid/server/coordination/ServerType.java | 6 +- .../segment/realtime/RealtimeManagerTest.java | 5 +- .../server/coordination/ServerTypeTest.java | 6 +- .../server/coordinator/DruidClusterTest.java | 204 +++++++++++++++ .../server/coordinator/ServerHolderTest.java | 246 ++++++++++++++++++ 7 files changed, 460 insertions(+), 20 deletions(-) create mode 100644 server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java create mode 100644 server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index a00f2c5b2e27..29a130bc4fb7 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -51,7 +51,6 @@ import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; -import io.druid.server.SegmentManager; import org.joda.time.Interval; import java.io.Closeable; @@ -74,34 +73,28 @@ public class RealtimeManager implements QuerySegmentWalker */ private final Map> chiefs; - private final SegmentManager segmentManager; - @Inject public RealtimeManager( List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate, - SegmentManager segmentManager + QueryRunnerFactoryConglomerate conglomerate ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; this.chiefs = Maps.newHashMap(); - this.segmentManager = segmentManager; } @VisibleForTesting 
RealtimeManager( List fireDepartments, QueryRunnerFactoryConglomerate conglomerate, - Map> chiefs, - SegmentManager segmentManager + Map> chiefs ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs); - this.segmentManager = segmentManager; } @LifecycleStart diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index d2f7449670bc..c209ecc70526 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -89,7 +89,7 @@ public int getPriority() public boolean segmentReplicatable() { - return type.segmentReplicatable(); + return type.isSegmentReplicationTarget(); } @Override diff --git a/server/src/main/java/io/druid/server/coordination/ServerType.java b/server/src/main/java/io/druid/server/coordination/ServerType.java index 6d24d28b4aa5..39e4d0161c8e 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerType.java +++ b/server/src/main/java/io/druid/server/coordination/ServerType.java @@ -25,7 +25,7 @@ public enum ServerType BRIDGE, REALTIME { @Override - public boolean segmentReplicatable() + public boolean isSegmentReplicationTarget() { return false; } @@ -38,7 +38,7 @@ public boolean segmentReplicatable() * * @see io.druid.server.coordinator.rules.LoadRule */ - boolean segmentReplicatable() + boolean isSegmentReplicationTarget() { return true; } @@ -48,7 +48,7 @@ boolean segmentReplicatable() * * @return true if it is available for broadcast. */ - boolean segmentBroadcastable() + boolean isSegmentBroadcastTarget() { return true; } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 7db4025aa28b..f0d35ed9122f 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -216,7 +216,6 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException ) ), null, - null, null ); plumber2 = new TestPlumber(new Sink( @@ -237,7 +236,6 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException ) ), null, - null, null ); @@ -331,8 +329,7 @@ public void run() 1, fireChief_1 ) - ), - null + ) ); startFireChiefWithPartitionNum(fireChief_0, 0); diff --git a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java index 9d6bb200ee81..add988f3f13b 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java @@ -27,9 +27,9 @@ public class ServerTypeTest @Test public void testAssignable() { - Assert.assertTrue(ServerType.HISTORICAL.segmentReplicatable()); - Assert.assertTrue(ServerType.BRIDGE.segmentReplicatable()); - Assert.assertFalse(ServerType.REALTIME.segmentReplicatable()); + Assert.assertTrue(ServerType.HISTORICAL.isSegmentReplicationTarget()); + Assert.assertTrue(ServerType.BRIDGE.isSegmentReplicationTarget()); + Assert.assertFalse(ServerType.REALTIME.isSegmentReplicationTarget()); } @Test diff --git a/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java 
b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java new file mode 100644 index 000000000000..be4d5ada89b6 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Ordering; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DruidClusterTest +{ + private static final List segments = ImmutableList.of( + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ), + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ) + ); + + private static final Map dataSources = ImmutableMap.of( + "src1", + new ImmutableDruidDataSource( + "src1", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ), + "src2", + new ImmutableDruidDataSource( + "src2", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ) + ); + + private static final ServerHolder newRealtime = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 100L, ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + private static final ServerHolder newHistorical = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + private DruidCluster cluster; + + @Before + public void setup() + { + cluster = new DruidCluster( + ImmutableSet.of( + new ServerHolder( + new ImmutableDruidServer( + 
new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ) + ), + ImmutableMap.of( + "tier1", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + ImmutableList.of( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ) + ) + ) + ) + ); + } + + @Test + public void testAdd() + { + Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(1, cluster.getRealtimes().size()); + + cluster.add(newRealtime); + Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(2, cluster.getRealtimes().size()); + + cluster.add(newHistorical); + Assert.assertEquals(2, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(2, cluster.getRealtimes().size()); + } + + @Test + public void testGetAllServers() + { + cluster.add(newRealtime); + cluster.add(newHistorical); + final Collection allServers = cluster.getAllServers(); + Assert.assertEquals(4, allServers.size()); + Assert.assertTrue(allServers.containsAll(cluster.getRealtimes())); + Assert.assertTrue( + allServers.containsAll( + cluster.getHistoricals().values().stream().flatMap(Collection::stream).collect(Collectors.toList()) + ) + ); + } + + @Test + public void testIsEmpty() + { + final DruidCluster emptyCluster = new DruidCluster(); + Assert.assertFalse(cluster.isEmpty()); + Assert.assertTrue(emptyCluster.isEmpty()); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java new file mode 100644 index 000000000000..666009d2d496 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class ServerHolderTest +{ + private static final List segments = ImmutableList.of( + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ), + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ) + ); + + private static final Map dataSources = ImmutableMap.of( + "src1", + new ImmutableDruidDataSource( + "src1", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ), + "src2", + new ImmutableDruidDataSource( + "src2", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ) + ); + + @Test + public void testCompareTo() throws Exception + { + // available size of 100 + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 100 + final ServerHolder h2 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 10 + final ServerHolder h3 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 1000L, ServerType.HISTORICAL.name(), "tier1", 0), + 990L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 50 + final ServerHolder h4 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 50L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + Assert.assertEquals(0, h1.compareTo(h2)); + Assert.assertEquals(-1, h3.compareTo(h1)); + Assert.assertEquals(-1, h3.compareTo(h4)); + } + + @Test + public void testEquals() throws Exception + { + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h2 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name2", "host1", 200L, 
ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h3 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 200L, ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h4 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier2", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h5 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + Assert.assertEquals(h1, h2); + Assert.assertNotEquals(h1, h3); + Assert.assertNotEquals(h1, h4); + Assert.assertNotEquals(h1, h5); + } +} From 2ef88defd2471d1e19f410bb5fe4842586581e7e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 16 May 2017 18:45:02 +0900 Subject: [PATCH 7/7] Address comments --- .../client/ImmutableDruidDataSource.java | 1 + .../io/druid/client/ImmutableDruidServer.java | 1 + .../java/io/druid/server/SegmentManager.java | 45 ++++++++++--------- .../server/coordination/ServerManager.java | 10 +++-- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java index 247d1bae0400..c83d674e4285 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java @@ -76,6 +76,7 @@ public Set getSegments() @Override public String toString() { + // partitionNames is intentionally ignored because it is usually large return "ImmutableDruidDataSource{" + "name='" + name + "', segments='" + segmentsHolder diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index f6b2cdc0657e..1851befe277e 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -111,6 +111,7 @@ public Map getSegments() @Override public String toString() { + // segments is intentionally ignored because it is usually large return "ImmutableDruidServer{" + "meta='" + metadata + "', size='" + currSize diff --git a/server/src/main/java/io/druid/server/SegmentManager.java b/server/src/main/java/io/druid/server/SegmentManager.java index d8c0dbce3f73..9013ed663075 100644 --- a/server/src/main/java/io/druid/server/SegmentManager.java +++ b/server/src/main/java/io/druid/server/SegmentManager.java @@ -19,7 +19,6 @@ package io.druid.server; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; @@ -74,10 +73,10 @@ public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingE return segmentLoader.isSegmentLoaded(segment); } - public 
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> getDataSources()
+  public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
   {
     synchronized (lock) {
-      return ImmutableMap.copyOf(dataSources);
+      return dataSources.get(dataSource);
     }
   }
 
@@ -92,23 +91,7 @@ public Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>>
    */
   public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
   {
-    final Segment adapter;
-    try {
-      adapter = segmentLoader.getSegment(segment);
-    }
-    catch (SegmentLoadingException e) {
-      try {
-        segmentLoader.cleanup(segment);
-      }
-      catch (SegmentLoadingException e1) {
-        // ignore
-      }
-      throw e;
-    }
-
-    if (adapter == null) {
-      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
-    }
+    final Segment adapter = getAdapter(segment);
 
     synchronized (lock) {
       final String dataSource = segment.getDataSource();
@@ -141,6 +124,28 @@ public boolean loadSegment(final DataSegment segment) throws SegmentLoadingExcep
     }
   }
 
+  private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
+  {
+    final Segment adapter;
+    try {
+      adapter = segmentLoader.getSegment(segment);
+    }
+    catch (SegmentLoadingException e) {
+      try {
+        segmentLoader.cleanup(segment);
+      }
+      catch (SegmentLoadingException e1) {
+        e.addSuppressed(e1);
+      }
+      throw e;
+    }
+
+    if (adapter == null) {
+      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
+    }
+    return adapter;
+  }
+
   public void dropSegment(final DataSegment segment) throws SegmentLoadingException
   {
     String dataSource = segment.getDataSource();
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 3613306333af..a403c189caec 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -120,8 +120,9 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
-                                                                                               .get(dataSourceName);
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
+        dataSourceName
+    );
 
     if (timeline == null) {
       return new NoopQueryRunner<T>();
@@ -210,8 +211,9 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
-                                                                                               .get(dataSourceName);
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
+        dataSourceName
+    );
 
     if (timeline == null) {
       return new NoopQueryRunner<T>();
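//
// A hypothetical caller-side sketch (not part of this patch): it shows how code
// holding a SegmentManager can look up one data source's timeline through the
// new getTimeline method; the data source name and interval below are made up.
//
//   final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline =
//       segmentManager.getTimeline("wikipedia");
//   if (timeline != null) {
//     for (TimelineObjectHolder<String, ReferenceCountingSegment> holder :
//         timeline.lookup(new Interval("2017-01-01/2017-02-01"))) {
//       holder.getObject(); // a PartitionHolder of ReferenceCountingSegment chunks
//     }
//   }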