diff --git a/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java new file mode 100644 index 000000000000..639a3f518ba0 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/ConsistentHasherBenchmark.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.benchmark; + +import com.google.common.collect.Sets; + +import io.druid.java.util.common.StringUtils; +import io.druid.server.router.ConsistentHasher; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 15) +@Measurement(iterations = 30) +public class ConsistentHasherBenchmark +{ + @Param({"100000"}) + int numIds; + + ConsistentHasher hasher; + List uuids; + Set servers; + + @Setup + public void setup() throws IOException + { + hasher = new ConsistentHasher(null); + uuids = new ArrayList<>(); + servers = Sets.newHashSet( + "localhost:1", + "localhost:2", + "localhost:3", + "localhost:4", + "localhost:5", + "localhost:6", + "localhost:7", + "localhost:8", + "localhost:9", + "localhost:10" + ); + + for (int i = 0; i < numIds; i++) { + UUID uuid = UUID.randomUUID(); + uuids.add(uuid.toString()); + } + + hasher.updateKeys(servers); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void hash(Blackhole blackhole) throws Exception + { + for (String uuid : uuids) { + String server = hasher.findKey(StringUtils.toUtf8(uuid)); + blackhole.consume(server); + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java 
b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java new file mode 100644 index 000000000000..20c8373cc892 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/RendezvousHasherBenchmark.java @@ -0,0 +1,93 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.benchmark; + +import com.google.common.collect.Sets; +import io.druid.java.util.common.StringUtils; +import io.druid.server.router.RendezvousHasher; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 15) +@Measurement(iterations = 30) +public class RendezvousHasherBenchmark +{ + @Param({"100000"}) + int numIds; + + RendezvousHasher hasher; + List uuids; + Set servers; + + @Setup + public void setup() throws IOException + { + hasher = new RendezvousHasher(); + uuids = new ArrayList<>(); + servers = Sets.newHashSet( + "localhost:1", + "localhost:2", + "localhost:3", + "localhost:4", + "localhost:5", + "localhost:6", + "localhost:7", + "localhost:8", + "localhost:9", + "localhost:10" + ); + + + for (int i = 0; i < numIds; i++) { + UUID uuid = UUID.randomUUID(); + uuids.add(uuid.toString()); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void hash(Blackhole blackhole) throws Exception + { + for (String uuid : uuids) { + String server = hasher.chooseNode(servers, StringUtils.toUtf8(uuid)); + blackhole.consume(server); + } + } +} diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 63ba585d4427..98e6220ea974 100644 --- 
a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -75,6 +75,7 @@ The router module uses several of the default modules in [Configuration](../conf |`druid.router.coordinatorServiceName`|Any string.|The service discovery name of the coordinator.|druid/coordinator| |`druid.router.pollPeriod`|Any ISO8601 duration.|How often to poll for new rules.|PT1M| |`druid.router.strategies`|An ordered JSON array of objects.|All custom strategies to use for routing.|[{"type":"timeBoundary"},{"type":"priority"}]| +|`druid.router.avatica.balancer.type`|String representing an AvaticaConnectionBalancer name|Class to use for balancing Avatica queries across brokers|rendezvousHash| Router Strategies ----------------- @@ -119,6 +120,40 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio JavaScript-based functionality is disabled by default. Please refer to the Druid JavaScript programming guide for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it. + +Avatica Query Balancing +-------------- + +All Avatica JDBC requests with a given connection ID must be routed to the same broker, since Druid brokers do not share connection state with each other. + +To accomplish this, Druid provides two built-in balancers that use rendezvous hashing and consistent hashing of a request's connection ID respectively to assign requests to brokers. + +Note that when multiple routers are used, all routers should have identical balancer configuration to ensure that they make the same routing decisions. + +### Rendezvous Hash Balancer + +This balancer uses [Rendezvous Hashing](https://en.wikipedia.org/wiki/Rendezvous_hashing) on an Avatica request's connection ID to assign the request to a broker. 
+ +To use this balancer, specify the following property: + +``` +druid.router.avatica.balancer.type=rendezvousHash +``` + +If no `druid.router.avatica.balancer.type` property is set, the Router will also default to using the Rendezvous Hash Balancer. + +### Consistent Hash Balancer + +This balancer uses [Consistent Hashing](https://en.wikipedia.org/wiki/Consistent_hashing) on an Avatica request's connection ID to assign the request to a broker. + +To use this balancer, specify the following property: + +``` +druid.router.avatica.balancer.type=consistentHash +``` + +This is a non-default implementation that is provided for experimentation purposes. The consistent hasher has longer setup times on initialization and when the set of brokers changes, but has a faster broker assignment time than the rendezvous hasher when tested with 5 brokers. Benchmarks for both implementations have been provided in `ConsistentHasherBenchmark` and `RendezvousHasherBenchmark`. The consistent hasher also requires locking, while the rendezvous hasher does not. + HTTP Endpoints -------------- diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 189172ae0a06..b315ea1eb81b 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -388,10 +388,16 @@ Table metadata is available over JDBC using `connection.getMetaData()` or by que ["INFORMATION_SCHEMA" tables](#retrieving-metadata). Parameterized queries (using `?` or other placeholders) don't work properly, so avoid those. +#### Connection Stickiness + Druid's JDBC server does not share connection state between brokers. This means that if you're using JDBC and have multiple Druid brokers, you should either connect to a specific broker, or use a load balancer with sticky sessions enabled. +The Druid Router node provides connection stickiness when balancing JDBC requests. Please see [Router](../development/router.html) documentation for more details.
+ +Note that the non-JDBC [JSON over HTTP](#json-over-http) API is stateless and does not require stickiness. + ### Connection context Druid SQL supports setting connection parameters on the client. The parameters in the table below affect SQL planning. diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index c7f2b1167b3b..d16673660e93 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -33,6 +33,7 @@ import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.IAE; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; @@ -62,6 +63,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLDecoder; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -78,6 +80,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host"; private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme"; private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; + private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery"; private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; private static final int CANCELLATION_TIMEOUT_MILLIS = 500; @@ -186,7 +189,17 @@ protected void service(HttpServletRequest request, HttpServletResponse response) final boolean isQueryEndpoint = request.getRequestURI().startsWith("/druid/v2") && !request.getRequestURI().startsWith("/druid/v2/sql"); - if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) 
{ + final boolean isAvatica = request.getRequestURI().startsWith("/druid/v2/sql/avatica"); + + if (isAvatica) { + Map requestMap = objectMapper.readValue(request.getInputStream(), Map.class); + String connectionId = getAvaticaConnectionId(requestMap); + Server targetServer = hostFinder.findServerAvatica(connectionId); + byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap); + request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost()); + request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme()); + request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); + } else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { // query cancellation request for (final Server server: hostFinder.getAllServers()) { // send query cancellation to all brokers this query may have gone to @@ -263,6 +276,11 @@ protected void sendProxyRequest( proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); + byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE); + if (avaticaQuery != null) { + proxyRequest.content(new BytesContentProvider(avaticaQuery)); + } + final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); if (query != null) { final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); @@ -371,6 +389,18 @@ public long getInterruptedQueryCount() return interruptedQueryCount.get(); } + private static String getAvaticaConnectionId(Map requestMap) throws IOException + { + Object connectionIdObj = requestMap.get("connectionId"); + if (connectionIdObj == null) { + throw new IAE("Received an Avatica request without a connectionId."); + } + if (!(connectionIdObj instanceof String)) { + throw new IAE("Received an Avatica request with a non-String connectionId."); + } + + return (String) connectionIdObj; + } private class 
MetricsEmittingProxyResponseListener extends ProxyResponseListener { diff --git a/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java new file mode 100644 index 000000000000..c980d151f51b --- /dev/null +++ b/server/src/main/java/io/druid/server/router/AvaticaConnectionBalancer.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.client.selector.Server; + +import java.util.Collection; + +/** + * An AvaticaConnectionBalancer balances Avatica connections across a collection of servers. 
+ */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RendezvousHashAvaticaConnectionBalancer.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "rendezvousHash", value = RendezvousHashAvaticaConnectionBalancer.class), + @JsonSubTypes.Type(name = "consistentHash", value = ConsistentHashAvaticaConnectionBalancer.class) +}) +public interface AvaticaConnectionBalancer +{ + /** + * @param servers Servers to balance across + * @param connectionId Connection ID to be balanced + * @return Server that connectionId should be assigned to. The process for choosing a server must be deterministic and + * sticky (with a fixed set of servers, the same connectionId should always be assigned to the same server) + */ + Server pickServer(Collection servers, String connectionId); +} diff --git a/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java new file mode 100644 index 000000000000..4245e39ae6a6 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/ConsistentHashAvaticaConnectionBalancer.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import io.druid.client.selector.Server; +import io.druid.java.util.common.StringUtils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class ConsistentHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer +{ + private final ConsistentHasher hasher = new ConsistentHasher(null); + + @Override + public Server pickServer(Collection servers, String connectionId) + { + synchronized (hasher) { + if (servers.isEmpty()) { + return null; + } + Map serverMap = new HashMap<>(); + for (Server server : servers) { + serverMap.put(server.getHost(), server); + } + + hasher.updateKeys(serverMap.keySet()); + String chosenServer = hasher.findKey(StringUtils.toUtf8(connectionId)); + return serverMap.get(chosenServer); + } + } +} diff --git a/server/src/main/java/io/druid/server/router/ConsistentHasher.java b/server/src/main/java/io/druid/server/router/ConsistentHasher.java new file mode 100644 index 000000000000..f3fee3024bea --- /dev/null +++ b/server/src/main/java/io/druid/server/router/ConsistentHasher.java @@ -0,0 +1,148 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.google.common.base.Charsets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Distributes objects across a set of node keys using consistent hashing. + * See https://en.wikipedia.org/wiki/Consistent_hashing + * Not thread-safe. + */ +public class ConsistentHasher +{ + // Determined through tests to provide reasonably equal balancing on a test set of 5-10 brokers + private static final int REPLICATION_FACTOR = 128; + private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(); + + private final Long2ObjectRBTreeMap nodeKeySlots = new Long2ObjectRBTreeMap<>(); + { + nodeKeySlots.defaultReturnValue(null); + } + private final HashFunction hashFn; + private final Map nodeKeyHashes = new HashMap<>(); + private Set previousKeys = new HashSet<>(); + + public ConsistentHasher( + final HashFunction hashFunction + ) + { + this.hashFn = hashFunction == null ? 
DEFAULT_HASH_FN : hashFunction; + } + + public void updateKeys(Set currentKeys) + { + Set added = new HashSet<>(currentKeys); + added.removeAll(previousKeys); + + Set removed = new HashSet<>(previousKeys); + removed.removeAll(currentKeys); + + for (String key : added) { + addKey(key); + } + + for (String key : removed) { + removeKey(key); + } + + // store a copy in case the input was immutable + previousKeys = new HashSet<>(currentKeys); + } + + public String findKey(byte[] obj) + { + if (nodeKeySlots.size() == 0) { + return null; + } + + long objHash = hashFn.hashBytes(obj).asLong(); + + Long2ObjectSortedMap subMap = nodeKeySlots.tailMap(objHash); + if (subMap.isEmpty()) { + return nodeKeySlots.long2ObjectEntrySet().first().getValue(); + } + + Long2ObjectMap.Entry firstEntry = subMap.long2ObjectEntrySet().first(); + return firstEntry.getValue(); + } + + private void addKey(String key) + { + if (nodeKeyHashes.containsKey(key)) { + return; + } + + addNodeKeyHashes(key); + addNodeKeySlots(key); + } + + private void removeKey(String key) + { + if (!nodeKeyHashes.containsKey(key)) { + return; + } + + removeNodeKeySlots(key); + removeNodeKeyHashes(key); + } + + private void addNodeKeyHashes(String key) + { + long[] hashes = new long[REPLICATION_FACTOR]; + for (int i = 0; i < REPLICATION_FACTOR; i++) { + String vnode = key + "-" + i; + hashes[i] = hashFn.hashString(vnode, Charsets.UTF_8).asLong(); + } + + nodeKeyHashes.put(key, hashes); + } + + private void addNodeKeySlots(String key) + { + long[] hashes = nodeKeyHashes.get(key); + for (long hash : hashes) { + nodeKeySlots.put(hash, key); + } + } + + private void removeNodeKeyHashes(String key) + { + nodeKeyHashes.remove(key); + } + + private void removeNodeKeySlots(String key) + { + long[] hashes = nodeKeyHashes.get(key); + for (long hash : hashes) { + nodeKeySlots.remove(hash); + } + } +} diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java 
b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index 244bd523583f..55fcfd6ffc8b 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -38,15 +38,18 @@ public class QueryHostFinder private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class); private final TieredBrokerHostSelector hostSelector; + private final AvaticaConnectionBalancer avaticaConnectionBalancer; private final ConcurrentHashMap serverBackup = new ConcurrentHashMap<>(); @Inject public QueryHostFinder( - TieredBrokerHostSelector hostSelector + TieredBrokerHostSelector hostSelector, + AvaticaConnectionBalancer avaticaConnectionBalancer ) { this.hostSelector = hostSelector; + this.avaticaConnectionBalancer = avaticaConnectionBalancer; } public Server findServer(Query query) @@ -68,6 +71,25 @@ public Collection getAllServers() .collect(Collectors.toList()); } + public Server findServerAvatica(String connectionId) + { + Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId); + if (chosenServer == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" 
+ ).emit(); + + throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId); + } + log.debug( + "Balancer class [%s] sending request with connectionId[%s] to server: %s", + avaticaConnectionBalancer.getClass(), + connectionId, + chosenServer.getHost() + ); + return chosenServer; + } + public Server getServer(Query query) { Server server = findServer(query); diff --git a/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java new file mode 100644 index 000000000000..f4b7b400b358 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RendezvousHashAvaticaConnectionBalancer.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.router; + +import io.druid.client.selector.Server; +import io.druid.java.util.common.StringUtils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class RendezvousHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer +{ + private final RendezvousHasher hasher = new RendezvousHasher(); + + @Override + public Server pickServer(Collection servers, String connectionId) + { + if (servers.isEmpty()) { + return null; + } + + Map serverMap = new HashMap<>(); + for (Server server : servers) { + serverMap.put(server.getHost(), server); + } + String chosenServerId = hasher.chooseNode(serverMap.keySet(), StringUtils.toUtf8(connectionId)); + return serverMap.get(chosenServerId); + } +} diff --git a/server/src/main/java/io/druid/server/router/RendezvousHasher.java b/server/src/main/java/io/druid/server/router/RendezvousHasher.java new file mode 100644 index 000000000000..170573ef2d96 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RendezvousHasher.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.router; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +import java.util.List; +import java.util.Set; + +/** + * Distributes objects across a set of node keys using rendezvous hashing + * See https://en.wikipedia.org/wiki/Rendezvous_hashing + */ +public class RendezvousHasher +{ + private static final HashFunction HASH_FN = Hashing.murmur3_128(); + + public String chooseNode(Set nodeIds, byte[] key) + { + if (nodeIds.isEmpty()) { + return null; + } + + final HashCode keyHash = HASH_FN.hashBytes(key); + long maxHash = Long.MIN_VALUE; + String maxNode = null; + + for (String nodeId : nodeIds) { + HashCode nodeHash = HASH_FN.hashString(nodeId, Charsets.UTF_8); + List hashes = Lists.newArrayList(nodeHash, keyHash); + long combinedHash = Hashing.combineOrdered(hashes).asLong(); + if (maxNode == null) { + maxHash = combinedHash; + maxNode = nodeId; + } else if (combinedHash > maxHash) { + maxHash = combinedHash; + maxNode = nodeId; + } + } + + return maxNode; + } +} diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index 895416f0e25a..329ac6688c8f 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -50,6 +50,7 @@ import io.druid.server.log.RequestLogger; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.router.QueryHostFinder; +import io.druid.server.router.RendezvousHashAvaticaConnectionBalancer; import io.druid.server.security.AllowAllAuthorizer; import io.druid.server.security.AuthTestUtils; import io.druid.server.security.Authorizer; @@ -208,7 +209,7 @@ public void initialize(Server server, Injector injector) final 
ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - final QueryHostFinder hostFinder = new QueryHostFinder(null) + final QueryHostFinder hostFinder = new QueryHostFinder(null, new RendezvousHashAvaticaConnectionBalancer()) { @Override public io.druid.client.selector.Server getServer(Query query) diff --git a/server/src/test/java/io/druid/server/ConsistentHasherTest.java b/server/src/test/java/io/druid/server/ConsistentHasherTest.java new file mode 100644 index 000000000000..ba92f8dec893 --- /dev/null +++ b/server/src/test/java/io/druid/server/ConsistentHasherTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.router.ConsistentHasher; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class ConsistentHasherTest +{ + private static HashFunction TEST_HASH_FN = Hashing.murmur3_128(); + private static int NUM_ITERATIONS = 10000; + private static final Logger log = new Logger(ConsistentHasherTest.class); + + @Test + public void testBasic() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + hasher.updateKeys(nodes); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + // check that the same UUIDs hash to the same servers on subsequent hashStr() calls + for (int i = 0; i < 2; i++) { + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); + Assert.assertEquals(entry.getValue(), targetServer); + } + } + } + + @Test + public void testAddNode() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId 
= UUID.randomUUID(); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.add("localhost:6"); + hasher.updateKeys(nodes); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + // ~1/6 of the entries should change, check that less than 1/5 of the entries hash differently + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.20); + } + + @Test + public void testRemoveNode() throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.remove("localhost:3"); + hasher.updateKeys(nodes); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey())); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + // ~1/5 of the entries should change, check that less than 1/4 of the entries hash differently + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.25); + } + + @Test 
+ public void testInconsistentView1() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:3"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33); + } + + @Test + public void testInconsistentView2() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:2"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.50); + } + + @Test + public void testInconsistentView3() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66); + } + + @Test + public void testInconsistentView4() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:2"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.95); + } + + public void testInconsistentViewHelper( + String testName, + Set nodes, + Set nodes2, + double expectedDiffRatio + ) throws Exception + { + ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN); + hasher.updateKeys(nodes); + + Map uuidServerMap = new HashMap<>(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = 
UUID.randomUUID(); + String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + ConsistentHasher hasher2 = new ConsistentHasher(TEST_HASH_FN); + hasher2.updateKeys(nodes2); + + Map uuidServerMap2 = new HashMap<>(); + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher2.findKey(StringUtils.toUtf8(entry.getKey())); + uuidServerMap2.put(entry.getKey(), targetServer); + } + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String otherServer = uuidServerMap2.get(entry.getKey()); + if (entry.getValue().equals(otherServer)) { + same += 1; + } else { + diff += 1; + } + } + double actualDiffRatio = ((double) diff) / NUM_ITERATIONS; + + log.info(StringUtils.format("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff)); + log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio); + + Assert.assertTrue(actualDiffRatio <= expectedDiffRatio); + } +} diff --git a/server/src/test/java/io/druid/server/RendezvousHasherTest.java b/server/src/test/java/io/druid/server/RendezvousHasherTest.java new file mode 100644 index 000000000000..3e3274c2fc02 --- /dev/null +++ b/server/src/test/java/io/druid/server/RendezvousHasherTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.router.RendezvousHasher; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class RendezvousHasherTest +{ + private static int NUM_ITERATIONS = 10000; + private static final Logger log = new Logger(RendezvousHasherTest.class); + + @Test + public void testBasic() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + // check that the same UUIDs hash to the same servers on subsequent hashStr() calls + for (int i = 0; i < 2; i++) { + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); + Assert.assertEquals(entry.getValue(), targetServer); + } + } + } + + @Test + public void testAddNode() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + 
nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.add("localhost:6"); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.33); + } + + @Test + public void testRemoveNode() throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Map uuidServerMap = new HashMap<>(); + + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + nodes.remove("localhost:3"); + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey())); + if (entry.getValue().equals(targetServer)) { + same += 1; + } else { + diff += 1; + } + } + log.info(StringUtils.format("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff)); + + double diffRatio = ((double) diff) / NUM_ITERATIONS; + Assert.assertTrue(diffRatio < 0.33); + } + + @Test + public void 
testInconsistentView1() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:2"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:3"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33); + } + + @Test + public void testInconsistentView2() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:1"); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:2"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.55); + } + + @Test + public void testInconsistentView3() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:3"); + nodes.add("localhost:4"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66); + } + + @Test + public void testInconsistentView4() throws Exception + { + Set nodes = new HashSet<>(); + nodes.add("localhost:2"); + nodes.add("localhost:5"); + + Set nodes2 = new HashSet<>(); + nodes2.add("localhost:1"); + nodes2.add("localhost:4"); + nodes2.add("localhost:5"); + + testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.95); + } + + public void testInconsistentViewHelper( + String testName, + Set nodes, + Set nodes2, + double expectedDiffRatio + ) throws Exception + { + RendezvousHasher hasher = new RendezvousHasher(); + Map uuidServerMap = new HashMap<>(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + UUID objectId = UUID.randomUUID(); + String targetServer = 
hasher.chooseNode(nodes, StringUtils.toUtf8(objectId.toString())); + uuidServerMap.put(objectId.toString(), targetServer); + } + + RendezvousHasher hasher2 = new RendezvousHasher(); + Map uuidServerMap2 = new HashMap<>(); + for (Map.Entry entry : uuidServerMap.entrySet()) { + String targetServer = hasher2.chooseNode(nodes2, StringUtils.toUtf8(entry.getKey())); + uuidServerMap2.put(entry.getKey(), targetServer); + } + + int same = 0; + int diff = 0; + for (Map.Entry entry : uuidServerMap.entrySet()) { + String otherServer = uuidServerMap2.get(entry.getKey()); + if (entry.getValue().equals(otherServer)) { + same += 1; + } else { + diff += 1; + } + } + double actualDiffRatio = ((double) diff) / NUM_ITERATIONS; + + log.info(StringUtils.format("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff)); + log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio); + + Assert.assertTrue(actualDiffRatio <= expectedDiffRatio); + } +} diff --git a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java index 78e22f159f81..2e175800c2db 100644 --- a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java +++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java @@ -90,7 +90,8 @@ public void tearDown() throws Exception public void testFindServer() throws Exception { QueryHostFinder queryRunner = new QueryHostFinder( - brokerSelector + brokerSelector, + new RendezvousHashAvaticaConnectionBalancer() ); Server server = queryRunner.findServer( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index d400de142c95..6f35939bbc89 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -50,6 +50,7 @@ import io.druid.server.http.RouterResource; import 
io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.metrics.QueryCountStatsProvider; +import io.druid.server.router.AvaticaConnectionBalancer; import io.druid.server.router.CoordinatorRuleManager; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; @@ -94,6 +95,7 @@ public void configure(Binder binder) binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9088); JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class); + JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class); binder.bind(CoordinatorRuleManager.class); LifecycleModule.register(binder, CoordinatorRuleManager.class);