-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Add Router connection balancers for Avatica queries #4983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e4941ca
5614894
91abdfd
1d1ff56
c371f60
a96c4a8
ca6e262
7a0a680
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| /* | ||
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. Metamarkets licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package io.druid.benchmark; | ||
|
|
||
| import com.google.common.collect.Sets; | ||
|
|
||
| import io.druid.java.util.common.StringUtils; | ||
| import io.druid.server.router.ConsistentHasher; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Param; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.Setup; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.TimeUnit; | ||
| // JMH benchmark for ConsistentHasher: each invocation looks up a key for every one of the
| // numIds random UUID strings, after ten server names were registered with the hasher.
||
| @State(Scope.Benchmark) | ||
| @Fork(value = 1) | ||
| @Warmup(iterations = 15) | ||
| @Measurement(iterations = 30) | ||
| public class ConsistentHasherBenchmark | ||
| { | ||
| @Param({"100000"}) | ||
| int numIds; | ||
| // State filled in by setup(): the hasher under test, the UUID strings used as lookup
| // inputs, and the server names handed to the hasher.
||
| ConsistentHasher hasher; | ||
| List<String> uuids; | ||
| Set<String> servers; | ||
| // JMH @Setup method: creates the hasher, builds the ten-entry server set, generates numIds
| // random UUID strings and finally registers the servers through updateKeys().
||
| @Setup | ||
| public void setup() throws IOException | ||
| { | ||
| hasher = new ConsistentHasher(null); | ||
| uuids = new ArrayList<>(); | ||
| servers = Sets.newHashSet( | ||
| "localhost:1", | ||
| "localhost:2", | ||
| "localhost:3", | ||
| "localhost:4", | ||
| "localhost:5", | ||
| "localhost:6", | ||
| "localhost:7", | ||
| "localhost:8", | ||
| "localhost:9", | ||
| "localhost:10" | ||
| ); | ||
| // Generate the lookup inputs here so UUID creation is not part of the measured method.
|
||
| for (int i = 0; i < numIds; i++) { | ||
| UUID uuid = UUID.randomUUID(); | ||
| uuids.add(uuid.toString()); | ||
| } | ||
|
|
||
| hasher.updateKeys(servers); | ||
| } | ||
| // Measured method (average time, reported in microseconds): one pass that calls findKey()
| // with StringUtils.toUtf8(uuid) for every stored UUID and hands each result to the Blackhole.
||
| @Benchmark | ||
| @BenchmarkMode(Mode.AverageTime) | ||
| @OutputTimeUnit(TimeUnit.MICROSECONDS) | ||
| public void hash(Blackhole blackhole) throws Exception | ||
| { | ||
| for (String uuid : uuids) { | ||
| String server = hasher.findKey(StringUtils.toUtf8(uuid)); | ||
| blackhole.consume(server); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| /* | ||
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. Metamarkets licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package io.druid.benchmark; | ||
|
|
||
| import com.google.common.collect.Sets; | ||
| import io.druid.java.util.common.StringUtils; | ||
| import io.druid.server.router.RendezvousHasher; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Param; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.Setup; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.TimeUnit; | ||
| // JMH benchmark for RendezvousHasher: each invocation chooses a node for every one of the
| // numIds random UUID strings from a fixed set of ten server names.
||
| @State(Scope.Benchmark) | ||
| @Fork(value = 1) | ||
| @Warmup(iterations = 15) | ||
| @Measurement(iterations = 30) | ||
| public class RendezvousHasherBenchmark | ||
| { | ||
| @Param({"100000"}) | ||
| int numIds; | ||
| // State filled in by setup(): the hasher under test, the UUID strings used as lookup
| // inputs, and the server names passed to chooseNode() on every call.
||
| RendezvousHasher hasher; | ||
| List<String> uuids; | ||
| Set<String> servers; | ||
| // JMH @Setup method: creates the hasher, builds the ten-entry server set and generates
| // numIds random UUID strings.
||
| @Setup | ||
| public void setup() throws IOException | ||
| { | ||
| hasher = new RendezvousHasher(); | ||
| uuids = new ArrayList<>(); | ||
| servers = Sets.newHashSet( | ||
| "localhost:1", | ||
| "localhost:2", | ||
| "localhost:3", | ||
| "localhost:4", | ||
| "localhost:5", | ||
| "localhost:6", | ||
| "localhost:7", | ||
| "localhost:8", | ||
| "localhost:9", | ||
| "localhost:10" | ||
| ); | ||
|
|
||
| // Generate the lookup inputs here so UUID creation is not part of the measured method.
|
||
| for (int i = 0; i < numIds; i++) { | ||
| UUID uuid = UUID.randomUUID(); | ||
| uuids.add(uuid.toString()); | ||
| } | ||
| } | ||
| // Measured method (average time, reported in microseconds): one pass that calls chooseNode()
| // with the server set and StringUtils.toUtf8(uuid) for every UUID; results go to the Blackhole.
||
| @Benchmark | ||
| @BenchmarkMode(Mode.AverageTime) | ||
| @OutputTimeUnit(TimeUnit.MICROSECONDS) | ||
| public void hash(Blackhole blackhole) throws Exception | ||
| { | ||
| for (String uuid : uuids) { | ||
| String server = hasher.chooseNode(servers, StringUtils.toUtf8(uuid)); | ||
| blackhole.consume(server); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| import io.druid.guice.annotations.Smile; | ||
| import io.druid.guice.http.DruidHttpClientConfig; | ||
| import io.druid.java.util.common.DateTimes; | ||
| import io.druid.java.util.common.IAE; | ||
| import io.druid.query.DruidMetrics; | ||
| import io.druid.query.GenericQueryMetricsFactory; | ||
| import io.druid.query.Query; | ||
|
|
@@ -62,6 +63,7 @@ | |
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.net.URLDecoder; | ||
| import java.util.Map; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
@@ -78,6 +80,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu | |
| private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host"; | ||
| private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme"; | ||
| private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; | ||
| private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this is an Avatica-specific routing policy, how about doing it in a different Guice module? My question is: if we want to add similar functionality for a different tool, do I have to add the same thing here as well and add another branch? IMO this can be done by extending this class and binding it via Guice, though.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I feel like making the query handling for the router into an extensible module system is beyond the scope of this PR. For now, I'd think it's fine if you wanted to add another branch here. What use case did you have in mind? |
||
| private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; | ||
|
|
||
| private static final int CANCELLATION_TIMEOUT_MILLIS = 500; | ||
|
|
@@ -186,7 +189,17 @@ protected void service(HttpServletRequest request, HttpServletResponse response) | |
| final boolean isQueryEndpoint = request.getRequestURI().startsWith("/druid/v2") | ||
| && !request.getRequestURI().startsWith("/druid/v2/sql"); | ||
|
|
||
| if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { | ||
| final boolean isAvatica = request.getRequestURI().startsWith("/druid/v2/sql/avatica"); | ||
|
|
||
| if (isAvatica) { | ||
| Map<String, Object> requestMap = objectMapper.readValue(request.getInputStream(), Map.class); | ||
| String connectionId = getAvaticaConnectionId(requestMap); | ||
| Server targetServer = hostFinder.findServerAvatica(connectionId); | ||
| byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap); | ||
| request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost()); | ||
| request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme()); | ||
| request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); | ||
| } else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) { | ||
| // query cancellation request | ||
| for (final Server server: hostFinder.getAllServers()) { | ||
| // send query cancellation to all brokers this query may have gone to | ||
|
|
@@ -263,6 +276,11 @@ protected void sendProxyRequest( | |
| proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); | ||
| proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); | ||
|
|
||
| byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE); | ||
| if (avaticaQuery != null) { | ||
| proxyRequest.content(new BytesContentProvider(avaticaQuery)); | ||
| } | ||
|
|
||
| final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); | ||
| if (query != null) { | ||
| final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); | ||
|
|
@@ -371,6 +389,18 @@ public long getInterruptedQueryCount() | |
| return interruptedQueryCount.get(); | ||
| } | ||
| // Returns the "connectionId" entry of a parsed Avatica request map; throws IAE when the entry is absent/null or is not a String.
| // NOTE(review): nothing visible in this body appears to throw the declared IOException - confirm whether the clause is needed.
||
| private static String getAvaticaConnectionId(Map<String, Object> requestMap) throws IOException | ||
| { | ||
| Object connectionIdObj = requestMap.get("connectionId"); | ||
| if (connectionIdObj == null) { | ||
| throw new IAE("Received an Avatica request without a connectionId."); | ||
| } | ||
| if (!(connectionIdObj instanceof String)) { | ||
| throw new IAE("Received an Avatica request with a non-String connectionId."); | ||
| } | ||
| // Both checks passed, so the value is a non-null String and the cast below cannot fail.
|
||
| return (String) connectionIdObj; | ||
| } | ||
|
|
||
| private class MetricsEmittingProxyResponseListener extends ProxyResponseListener | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. Metamarkets licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package io.druid.server.router; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonSubTypes; | ||
| import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
| import io.druid.client.selector.Server; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| /** | ||
| * An AvaticaConnectionBalancer balances Avatica connections across a collection of servers. | ||
| * <p> | ||
| * The implementation is selected through the JSON {@code type} property: {@code rendezvousHash} maps to | ||
| * {@link RendezvousHashAvaticaConnectionBalancer}, which is also declared as the {@code defaultImpl}, and | ||
| * {@code consistentHash} maps to {@link ConsistentHashAvaticaConnectionBalancer}. | ||
| */ | ||
| @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RendezvousHashAvaticaConnectionBalancer.class) | ||
| @JsonSubTypes(value = { | ||
| @JsonSubTypes.Type(name = "rendezvousHash", value = RendezvousHashAvaticaConnectionBalancer.class), | ||
| @JsonSubTypes.Type(name = "consistentHash", value = ConsistentHashAvaticaConnectionBalancer.class) | ||
| }) | ||
| public interface AvaticaConnectionBalancer | ||
| { | ||
| /** | ||
| * Picks the server that the given Avatica connection should be routed to. | ||
| * | ||
| * @param servers Servers to balance across | ||
| * @param connectionId Connection ID to be balanced | ||
| * @return Server that connectionId should be assigned to. The process for choosing a server must be deterministic and | ||
| * sticky (with a fixed set of servers, the same connectionId should always be assigned to the same server) | ||
| */ | ||
| Server pickServer(Collection<Server> servers, String connectionId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs should help educate the user on when they may want to choose different strategies. Or if there is no common good reason to use anything other than the default, they should say that, and mention that the non-default strategies are only provided as examples, or for experimentation, or for advanced users, or whatever purpose they are provided for.
(Subtext: users get confused by options when they don't understand why they should choose non-default values)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some more info on the non-default consistent hasher