From e4941caeb978c261aa8eafaa706d86daf8fb7612 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 13 Oct 2017 17:40:48 -0700 Subject: [PATCH 1/8] Add Router connection balancers for Avatica queries --- .../benchmark/ConsistentHasherBenchmark.java | 89 ++++++ .../benchmark/RendezvousHasherBenchmark.java | 87 ++++++ docs/content/development/router.md | 32 +++ .../druid/initialization/Initialization.java | 4 +- .../server/AsyncQueryForwardingServlet.java | 47 +++- .../io/druid/server/ConsistentHasher.java | 154 ++++++++++ .../io/druid/server/RendezvousHasher.java | 67 +++++ .../router/AvaticaConnectionBalancer.java | 39 +++ .../AvaticaConnectionBalancerModule.java | 126 +++++++++ ...nsistentHashAvaticaConnectionBalancer.java | 50 ++++ ...ntHashAvaticaConnectionBalancerModule.java | 44 +++ ...ndezvousHashAvaticaConnectionBalancer.java | 47 ++++ ...usHashAvaticaConnectionBalancerModule.java | 44 +++ .../AsyncQueryForwardingServletTest.java | 4 +- .../io/druid/server/ConsistentHasherTest.java | 262 ++++++++++++++++++ .../io/druid/server/RendezvousHasherTest.java | 248 +++++++++++++++++ 16 files changed, 1340 insertions(+), 4 deletions(-) create mode 100644 benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java create mode 100644 benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java create mode 100644 server/src/main/java/io/druid/server/ConsistentHasher.java create mode 100644 server/src/main/java/io/druid/server/RendezvousHasher.java create mode 100644 server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java create mode 100644 server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java create mode 100644 server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java create mode 100644 server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java create mode 100644 server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java create mode 100644 server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java create mode 100644 server/src/test/java/io/druid/server/ConsistentHasherTest.java create mode 100644 server/src/test/java/io/druid/server/RendezvousHasherTest.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java new file mode 100644 index 000000000000..3680ae4e3821 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark; + +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing;; +import io.druid.server.ConsistentHasher; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 15) +@Measurement(iterations = 30) +public class ConsistentHasherBenchmark +{ + @Param({"100000"}) + int numIds; + + ConsistentHasher hasher; + List uuids; + Set servers; + + @Setup + public void setup() throws IOException + { + hasher = new ConsistentHasher(Hashing.murmur3_128(9999)); + uuids = new ArrayList<>(); + servers = Sets.newHashSet( + "localhost:1", + "localhost:2", + "localhost:3", + "localhost:4", + "localhost:5" + ); + + for (int i = 0; i < numIds; i++) { + UUID uuid = UUID.randomUUID(); + uuids.add(uuid.toString()); + } + + hasher.updateKeys(servers); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void hash(Blackhole blackhole) throws Exception + { + for (String uuid : uuids) { + String server = hasher.hash(uuid, ConsistentHasher.STRING_FUNNEL); + blackhole.consume(server); + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java new file mode 100644 index 000000000000..52a724c5d93a --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java @@ -0,0 +1,87 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark; + +import com.google.common.collect.Sets; +import io.druid.server.RendezvousHasher; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 15) +@Measurement(iterations = 30) +public class RendezvousHasherBenchmark +{ + @Param({"100000"}) + int numIds; + + RendezvousHasher hasher; + List uuids; + Set servers; + + @Setup + public void setup() throws IOException + { + hasher = new RendezvousHasher(); + uuids = new ArrayList<>(); + servers = Sets.newHashSet( + "localhost:1", + "localhost:2", + "localhost:3", + "localhost:4", + "localhost:5" + ); + + + for (int i = 0; i < numIds; i++) { + UUID uuid = UUID.randomUUID(); + uuids.add(uuid.toString()); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void hash(Blackhole blackhole) throws Exception + { + for (String uuid : uuids) { + String server = hasher.chooseNode(servers, uuid, RendezvousHasher.STRING_FUNNEL); + blackhole.consume(server); + } + } +} diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 63ba585d4427..898595c76297 100644 --- a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -75,6 +75,7 @@ The router module uses several of the default modules in [Configuration](../conf |`druid.router.coordinatorServiceName`|Any string.|The service discovery name of the coordinator.|druid/coordinator| |`druid.router.pollPeriod`|Any ISO8601 duration.|How often to poll for new rules.|PT1M| |`druid.router.strategies`|An ordered JSON array of objects.|All custom strategies to use for routing.|[{"type":"timeBoundary"},{"type":"priority"}]| +|`druid.router.avatica.balancer`|String representing an AvaticaConnectionBalancer name|Class to use for balancing Avatica queries across brokers|rendezvousHash| Router Strategies ----------------- @@ -119,6 +120,37 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio JavaScript-based functionality is disabled by default. Please refer to the Druid JavaScript programming guide for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it. + +Avatica Query Balancing +-------------- + +All Avatica JDBC requests with a given connection ID must be routed to the same broker, since Druid brokers do not share connection state with each other. + +To accomplish this, Druid provides two built-in balancers that use rendezvous hashing and consistent hashing of a request's connection ID respectively to assign requests to brokers. + +### Rendezvous Hash Balancer + +This balancer uses [Rendezvous Hashing](https://en.wikipedia.org/wiki/Rendezvous_hashing) on an Avatica request's connection ID to assign the request to a broker. + +To use this balancer, specify the following property: + +``` +druid.router.avatica.balancer=rendezvousHash +``` + +If no `druid.router.avatica.balancer` property is set, the Router will also default to using the Rendezvous Hash Balancer. + +### Consistent Hash Balancer + +This balancer uses [Consistent Hashing](https://en.wikipedia.org/wiki/Consistent_hashing) on an Avatica request's connection ID to assign the request to a broker. + +To use this balancer, specify the following property: + +``` +druid.router.avatica.balancer=consistentHash +``` + + HTTP Endpoints -------------- diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index a6ab2a0d155c..08a8a26d2b1e 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -69,6 +69,7 @@ import io.druid.server.emitter.EmitterModule; import io.druid.server.initialization.jetty.JettyServerModule; import io.druid.server.metrics.MetricsModule; +import io.druid.server.router.AvaticaConnectionBalancerModule; import org.apache.commons.io.FileUtils; import org.eclipse.aether.artifact.DefaultArtifact; @@ -381,7 +382,8 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter new AuthenticatorHttpClientWrapperModule(), new AuthorizerModule(), new AuthorizerMapperModule(), - new StartupLoggingModule() + new StartupLoggingModule(), + AvaticaConnectionBalancerModule.class ); ModuleList actualModules = new ModuleList(baseInjector); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index c7f2b1167b3b..df3ad57a78b4 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -33,6 +33,7 @@ import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.ISE; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; @@ -40,6 +41,7 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.server.log.RequestLogger; import io.druid.server.metrics.QueryCountStatsProvider; +import io.druid.server.router.AvaticaConnectionBalancer; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import io.druid.server.security.AuthConfig; @@ -62,6 +64,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLDecoder; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -78,6 +81,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host"; private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme"; private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; + private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery"; private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; private static final int CANCELLATION_TIMEOUT_MILLIS = 500; @@ -112,6 +116,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o private final RequestLogger requestLogger; private final GenericQueryMetricsFactory queryMetricsFactory; private final Authenticator escalatingAuthenticator; + private final AvaticaConnectionBalancer avaticaConnectionBalancer; private HttpClient broadcastClient; @@ -126,7 +131,8 @@ public AsyncQueryForwardingServlet( ServiceEmitter emitter, RequestLogger requestLogger, GenericQueryMetricsFactory queryMetricsFactory, - AuthenticatorMapper authenticatorMapper + AuthenticatorMapper authenticatorMapper, + AvaticaConnectionBalancer avaticaConnectionBalancer ) { this.warehouse = warehouse; @@ -139,6 +145,7 @@ public AsyncQueryForwardingServlet( this.requestLogger = requestLogger; this.queryMetricsFactory = queryMetricsFactory; this.escalatingAuthenticator = authenticatorMapper.getEscalatingAuthenticator(); + this.avaticaConnectionBalancer = avaticaConnectionBalancer; } @Override @@ -186,7 +193,25 @@ protected void service(HttpServletRequest request, HttpServletResponse response) final boolean isQueryEndpoint = request.getRequestURI().startsWith("/druid/v2") && !request.getRequestURI().startsWith("/druid/v2/sql"); - if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { + final boolean isAvatica = request.getRequestURI().startsWith("/druid/v2/sql/avatica"); + + if (isAvatica) { + byte[] requestContent = getRequestContent(request); + String connectionId = getAvaticaConnectionId(requestContent, objectMapper); + Server targetServer = avaticaConnectionBalancer.balance(hostFinder.getAllServers(), connectionId); + if (targetServer == null) { + throw new ISE("Cannot balance request with connectionId[%s], no brokers found.", connectionId); + } + request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost()); + request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme()); + request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestContent); + log.debug( + "Balancer class [%s] sending request with connectionId[%s] to server: %s", + avaticaConnectionBalancer.getClass(), + connectionId, + targetServer.getHost() + ); + } else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { // query cancellation request for (final Server server: hostFinder.getAllServers()) { // send query cancellation to all brokers this query may have gone to @@ -263,6 +288,11 @@ protected void sendProxyRequest( proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); + byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE); + if (avaticaQuery != null) { + proxyRequest.content(new BytesContentProvider(avaticaQuery)); + } + final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); if (query != null) { final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); @@ -371,6 +401,19 @@ public long getInterruptedQueryCount() return interruptedQueryCount.get(); } + private static byte[] getRequestContent(HttpServletRequest request) throws IOException + { + int contentSize = request.getContentLength(); + byte[] content = new byte[contentSize]; + int bytesRead = request.getInputStream().read(content); + return content; + } + + private static String getAvaticaConnectionId(byte[] requestContent, ObjectMapper objectMapper) throws IOException + { + Map contentMap = objectMapper.readValue(requestContent, Map.class); + return (String) contentMap.get("connectionId"); + } private class MetricsEmittingProxyResponseListener extends ProxyResponseListener { diff --git a/server/src/main/java/io/druid/server/ConsistentHasher.java b/server/src/main/java/io/druid/server/ConsistentHasher.java new file mode 100644 index 000000000000..f81b677314ba --- /dev/null +++ b/server/src/main/java/io/druid/server/ConsistentHasher.java @@ -0,0 +1,154 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.google.common.base.Charsets; +import com.google.common.hash.Funnel; +import com.google.common.hash.Funnels; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +// Distributes objects across a set of node keys using consistent hashing +// See https://en.wikipedia.org/wiki/Consistent_hashing +// Not thread-safe. +public class ConsistentHasher +{ + private static final int REPLICATION_FACTOR = 128; + private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(9999); + + public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8); + + private final Long2ObjectRBTreeMap nodeKeySlots = new Long2ObjectRBTreeMap<>(); + { + nodeKeySlots.defaultReturnValue(null); + } + private final HashFunction hashFn; + private final Map nodeKeyHashes = new HashMap<>(); + private Set previousKeys = new HashSet<>(); + + public ConsistentHasher( + final HashFunction hashFunction + ) + { + this.hashFn = hashFunction == null ? DEFAULT_HASH_FN : hashFunction; + } + + public void updateKeys(Set currentKeys) + { + Set added = new HashSet<>(currentKeys); + added.removeAll(previousKeys); + + Set removed = new HashSet<>(previousKeys); + removed.removeAll(currentKeys); + + for (String key : added) { + addKey(key); + } + + for (String key : removed) { + removeKey(key); + } + + // store a copy in case the input was immutable + previousKeys = new HashSet<>(currentKeys); + } + + public String hashStr(String str) + { + return hash(str, STRING_FUNNEL); + } + + public String hash(T obj, Funnel funnel) + { + if (nodeKeySlots.size() == 0) { + return null; + } + + long objHash = hashFn.hashObject(obj, funnel).asLong(); + + Long2ObjectSortedMap subMap = nodeKeySlots.tailMap(objHash); + if (subMap.isEmpty()) { + return nodeKeySlots.long2ObjectEntrySet().first().getValue(); + } + + Long2ObjectMap.Entry firstEntry = subMap.long2ObjectEntrySet().first(); + return firstEntry.getValue(); + } + + private void addKey(String key) + { + if (nodeKeyHashes.containsKey(key)) { + return; + } + + addNodeKeyHashes(key); + addNodeKeySlots(key); + } + + private void removeKey(String key) + { + if (!nodeKeyHashes.containsKey(key)) { + return; + } + + removeNodeKeySlots(key); + removeNodeKeyHashes(key); + } + + private void addNodeKeyHashes(String key) + { + long[] hashes = new long[REPLICATION_FACTOR]; + for (int i = 0; i < REPLICATION_FACTOR; i++) { + String vnode = key + "-" + i; + hashes[i] = hashFn.hashString(vnode, Charsets.UTF_8).asLong(); + } + + nodeKeyHashes.put(key, hashes); + } + + private void addNodeKeySlots(String key) + { + long[] hashes = nodeKeyHashes.get(key); + for (long hash : hashes) { + nodeKeySlots.put(hash, key); + } + } + + private void removeNodeKeyHashes(String key) + { + nodeKeyHashes.remove(key); + } + + private void removeNodeKeySlots(String key) + { + long[] hashes = nodeKeyHashes.get(key); + for (long hash : hashes) { + nodeKeySlots.remove(hash); + } + } +} diff --git a/server/src/main/java/io/druid/server/RendezvousHasher.java b/server/src/main/java/io/druid/server/RendezvousHasher.java new file mode 100644 index 000000000000..d5ca46fe043c --- /dev/null +++ b/server/src/main/java/io/druid/server/RendezvousHasher.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.hash.Funnel; +import com.google.common.hash.Funnels; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; + +import java.util.List; +import java.util.Set; + +// Distributes objects across a set of node keys using rendezvous hashing +// See https://en.wikipedia.org/wiki/Rendezvous_hashing +public class RendezvousHasher +{ + private static final int HASH_FN_SEED = 9999; + private static final HashFunction HASH_FN = Hashing.murmur3_128(HASH_FN_SEED); + + public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8); + + public String chooseNode(Set nodeIds, String key) + { + return chooseNode(nodeIds, key, STRING_FUNNEL); + } + + public String chooseNode(Set nodeIds, KeyType key, Funnel funnel) + { + if (nodeIds.isEmpty()) { + return null; + } + + Long2ObjectRBTreeMap weights = new Long2ObjectRBTreeMap<>(); + weights.defaultReturnValue(null); + + for (String nodeId : nodeIds) { + HashCode keyHash = HASH_FN.hashObject(key, funnel); + HashCode nodeHash = HASH_FN.hashObject(nodeId, STRING_FUNNEL); + List hashes = Lists.newArrayList(nodeHash, keyHash); + HashCode combinedHash = Hashing.combineOrdered(hashes); + weights.put(combinedHash.asLong(), nodeId); + } + + return weights.get(weights.lastLongKey()); + } +} diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java new file mode 100644 index 000000000000..1dfe0cc1dc2d --- /dev/null +++ b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java @@ -0,0 +1,39 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import io.druid.client.selector.Server; +import io.druid.guice.annotations.ExtensionPoint; + +import java.util.Collection; + +/** + * An AvaticaConnectionBalancer balances Avatica connections across a collection of servers. + */ +@ExtensionPoint +public interface AvaticaConnectionBalancer +{ + /** + * @param servers Servers to balance across + * @param connectionId Connection ID to be balanced + * @return Server that connectionId should be assigned to + */ + Server balance(Collection servers, String connectionId); +} diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java new file mode 100644 index 000000000000..fb3e2a33fb2d --- /dev/null +++ b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java @@ -0,0 +1,126 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.google.common.collect.Lists; +import com.google.inject.Binder; +import com.google.inject.Binding; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provider; +import com.google.inject.TypeLiteral; +import com.google.inject.name.Named; +import com.google.inject.name.Names; +import io.druid.guice.LazySingleton; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; + +import java.lang.annotation.Annotation; +import java.util.List; +import java.util.Properties; + +/** + */ +public class AvaticaConnectionBalancerModule implements Module +{ + private static final Logger log = new Logger(AvaticaConnectionBalancerModule.class); + private static final String BALANCER_PROPERTY = "druid.router.avatica.balancer"; + + private final Properties props; + + @Inject + public AvaticaConnectionBalancerModule( + Properties props + ) + { + this.props = props; + } + + @Override + public void configure(Binder binder) + { + String balancerType = props.getProperty(BALANCER_PROPERTY, ""); + binder.install(new ConsistentHashAvaticaConnectionBalancerModule()); + binder.install(new RendezvousHashAvaticaConnectionBalancerModule()); + binder.bind(AvaticaConnectionBalancer.class) + .toProvider(new AvaticaConnectionBalancerProvider(balancerType)) + .in(LazySingleton.class); + } + + private static class AvaticaConnectionBalancerProvider implements Provider + { + private final String balancerType; + + private AvaticaConnectionBalancer balancer = null; + + AvaticaConnectionBalancerProvider( + String balancerType + ) + { + this.balancerType = balancerType; + } + + @Inject + public void inject(Injector injector) + { + final List> balancerBindings = injector.findBindingsByType( + new TypeLiteral(){} + ); + + balancer = findBalancer(balancerType, balancerBindings); + + if (balancer == null) { + balancer = findBalancer(RendezvousHashAvaticaConnectionBalancerModule.BALANCER_TYPE, balancerBindings); + } + + if (balancer == null) { + List knownTypes = Lists.newArrayList(); + for (Binding binding : balancerBindings) { + final Annotation annotation = binding.getKey().getAnnotation(); + if (annotation != null) { + knownTypes.add(((Named) annotation).value()); + } + } + throw new ISE("Unknown balancer type[%s]=[%s], known types[%s]", BALANCER_PROPERTY, balancerType, knownTypes); + } + } + + private AvaticaConnectionBalancer findBalancer(String balancerType, List> balancerBindings) + { + for (Binding binding : balancerBindings) { + if (Names.named(balancerType).equals(binding.getKey().getAnnotation())) { + return binding.getProvider().get(); + } + } + return null; + } + + + @Override + public AvaticaConnectionBalancer get() + { + if (balancer == null) { + throw new ISE("AvaticaConnectionBalancer was null, that's bad!"); + } + return balancer; + } + } +} diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java new file mode 100644 index 000000000000..fdaecfde9ad6 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import io.druid.client.selector.Server; +import io.druid.server.ConsistentHasher; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class ConsistentHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer +{ + private final ConsistentHasher hasher = new ConsistentHasher(null); + + @Override + public Server balance(Collection servers, String connectionId) + { + synchronized (hasher) { + if (servers.isEmpty()) { + return null; + } + Map serverMap = new HashMap<>(); + for (Server server : servers) { + serverMap.put(server.getHost(), server); + } + + hasher.updateKeys(serverMap.keySet()); + String chosenServer = hasher.hashStr(connectionId); + return serverMap.get(chosenServer); + } + } +} diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java new file mode 100644 index 000000000000..75d0384db6da --- /dev/null +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import io.druid.guice.ManageLifecycle; + +public class ConsistentHashAvaticaConnectionBalancerModule implements Module +{ + public static final String BALANCER_TYPE = "consistentHash"; + + @Override + public void configure(Binder binder) + { + } + + @Provides + @ManageLifecycle + @Named(BALANCER_TYPE) + public AvaticaConnectionBalancer makeAvaticaConnectionBalancer() + { + return new ConsistentHashAvaticaConnectionBalancer(); + } +} diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java new file mode 100644 index 000000000000..dfc44de36dc3 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import io.druid.client.selector.Server; +import io.druid.server.RendezvousHasher; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class RendezvousHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer +{ + private final RendezvousHasher hasher = new RendezvousHasher(); + + @Override + public Server balance(Collection servers, String connectionId) + { + if (servers.isEmpty()) { + return null; + } + + Map serverMap = new HashMap<>(); + for (Server server : servers) { + serverMap.put(server.getHost(), server); + } + String chosenServerId = hasher.chooseNode(serverMap.keySet(), connectionId, RendezvousHasher.STRING_FUNNEL); + return serverMap.get(chosenServerId); + } +} diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java new file mode 100644 index 000000000000..ae0b748c386d --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import io.druid.guice.ManageLifecycle; + +public class RendezvousHashAvaticaConnectionBalancerModule implements Module +{ + public static final String BALANCER_TYPE = "rendezvousHash"; + + @Override + public void configure(Binder binder) + { + } + + @Provides + @ManageLifecycle + @Named(BALANCER_TYPE) + public AvaticaConnectionBalancer makeAvaticaConnectionBalancer() + { + return new RendezvousHashAvaticaConnectionBalancer(); + } +} diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index 895416f0e25a..bec220f3d67c 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -50,6 +50,7 @@ import io.druid.server.log.RequestLogger; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.router.QueryHostFinder; +import io.druid.server.router.RendezvousHashAvaticaConnectionBalancer; import io.druid.server.security.AllowAllAuthorizer; import io.druid.server.security.AuthTestUtils; import io.druid.server.security.Authorizer; @@ -252,7 +253,8 @@ public void log(RequestLogLine requestLogLine) throws IOException } }, new DefaultGenericQueryMetricsFactory(jsonMapper), - AuthTestUtils.TEST_AUTHENTICATOR_MAPPER + AuthTestUtils.TEST_AUTHENTICATOR_MAPPER, + new RendezvousHashAvaticaConnectionBalancer() ) { @Override diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java new file mode 100644 index 000000000000..373349bd5804 --- /dev/null +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class ConsistentHasherTest +{ + private static HashFunction TEST_HASH_FN = Hashing.murmur3_128(9999); + private static int NUM_ITERATIONS = 10000; + private static final Logger log = new Logger(ConsistentHasherTest.class); + + @Test + public void testBasic() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + hasher.updateKeys(nodes); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.hashStr(objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + // check that the same UUIDs hash to the same servers on subsequent hashStr() calls + for (int i = 0; i < 2; i++) { + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.hashStr(entry.getKey()); + Assert.assertEquals(entry.getValue(), targetServer); + } + } + } + + @Test + public void testAddNode() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.hashStr(objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.add("localhost:6"); + hasher.updateKeys(nodes); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.hashStr(entry.getKey()); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + // ~1/6 of the entries should change, check that less than 1/5 of the entries hash differently + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.20); + } + + @Test + public void testRemoveNode() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.hashStr(objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.remove("localhost:3"); + hasher.updateKeys(nodes); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.hashStr(entry.getKey()); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + // ~1/5 of the entries should change, check that less than 1/4 of the entries hash differently + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.25); + } + + @Test + public void testInconsistentView1() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:3"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.3); + } + + @Test + public void testInconsistentView2() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:2"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.45); + } + + @Test + public void testInconsistentView3() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.6); + } + + @Test + public void testInconsistentView4() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:2"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.85); + } + + public void testInconsistentViewHelper( + String testName, + Set nodes, + Set nodes2, + double expectedDiffRatio + ) throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.hashStr(objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + ConsistentHasher hasher2 = new ConsistentHasher(TEST_HASH_FN); + hasher2.updateKeys(nodes2); + + Map uuidServerMap2 = new HashMap<>(); + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher2.hashStr(entry.getKey()); + uuidServerMap2.put(entry.getKey(), targetServer); + } + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String otherServer = uuidServerMap2.get(entry.getKey()); + if (entry.getValue().equals(otherServer)) { + same += 1; + } else { + diff += 1; + } + } + double actualDiffRatio = ((double) diff) / NUM_ITERATIONS; + + log.info(StringUtils.format("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff)); + log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio); + + Assert.assertTrue(actualDiffRatio <= expectedDiffRatio); + } +} diff --git a/server/src/test/java/io/druid/server/RendezvousHasherTest.java b/server/src/test/java/io/druid/server/RendezvousHasherTest.java new file mode 100644 index 000000000000..3fef4633a43e --- /dev/null +++ b/server/src/test/java/io/druid/server/RendezvousHasherTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class RendezvousHasherTest +{ + private static int NUM_ITERATIONS = 10000; + private static final Logger log = new Logger(RendezvousHasherTest.class); + + @Test + public void testBasic() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + // check that the same UUIDs hash to the same servers on subsequent hashStr() calls + for (int i = 0; i < 2; i++) { + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, entry.getKey()); + Assert.assertEquals(entry.getValue(), targetServer); + } + } + } + + @Test + public void testAddNode() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.add("localhost:6"); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, entry.getKey()); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.33); + } + + @Test + public void testRemoveNode() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.remove("localhost:3"); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, entry.getKey()); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.33); + } + + @Test + public void testInconsistentView1() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:3"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.25); + } + + @Test + public void testInconsistentView2() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:2"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.5); + } + + @Test + public void testInconsistentView3() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.6); + } + + @Test + public void testInconsistentView4() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:2"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.85); + } + + public void testInconsistentViewHelper( + String testName, + Set nodes, + Set nodes2, + double expectedDiffRatio + ) throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Map uuidServerMap = new HashMap<>(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, objectId.toString()); + uuidServerMap.put(objectId.toString(), targetServer); + } + + RendezvousHasher hasher2 = new RendezvousHasher(); + Map uuidServerMap2 = new HashMap<>(); + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher2.chooseNode(nodes2, entry.getKey()); + uuidServerMap2.put(entry.getKey(), targetServer); + } + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String otherServer = uuidServerMap2.get(entry.getKey()); + if (entry.getValue().equals(otherServer)) { + same += 1; + } else { + diff += 1; + } + } + double actualDiffRatio = ((double) diff) / NUM_ITERATIONS; + + log.info(StringUtils.format("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff)); + log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio); + + Assert.assertTrue(actualDiffRatio <= expectedDiffRatio); + } +} From 56148941f633da36bccda34df28e5b1e8500468c Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 19 Oct 2017 20:04:32 -0700 Subject: [PATCH 2/8] PR comments --- .../benchmark/ConsistentHasherBenchmark.java | 2 +- .../benchmark/RendezvousHasherBenchmark.java | 2 +- docs/content/development/router.md | 7 +- .../druid/initialization/Initialization.java | 4 +- .../server/AsyncQueryForwardingServlet.java | 45 +++---- .../router/AvaticaConnectionBalancer.java | 9 +- .../AvaticaConnectionBalancerModule.java | 126 ------------------ ...nsistentHashAvaticaConnectionBalancer.java | 1 - ...ntHashAvaticaConnectionBalancerModule.java | 44 ------ .../server/{ => router}/ConsistentHasher.java | 10 +- .../druid/server/router/QueryHostFinder.java | 24 +++- ...ndezvousHashAvaticaConnectionBalancer.java | 1 - ...usHashAvaticaConnectionBalancerModule.java | 44 ------ .../server/{ => router}/RendezvousHasher.java | 11 +- .../AsyncQueryForwardingServletTest.java | 5 +- .../io/druid/server/ConsistentHasherTest.java | 1 + .../io/druid/server/RendezvousHasherTest.java | 1 + .../server/router/QueryHostFinderTest.java | 3 +- .../src/main/java/io/druid/cli/CliRouter.java | 2 + 19 files changed, 73 insertions(+), 269 deletions(-) delete mode 100644 server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java delete mode 100644 server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java rename server/src/main/java/io/druid/server/{ => router}/ConsistentHasher.java (95%) delete mode 100644 server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java rename server/src/main/java/io/druid/server/{ => router}/RendezvousHasher.java (91%) diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java index 3680ae4e3821..6731a2a7c330 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -21,7 +21,7 @@ import com.google.common.collect.Sets; import com.google.common.hash.Hashing;; -import io.druid.server.ConsistentHasher; +import io.druid.server.router.ConsistentHasher; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; diff --git a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java index 52a724c5d93a..c07cd19a2836 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java @@ -20,7 +20,7 @@ package io.druid.benchmark; import com.google.common.collect.Sets; -import io.druid.server.RendezvousHasher; +import io.druid.server.router.RendezvousHasher; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 898595c76297..707528027422 100644 --- a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -75,7 +75,7 @@ The router module uses several of the default modules in [Configuration](../conf |`druid.router.coordinatorServiceName`|Any string.|The service discovery name of the coordinator.|druid/coordinator| |`druid.router.pollPeriod`|Any ISO8601 duration.|How often to poll for new rules.|PT1M| |`druid.router.strategies`|An ordered JSON array of objects.|All custom strategies to use for routing.|[{"type":"timeBoundary"},{"type":"priority"}]| -|`druid.router.avatica.balancer`|String representing an AvaticaConnectionBalancer name|Class to use for balancing Avatica queries across brokers|rendezvousHash| +|`druid.router.avatica.balancer.type`|String representing an AvaticaConnectionBalancer name|Class to use for balancing Avatica queries across brokers|rendezvousHash| Router Strategies ----------------- @@ -135,7 +135,7 @@ This balancer uses [Rendezvous Hashing](https://en.wikipedia.org/wiki/Rendezvous To use this balancer, specify the following property: ``` -druid.router.avatica.balancer=rendezvousHash +druid.router.avatica.balancer.type=rendezvousHash ``` If no `druid.router.avatica.balancer` property is set, the Router will also default to using the Rendezvous Hash Balancer. @@ -147,9 +147,10 @@ This balancer uses [Consistent Hashing](https://en.wikipedia.org/wiki/Consistent To use this balancer, specify the following property: ``` -druid.router.avatica.balancer=consistentHash +druid.router.avatica.balancer.type=consistentHash ``` +This is a non-default implementation that is provided for experimentation purposes. The consistent hasher has longer setup times on initialization and when the set of brokers changes, but has a faster broker assignment time than the rendezous hasher when tested with 5 brokers. Benchmarks for both implementations have been provided in `ConsistentHasherBenchmark` and `RendezvousHasherBenchmark`. The consistent hasher also requires locking, while the rendezvous hasher does not. HTTP Endpoints -------------- diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 08a8a26d2b1e..a6ab2a0d155c 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -69,7 +69,6 @@ import io.druid.server.emitter.EmitterModule; import io.druid.server.initialization.jetty.JettyServerModule; import io.druid.server.metrics.MetricsModule; -import io.druid.server.router.AvaticaConnectionBalancerModule; import org.apache.commons.io.FileUtils; import org.eclipse.aether.artifact.DefaultArtifact; @@ -382,8 +381,7 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter new AuthenticatorHttpClientWrapperModule(), new AuthorizerModule(), new AuthorizerMapperModule(), - new StartupLoggingModule(), - AvaticaConnectionBalancerModule.class + new StartupLoggingModule() ); ModuleList actualModules = new ModuleList(baseInjector); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index df3ad57a78b4..04b20c6e0cfe 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -33,7 +33,7 @@ import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.java.util.common.DateTimes; -import io.druid.java.util.common.ISE; +import io.druid.java.util.common.IAE; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; @@ -41,7 +41,6 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.server.log.RequestLogger; import io.druid.server.metrics.QueryCountStatsProvider; -import io.druid.server.router.AvaticaConnectionBalancer; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import io.druid.server.security.AuthConfig; @@ -116,7 +115,6 @@ private static void handleException(HttpServletResponse response, ObjectMapper o private final RequestLogger requestLogger; private final GenericQueryMetricsFactory queryMetricsFactory; private final Authenticator escalatingAuthenticator; - private final AvaticaConnectionBalancer avaticaConnectionBalancer; private HttpClient broadcastClient; @@ -131,8 +129,7 @@ public AsyncQueryForwardingServlet( ServiceEmitter emitter, RequestLogger requestLogger, GenericQueryMetricsFactory queryMetricsFactory, - AuthenticatorMapper authenticatorMapper, - AvaticaConnectionBalancer avaticaConnectionBalancer + AuthenticatorMapper authenticatorMapper ) { this.warehouse = warehouse; @@ -145,7 +142,6 @@ public AsyncQueryForwardingServlet( this.requestLogger = requestLogger; this.queryMetricsFactory = queryMetricsFactory; this.escalatingAuthenticator = authenticatorMapper.getEscalatingAuthenticator(); - this.avaticaConnectionBalancer = avaticaConnectionBalancer; } @Override @@ -196,21 +192,13 @@ protected void service(HttpServletRequest request, HttpServletResponse response) final boolean isAvatica = request.getRequestURI().startsWith("/druid/v2/sql/avatica"); if (isAvatica) { - byte[] requestContent = getRequestContent(request); - String connectionId = getAvaticaConnectionId(requestContent, objectMapper); - Server targetServer = avaticaConnectionBalancer.balance(hostFinder.getAllServers(), connectionId); - if (targetServer == null) { - throw new ISE("Cannot balance request with connectionId[%s], no brokers found.", connectionId); - } + Map requestMap = objectMapper.readValue(request.getInputStream(), Map.class); + String connectionId = getAvaticaConnectionId(requestMap); + Server targetServer = hostFinder.findServerAvatica(connectionId); + byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap); request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost()); request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme()); - request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestContent); - log.debug( - "Balancer class [%s] sending request with connectionId[%s] to server: %s", - avaticaConnectionBalancer.getClass(), - connectionId, - targetServer.getHost() - ); + request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); } else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { // query cancellation request for (final Server server: hostFinder.getAllServers()) { @@ -401,18 +389,17 @@ public long getInterruptedQueryCount() return interruptedQueryCount.get(); } - private static byte[] getRequestContent(HttpServletRequest request) throws IOException + private static String getAvaticaConnectionId(Map requestMap) throws IOException { - int contentSize = request.getContentLength(); - byte[] content = new byte[contentSize]; - int bytesRead = request.getInputStream().read(content); - return content; - } + Object connectionIdObj = requestMap.get("connectionId"); + if (connectionIdObj == null) { + throw new IAE("Received an Avatica request without a connectionId."); + } + if (!(connectionIdObj instanceof String)) { + throw new IAE("Received an Avatica request with a non-String connectionId."); + } - private static String getAvaticaConnectionId(byte[] requestContent, ObjectMapper objectMapper) throws IOException - { - Map contentMap = objectMapper.readValue(requestContent, Map.class); - return (String) contentMap.get("connectionId"); + return (String) requestMap.get("connectionId"); } private class MetricsEmittingProxyResponseListener extends ProxyResponseListener diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java index 1dfe0cc1dc2d..8d858ecf4f7d 100644 --- a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java @@ -19,15 +19,20 @@ package io.druid.server.router; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.client.selector.Server; -import io.druid.guice.annotations.ExtensionPoint; import java.util.Collection; /** * An AvaticaConnectionBalancer balances Avatica connections across a collection of servers. */ -@ExtensionPoint +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RendezvousHashAvaticaConnectionBalancer.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "rendezvousHash", value = RendezvousHashAvaticaConnectionBalancer.class), + @JsonSubTypes.Type(name = "consistentHash", value = ConsistentHashAvaticaConnectionBalancer.class) +}) public interface AvaticaConnectionBalancer { /** diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java deleted file mode 100644 index fb3e2a33fb2d..000000000000 --- a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancerModule.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.router; - -import com.google.common.collect.Lists; -import com.google.inject.Binder; -import com.google.inject.Binding; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Module; -import com.google.inject.Provider; -import com.google.inject.TypeLiteral; -import com.google.inject.name.Named; -import com.google.inject.name.Names; -import io.druid.guice.LazySingleton; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.logger.Logger; - -import java.lang.annotation.Annotation; -import java.util.List; -import java.util.Properties; - -/** - */ -public class AvaticaConnectionBalancerModule implements Module -{ - private static final Logger log = new Logger(AvaticaConnectionBalancerModule.class); - private static final String BALANCER_PROPERTY = "druid.router.avatica.balancer"; - - private final Properties props; - - @Inject - public AvaticaConnectionBalancerModule( - Properties props - ) - { - this.props = props; - } - - @Override - public void configure(Binder binder) - { - String balancerType = props.getProperty(BALANCER_PROPERTY, ""); - binder.install(new ConsistentHashAvaticaConnectionBalancerModule()); - binder.install(new RendezvousHashAvaticaConnectionBalancerModule()); - binder.bind(AvaticaConnectionBalancer.class) - .toProvider(new AvaticaConnectionBalancerProvider(balancerType)) - .in(LazySingleton.class); - } - - private static class AvaticaConnectionBalancerProvider implements Provider - { - private final String balancerType; - - private AvaticaConnectionBalancer balancer = null; - - AvaticaConnectionBalancerProvider( - String balancerType - ) - { - this.balancerType = balancerType; - } - - @Inject - public void inject(Injector injector) - { - final List> balancerBindings = injector.findBindingsByType( - new TypeLiteral(){} - ); - - balancer = findBalancer(balancerType, balancerBindings); - - if (balancer == null) { - balancer = findBalancer(RendezvousHashAvaticaConnectionBalancerModule.BALANCER_TYPE, balancerBindings); - } - - if (balancer == null) { - List knownTypes = Lists.newArrayList(); - for (Binding binding : balancerBindings) { - final Annotation annotation = binding.getKey().getAnnotation(); - if (annotation != null) { - knownTypes.add(((Named) annotation).value()); - } - } - throw new ISE("Unknown balancer type[%s]=[%s], known types[%s]", BALANCER_PROPERTY, balancerType, knownTypes); - } - } - - private AvaticaConnectionBalancer findBalancer(String balancerType, List> balancerBindings) - { - for (Binding binding : balancerBindings) { - if (Names.named(balancerType).equals(binding.getKey().getAnnotation())) { - return binding.getProvider().get(); - } - } - return null; - } - - - @Override - public AvaticaConnectionBalancer get() - { - if (balancer == null) { - throw new ISE("AvaticaConnectionBalancer was null, that's bad!"); - } - return balancer; - } - } -} diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java index fdaecfde9ad6..ad1d647510cd 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java @@ -20,7 +20,6 @@ package io.druid.server.router; import io.druid.client.selector.Server; -import io.druid.server.ConsistentHasher; import java.util.Collection; import java.util.HashMap; diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java deleted file mode 100644 index 75d0384db6da..000000000000 --- a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancerModule.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.router; - -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import com.google.inject.name.Named; -import io.druid.guice.ManageLifecycle; - -public class ConsistentHashAvaticaConnectionBalancerModule implements Module -{ - public static final String BALANCER_TYPE = "consistentHash"; - - @Override - public void configure(Binder binder) - { - } - - @Provides - @ManageLifecycle - @Named(BALANCER_TYPE) - public AvaticaConnectionBalancer makeAvaticaConnectionBalancer() - { - return new ConsistentHashAvaticaConnectionBalancer(); - } -} diff --git a/server/src/main/java/io/druid/server/ConsistentHasher.java b/server/src/main/java/io/druid/server/router/ConsistentHasher.java similarity index 95% rename from server/src/main/java/io/druid/server/ConsistentHasher.java rename to server/src/main/java/io/druid/server/router/ConsistentHasher.java index f81b677314ba..7dea67ce3e98 100644 --- a/server/src/main/java/io/druid/server/ConsistentHasher.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHasher.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.server; +package io.druid.server.router; import com.google.common.base.Charsets; import com.google.common.hash.Funnel; @@ -33,9 +33,11 @@ import java.util.Map; import java.util.Set; -// Distributes objects across a set of node keys using consistent hashing -// See https://en.wikipedia.org/wiki/Consistent_hashing -// Not thread-safe. +/** + * Distributes objects across a set of node keys using consistent hashing. + * See https://en.wikipedia.org/wiki/Consistent_hashing + * Not thread-safe. + */ public class ConsistentHasher { private static final int REPLICATION_FACTOR = 128; diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index 244bd523583f..ceb2a273d10e 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -38,15 +38,18 @@ public class QueryHostFinder private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class); private final TieredBrokerHostSelector hostSelector; + private final AvaticaConnectionBalancer avaticaConnectionBalancer; private final ConcurrentHashMap serverBackup = new ConcurrentHashMap<>(); @Inject public QueryHostFinder( - TieredBrokerHostSelector hostSelector + TieredBrokerHostSelector hostSelector, + AvaticaConnectionBalancer avaticaConnectionBalancer ) { this.hostSelector = hostSelector; + this.avaticaConnectionBalancer = avaticaConnectionBalancer; } public Server findServer(Query query) @@ -68,6 +71,25 @@ public Collection getAllServers() .collect(Collectors.toList()); } + public Server findServerAvatica(String connectionId) + { + Server chosenServer = avaticaConnectionBalancer.balance(getAllServers(), connectionId); + if (chosenServer == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + + throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId); + } + log.info( + "Balancer class [%s] sending request with connectionId[%s] to server: %s", + avaticaConnectionBalancer.getClass(), + connectionId, + chosenServer.getHost() + ); + return chosenServer; + } + public Server getServer(Query query) { Server server = findServer(query); diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java index dfc44de36dc3..2cba55529579 100644 --- a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java @@ -20,7 +20,6 @@ package io.druid.server.router; import io.druid.client.selector.Server; -import io.druid.server.RendezvousHasher; import java.util.Collection; import java.util.HashMap; diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java deleted file mode 100644 index ae0b748c386d..000000000000 --- a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancerModule.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.router; - -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import com.google.inject.name.Named; -import io.druid.guice.ManageLifecycle; - -public class RendezvousHashAvaticaConnectionBalancerModule implements Module -{ - public static final String BALANCER_TYPE = "rendezvousHash"; - - @Override - public void configure(Binder binder) - { - } - - @Provides - @ManageLifecycle - @Named(BALANCER_TYPE) - public AvaticaConnectionBalancer makeAvaticaConnectionBalancer() - { - return new RendezvousHashAvaticaConnectionBalancer(); - } -} diff --git a/server/src/main/java/io/druid/server/RendezvousHasher.java b/server/src/main/java/io/druid/server/router/RendezvousHasher.java similarity index 91% rename from server/src/main/java/io/druid/server/RendezvousHasher.java rename to server/src/main/java/io/druid/server/router/RendezvousHasher.java index d5ca46fe043c..63fddd83a000 100644 --- a/server/src/main/java/io/druid/server/RendezvousHasher.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHasher.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.server; +package io.druid.server.router; import com.google.common.base.Charsets; import com.google.common.collect.Lists; @@ -31,12 +31,13 @@ import java.util.List; import java.util.Set; -// Distributes objects across a set of node keys using rendezvous hashing -// See https://en.wikipedia.org/wiki/Rendezvous_hashing +/** + * Distributes objects across a set of node keys using rendezvous hashing + * See https://en.wikipedia.org/wiki/Rendezvous_hashing + */ public class RendezvousHasher { - private static final int HASH_FN_SEED = 9999; - private static final HashFunction HASH_FN = Hashing.murmur3_128(HASH_FN_SEED); + private static final HashFunction HASH_FN = Hashing.murmur3_128(); public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8); diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index bec220f3d67c..329ac6688c8f 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -209,7 +209,7 @@ public void initialize(Server server, Injector injector) final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - final QueryHostFinder hostFinder = new QueryHostFinder(null) + final QueryHostFinder hostFinder = new QueryHostFinder(null, new RendezvousHashAvaticaConnectionBalancer()) { @Override public io.druid.client.selector.Server getServer(Query query) @@ -253,8 +253,7 @@ public void log(RequestLogLine requestLogLine) throws IOException } }, new DefaultGenericQueryMetricsFactory(jsonMapper), - AuthTestUtils.TEST_AUTHENTICATOR_MAPPER, - new RendezvousHashAvaticaConnectionBalancer() + AuthTestUtils.TEST_AUTHENTICATOR_MAPPER ) { @Override diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java index 373349bd5804..7b42b0b02c3f 100644 --- a/server/src/test/java/io/druid/server/ConsistentHasherTest.java +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -23,6 +23,7 @@ import com.google.common.hash.Hashing; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; +import io.druid.server.router.ConsistentHasher; import org.junit.Assert; import org.junit.Test; diff --git a/server/src/test/java/io/druid/server/RendezvousHasherTest.java b/server/src/test/java/io/druid/server/RendezvousHasherTest.java index 3fef4633a43e..4b32b93efb6d 100644 --- a/server/src/test/java/io/druid/server/RendezvousHasherTest.java +++ b/server/src/test/java/io/druid/server/RendezvousHasherTest.java @@ -21,6 +21,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; +import io.druid.server.router.RendezvousHasher; import org.junit.Assert; import org.junit.Test; diff --git a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java index 78e22f159f81..2e175800c2db 100644 --- a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java +++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java @@ -90,7 +90,8 @@ public void tearDown() throws Exception public void testFindServer() throws Exception { QueryHostFinder queryRunner = new QueryHostFinder( - brokerSelector + brokerSelector, + new RendezvousHashAvaticaConnectionBalancer() ); Server server = queryRunner.findServer( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index d400de142c95..6f35939bbc89 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -50,6 +50,7 @@ import io.druid.server.http.RouterResource; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.metrics.QueryCountStatsProvider; +import io.druid.server.router.AvaticaConnectionBalancer; import io.druid.server.router.CoordinatorRuleManager; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; @@ -94,6 +95,7 @@ public void configure(Binder binder) binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9088); JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class); + JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class); binder.bind(CoordinatorRuleManager.class); LifecycleModule.register(binder, CoordinatorRuleManager.class); From 91abdfd1a8ca9d24ed10be6d17a5e87a936d64fe Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 20 Oct 2017 11:26:37 -0700 Subject: [PATCH 3/8] Adjust test bounds --- .../test/java/io/druid/server/ConsistentHasherTest.java | 8 ++++---- .../test/java/io/druid/server/RendezvousHasherTest.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java index 7b42b0b02c3f..2e235b25bbd4 100644 --- a/server/src/test/java/io/druid/server/ConsistentHasherTest.java +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -165,7 +165,7 @@ public void testInconsistentView1() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.3); + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33); } @Test @@ -183,7 +183,7 @@ public void testInconsistentView2() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.45); + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.50); } @Test @@ -199,7 +199,7 @@ public void testInconsistentView3() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.6); + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66); } @Test @@ -214,7 +214,7 @@ public void testInconsistentView4() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.85); + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.95); } public void testInconsistentViewHelper( diff --git a/server/src/test/java/io/druid/server/RendezvousHasherTest.java b/server/src/test/java/io/druid/server/RendezvousHasherTest.java index 4b32b93efb6d..c171c2b51921 100644 --- a/server/src/test/java/io/druid/server/RendezvousHasherTest.java +++ b/server/src/test/java/io/druid/server/RendezvousHasherTest.java @@ -155,7 +155,7 @@ public void testInconsistentView1() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.25); + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33); } @Test @@ -173,7 +173,7 @@ public void testInconsistentView2() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.5); + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.55); } @Test @@ -189,7 +189,7 @@ public void testInconsistentView3() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.6); + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66); } @Test @@ -204,7 +204,7 @@ public void testInconsistentView4() throws Exception nodes2.add("localhost:4"); nodes2.add("localhost:5"); - testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.85); + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.95); } public void testInconsistentViewHelper( From 1d1ff5686d129025fe0dcc278ac7d0d9f3525095 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 20 Oct 2017 13:09:45 -0700 Subject: [PATCH 4/8] PR comments --- .../benchmark/ConsistentHasherBenchmark.java | 3 ++- .../benchmark/RendezvousHasherBenchmark.java | 3 ++- .../ConsistentHashAvaticaConnectionBalancer.java | 3 ++- .../io/druid/server/router/ConsistentHasher.java | 15 +++------------ .../io/druid/server/router/QueryHostFinder.java | 2 +- .../RendezvousHashAvaticaConnectionBalancer.java | 3 ++- .../io/druid/server/router/RendezvousHasher.java | 15 +++------------ .../io/druid/server/ConsistentHasherTest.java | 16 ++++++++-------- .../io/druid/server/RendezvousHasherTest.java | 16 ++++++++-------- 9 files changed, 31 insertions(+), 45 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java index 6731a2a7c330..a2d3597c0f27 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -21,6 +21,7 @@ import com.google.common.collect.Sets; import com.google.common.hash.Hashing;; +import io.druid.java.util.common.StringUtils; import io.druid.server.router.ConsistentHasher; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -82,7 +83,7 @@ public void setup() throws IOException public void hash(Blackhole blackhole) throws Exception { for (String uuid : uuids) { - String server = hasher.hash(uuid, ConsistentHasher.STRING_FUNNEL); + String server = hasher.hash(StringUtils.toUtf8(uuid)); blackhole.consume(server); } } diff --git a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java index c07cd19a2836..92dbfb2c8100 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java @@ -20,6 +20,7 @@ package io.druid.benchmark; import com.google.common.collect.Sets; +import io.druid.java.util.common.StringUtils; import io.druid.server.router.RendezvousHasher; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -80,7 +81,7 @@ public void setup() throws IOException public void hash(Blackhole blackhole) throws Exception { for (String uuid : uuids) { - String server = hasher.chooseNode(servers, uuid, RendezvousHasher.STRING_FUNNEL); + String server = hasher.chooseNode(servers, StringUtils.toUtf8(uuid)); blackhole.consume(server); } } diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java index ad1d647510cd..87bfcf1e3b95 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java @@ -20,6 +20,7 @@ package io.druid.server.router; import io.druid.client.selector.Server; +import io.druid.java.util.common.StringUtils; import java.util.Collection; import java.util.HashMap; @@ -42,7 +43,7 @@ public Server balance(Collection servers, String connectionId) } hasher.updateKeys(serverMap.keySet()); - String chosenServer = hasher.hashStr(connectionId); + String chosenServer = hasher.hash(StringUtils.toUtf8(connectionId)); return serverMap.get(chosenServer); } } diff --git a/server/src/main/java/io/druid/server/router/ConsistentHasher.java b/server/src/main/java/io/druid/server/router/ConsistentHasher.java index 7dea67ce3e98..e61813ac2723 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHasher.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHasher.java @@ -20,8 +20,6 @@ package io.druid.server.router; import com.google.common.base.Charsets; -import com.google.common.hash.Funnel; -import com.google.common.hash.Funnels; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; @@ -41,9 +39,7 @@ public class ConsistentHasher { private static final int REPLICATION_FACTOR = 128; - private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(9999); - - public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8); + private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(); private final Long2ObjectRBTreeMap nodeKeySlots = new Long2ObjectRBTreeMap<>(); { @@ -80,18 +76,13 @@ public void updateKeys(Set currentKeys) previousKeys = new HashSet<>(currentKeys); } - public String hashStr(String str) - { - return hash(str, STRING_FUNNEL); - } - - public String hash(T obj, Funnel funnel) + public String hash(byte[] obj) { if (nodeKeySlots.size() == 0) { return null; } - long objHash = hashFn.hashObject(obj, funnel).asLong(); + long objHash = hashFn.hashBytes(obj).asLong(); Long2ObjectSortedMap subMap = nodeKeySlots.tailMap(objHash); if (subMap.isEmpty()) { diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index ceb2a273d10e..2570106e987c 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -81,7 +81,7 @@ public Server findServerAvatica(String connectionId) throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId); } - log.info( + log.debug( "Balancer class [%s] sending request with connectionId[%s] to server: %s", avaticaConnectionBalancer.getClass(), connectionId, diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java index 2cba55529579..a252d6e3cbda 100644 --- a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java @@ -20,6 +20,7 @@ package io.druid.server.router; import io.druid.client.selector.Server; +import io.druid.java.util.common.StringUtils; import java.util.Collection; import java.util.HashMap; @@ -40,7 +41,7 @@ public Server balance(Collection servers, String connectionId) for (Server server : servers) { serverMap.put(server.getHost(), server); } - String chosenServerId = hasher.chooseNode(serverMap.keySet(), connectionId, RendezvousHasher.STRING_FUNNEL); + String chosenServerId = hasher.chooseNode(serverMap.keySet(), StringUtils.toUtf8(connectionId)); return serverMap.get(chosenServerId); } } diff --git a/server/src/main/java/io/druid/server/router/RendezvousHasher.java b/server/src/main/java/io/druid/server/router/RendezvousHasher.java index 63fddd83a000..95fc5c72e38d 100644 --- a/server/src/main/java/io/druid/server/router/RendezvousHasher.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHasher.java @@ -21,8 +21,6 @@ import com.google.common.base.Charsets; import com.google.common.collect.Lists; -import com.google.common.hash.Funnel; -import com.google.common.hash.Funnels; import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -39,14 +37,7 @@ public class RendezvousHasher { private static final HashFunction HASH_FN = Hashing.murmur3_128(); - public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8); - - public String chooseNode(Set nodeIds, String key) - { - return chooseNode(nodeIds, key, STRING_FUNNEL); - } - - public String chooseNode(Set nodeIds, KeyType key, Funnel funnel) + public String chooseNode(Set nodeIds, byte[] key) { if (nodeIds.isEmpty()) { return null; @@ -56,8 +47,8 @@ public String chooseNode(Set nodeIds, KeyType key, Funnel hashes = Lists.newArrayList(nodeHash, keyHash); HashCode combinedHash = Hashing.combineOrdered(hashes); weights.put(combinedHash.asLong(), nodeId); diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java index 2e235b25bbd4..98936a918d2a 100644 --- a/server/src/test/java/io/druid/server/ConsistentHasherTest.java +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -56,14 +56,14 @@ public void testBasic() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hashStr(objectId.toString()); + String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } // check that the same UUIDs hash to the same servers on subsequent hashStr() calls for (int i = 0; i < 2; i++) { for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hashStr(entry.getKey()); + String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); Assert.assertEquals(entry.getValue(), targetServer); } } @@ -85,7 +85,7 @@ public void testAddNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hashStr(objectId.toString()); + String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -95,7 +95,7 @@ public void testAddNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hashStr(entry.getKey()); + String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -125,7 +125,7 @@ public void testRemoveNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hashStr(objectId.toString()); + String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -135,7 +135,7 @@ public void testRemoveNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hashStr(entry.getKey()); + String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -230,7 +230,7 @@ public void testInconsistentViewHelper( Map uuidServerMap = new HashMap<>(); for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hashStr(objectId.toString()); + String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -239,7 +239,7 @@ public void testInconsistentViewHelper( Map uuidServerMap2 = new HashMap<>(); for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher2.hashStr(entry.getKey()); + String targetServer = hasher2.hash(StringUtils.toUtf8(entry.getKey())); uuidServerMap2.put(entry.getKey(), targetServer); } diff --git a/server/src/test/java/io/druid/server/RendezvousHasherTest.java b/server/src/test/java/io/druid/server/RendezvousHasherTest.java index c171c2b51921..3e3274c2fc02 100644 --- a/server/src/test/java/io/druid/server/RendezvousHasherTest.java +++ b/server/src/test/java/io/druid/server/RendezvousHasherTest.java @@ -52,14 +52,14 @@ public void testBasic() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.chooseNode(nodes, objectId.toString()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } // check that the same UUIDs hash to the same servers on subsequent hashStr() calls for (int i = 0; i < 2; i++) { for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.chooseNode(nodes, entry.getKey()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); Assert.assertEquals(entry.getValue(), targetServer); } } @@ -80,7 +80,7 @@ public void testAddNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.chooseNode(nodes, objectId.toString()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -89,7 +89,7 @@ public void testAddNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.chooseNode(nodes, entry.getKey()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -117,7 +117,7 @@ public void testRemoveNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.chooseNode(nodes, objectId.toString()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -126,7 +126,7 @@ public void testRemoveNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.chooseNode(nodes, entry.getKey()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -218,14 +218,14 @@ public void testInconsistentViewHelper( Map uuidServerMap = new HashMap<>(); for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.chooseNode(nodes, objectId.toString()); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } RendezvousHasher hasher2 = new RendezvousHasher(); Map uuidServerMap2 = new HashMap<>(); for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher2.chooseNode(nodes2, entry.getKey()); + String targetServer = hasher2.chooseNode(nodes2, StringUtils.toUtf8(entry.getKey())); uuidServerMap2.put(entry.getKey(), targetServer); } From c371f6043391b337b77ab5185ac8f705c16434f5 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 25 Oct 2017 14:15:10 -0700 Subject: [PATCH 5/8] Add doc comments --- docs/content/development/router.md | 2 ++ docs/content/querying/sql.md | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 707528027422..98e6220ea974 100644 --- a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -128,6 +128,8 @@ All Avatica JDBC requests with a given connection ID must be routed to the same To accomplish this, Druid provides two built-in balancers that use rendezvous hashing and consistent hashing of a request's connection ID respectively to assign requests to brokers. +Note that when multiple routers are used, all routers should have identical balancer configuration to ensure that they make the same routing decisions. + ### Rendezvous Hash Balancer This balancer uses [Rendezvous Hashing](https://en.wikipedia.org/wiki/Rendezvous_hashing) on an Avatica request's connection ID to assign the request to a broker. diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 189172ae0a06..b315ea1eb81b 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -388,10 +388,16 @@ Table metadata is available over JDBC using `connection.getMetaData()` or by que ["INFORMATION_SCHEMA" tables](#retrieving-metadata). Parameterized queries (using `?` or other placeholders) don't work properly, so avoid those. +#### Connection Stickiness + Druid's JDBC server does not share connection state between brokers. This means that if you're using JDBC and have multiple Druid brokers, you should either connect to a specific broker, or use a load balancer with sticky sessions enabled. +The Druid Router node provides connection stickiness when balancing JDBC requests. Please see [Router](../development/router.html) documentation for more details. + +Note that the non-JDBC [JSON over HTTP](#json-over-http) API is stateless and does not require stickiness. + ### Connection context Druid SQL supports setting connection parameters on the client. The parameters in the table below affect SQL planning. From a96c4a8e42bc664025b263bd3cfabc6e95268077 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 30 Oct 2017 18:12:41 -0700 Subject: [PATCH 6/8] PR comments --- .../benchmark/ConsistentHasherBenchmark.java | 13 +++++++++---- .../benchmark/RendezvousHasherBenchmark.java | 7 ++++++- .../server/AsyncQueryForwardingServlet.java | 2 +- .../router/AvaticaConnectionBalancer.java | 5 +++-- ...nsistentHashAvaticaConnectionBalancer.java | 4 ++-- .../druid/server/router/ConsistentHasher.java | 2 +- .../druid/server/router/QueryHostFinder.java | 2 +- ...ndezvousHashAvaticaConnectionBalancer.java | 2 +- .../druid/server/router/RendezvousHasher.java | 19 ++++++++++++------- .../io/druid/server/ConsistentHasherTest.java | 18 +++++++++--------- 10 files changed, 45 insertions(+), 29 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java index a2d3597c0f27..8f6ec5425e6d 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -20,7 +20,7 @@ package io.druid.benchmark; import com.google.common.collect.Sets; -import com.google.common.hash.Hashing;; +; import io.druid.java.util.common.StringUtils; import io.druid.server.router.ConsistentHasher; import org.openjdk.jmh.annotations.Benchmark; @@ -59,14 +59,19 @@ public class ConsistentHasherBenchmark @Setup public void setup() throws IOException { - hasher = new ConsistentHasher(Hashing.murmur3_128(9999)); + hasher = new ConsistentHasher(null); uuids = new ArrayList<>(); servers = Sets.newHashSet( "localhost:1", "localhost:2", "localhost:3", "localhost:4", - "localhost:5" + "localhost:5", + "localhost:6", + "localhost:7", + "localhost:8", + "localhost:9", + "localhost:10" ); for (int i = 0; i < numIds; i++) { @@ -83,7 +88,7 @@ public void setup() throws IOException public void hash(Blackhole blackhole) throws Exception { for (String uuid : uuids) { - String server = hasher.hash(StringUtils.toUtf8(uuid)); + String server = hasher.findKey(StringUtils.toUtf8(uuid)); blackhole.consume(server); } } diff --git a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java index 92dbfb2c8100..20c8373cc892 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java @@ -65,7 +65,12 @@ public void setup() throws IOException "localhost:2", "localhost:3", "localhost:4", - "localhost:5" + "localhost:5", + "localhost:6", + "localhost:7", + "localhost:8", + "localhost:9", + "localhost:10" ); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 04b20c6e0cfe..d16673660e93 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -399,7 +399,7 @@ private static String getAvaticaConnectionId(Map requestMap) thr throw new IAE("Received an Avatica request with a non-String connectionId."); } - return (String) requestMap.get("connectionId"); + return (String) connectionIdObj; } private class MetricsEmittingProxyResponseListener extends ProxyResponseListener diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java index 8d858ecf4f7d..c980d151f51b 100644 --- a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java @@ -38,7 +38,8 @@ public interface AvaticaConnectionBalancer /** * @param servers Servers to balance across * @param connectionId Connection ID to be balanced - * @return Server that connectionId should be assigned to + * @return Server that connectionId should be assigned to. The process for choosing a server must be deterministic and + * sticky (with a fixed set of servers, the same connectionId should always be assigned to the same server) */ - Server balance(Collection servers, String connectionId); + Server pickServer(Collection servers, String connectionId); } diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java index 87bfcf1e3b95..4245e39ae6a6 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java @@ -31,7 +31,7 @@ public class ConsistentHashAvaticaConnectionBalancer implements AvaticaConnectio private final ConsistentHasher hasher = new ConsistentHasher(null); @Override - public Server balance(Collection servers, String connectionId) + public Server pickServer(Collection servers, String connectionId) { synchronized (hasher) { if (servers.isEmpty()) { @@ -43,7 +43,7 @@ public Server balance(Collection servers, String connectionId) } hasher.updateKeys(serverMap.keySet()); - String chosenServer = hasher.hash(StringUtils.toUtf8(connectionId)); + String chosenServer = hasher.findKey(StringUtils.toUtf8(connectionId)); return serverMap.get(chosenServer); } } diff --git a/server/src/main/java/io/druid/server/router/ConsistentHasher.java b/server/src/main/java/io/druid/server/router/ConsistentHasher.java index e61813ac2723..9c2b42eba1fc 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHasher.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHasher.java @@ -76,7 +76,7 @@ public void updateKeys(Set currentKeys) previousKeys = new HashSet<>(currentKeys); } - public String hash(byte[] obj) + public String findKey(byte[] obj) { if (nodeKeySlots.size() == 0) { return null; diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index 2570106e987c..55fcfd6ffc8b 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -73,7 +73,7 @@ public Collection getAllServers() public Server findServerAvatica(String connectionId) { - Server chosenServer = avaticaConnectionBalancer.balance(getAllServers(), connectionId); + Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId); if (chosenServer == null) { log.makeAlert( "Catastrophic failure! No servers found at all! Failing request!" diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java index a252d6e3cbda..f4b7b400b358 100644 --- a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java @@ -31,7 +31,7 @@ public class RendezvousHashAvaticaConnectionBalancer implements AvaticaConnectio private final RendezvousHasher hasher = new RendezvousHasher(); @Override - public Server balance(Collection servers, String connectionId) + public Server pickServer(Collection servers, String connectionId) { if (servers.isEmpty()) { return null; diff --git a/server/src/main/java/io/druid/server/router/RendezvousHasher.java b/server/src/main/java/io/druid/server/router/RendezvousHasher.java index 95fc5c72e38d..170573ef2d96 100644 --- a/server/src/main/java/io/druid/server/router/RendezvousHasher.java +++ b/server/src/main/java/io/druid/server/router/RendezvousHasher.java @@ -24,7 +24,6 @@ import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; -import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; import java.util.List; import java.util.Set; @@ -43,17 +42,23 @@ public String chooseNode(Set nodeIds, byte[] key) return null; } - Long2ObjectRBTreeMap weights = new Long2ObjectRBTreeMap<>(); - weights.defaultReturnValue(null); + final HashCode keyHash = HASH_FN.hashBytes(key); + long maxHash = Long.MIN_VALUE; + String maxNode = null; for (String nodeId : nodeIds) { - HashCode keyHash = HASH_FN.hashBytes(key); HashCode nodeHash = HASH_FN.hashString(nodeId, Charsets.UTF_8); List hashes = Lists.newArrayList(nodeHash, keyHash); - HashCode combinedHash = Hashing.combineOrdered(hashes); - weights.put(combinedHash.asLong(), nodeId); + long combinedHash = Hashing.combineOrdered(hashes).asLong(); + if (maxNode == null) { + maxHash = combinedHash; + maxNode = nodeId; + } else if (combinedHash > maxHash) { + maxHash = combinedHash; + maxNode = nodeId; + } } - return weights.get(weights.lastLongKey()); + return maxNode; } } diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java index 98936a918d2a..ba92f8dec893 100644 --- a/server/src/test/java/io/druid/server/ConsistentHasherTest.java +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -35,7 +35,7 @@ public class ConsistentHasherTest { - private static HashFunction TEST_HASH_FN = Hashing.murmur3_128(9999); + private static HashFunction TEST_HASH_FN = Hashing.murmur3_128(); private static int NUM_ITERATIONS = 10000; private static final Logger log = new Logger(ConsistentHasherTest.class); @@ -56,14 +56,14 @@ public void testBasic() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } // check that the same UUIDs hash to the same servers on subsequent hashStr() calls for (int i = 0; i < 2; i++) { for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); Assert.assertEquals(entry.getValue(), targetServer); } } @@ -85,7 +85,7 @@ public void testAddNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -95,7 +95,7 @@ public void testAddNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -125,7 +125,7 @@ public void testRemoveNode() throws Exception for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -135,7 +135,7 @@ public void testRemoveNode() throws Exception int same = 0; int diff = 0; for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher.hash(StringUtils.toUtf8(entry.getKey())); + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); if (entry.getValue().equals(targetServer)) { same += 1; } else { @@ -230,7 +230,7 @@ public void testInconsistentViewHelper( Map uuidServerMap = new HashMap<>(); for (int i = 0; i < NUM_ITERATIONS; i++) { UUID objectId = UUID.randomUUID(); - String targetServer = hasher.hash(StringUtils.toUtf8(objectId.toString())); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); uuidServerMap.put(objectId.toString(), targetServer); } @@ -239,7 +239,7 @@ public void testInconsistentViewHelper( Map uuidServerMap2 = new HashMap<>(); for (Map.Entry entry : uuidServerMap.entrySet()) { - String targetServer = hasher2.hash(StringUtils.toUtf8(entry.getKey())); + String targetServer = hasher2.findKey(StringUtils.toUtf8(entry.getKey())); uuidServerMap2.put(entry.getKey(), targetServer); } From ca6e262c52f16dcda7125e842def56b34400d6c6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 30 Oct 2017 19:15:05 -0700 Subject: [PATCH 7/8] PR comment --- .../src/main/java/io/druid/server/router/ConsistentHasher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/io/druid/server/router/ConsistentHasher.java b/server/src/main/java/io/druid/server/router/ConsistentHasher.java index 9c2b42eba1fc..f3fee3024bea 100644 --- a/server/src/main/java/io/druid/server/router/ConsistentHasher.java +++ b/server/src/main/java/io/druid/server/router/ConsistentHasher.java @@ -38,6 +38,7 @@ */ public class ConsistentHasher { + // Determined through tests to provide reasonably equal balancing on a test set of 5-10 brokers private static final int REPLICATION_FACTOR = 128; private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(); From 7a0a680195ca792743194c9eea63ea8bacf2acca Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 31 Oct 2017 12:14:21 -0700 Subject: [PATCH 8/8] Checkstyle fix --- .../main/java/io/druid/benchmark/ConsistentHasherBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java index 8f6ec5425e6d..639a3f518ba0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -20,7 +20,7 @@ package io.druid.benchmark; import com.google.common.collect.Sets; -; + import io.druid.java.util.common.StringUtils; import io.druid.server.router.ConsistentHasher; import org.openjdk.jmh.annotations.Benchmark;