
Add Router connection balancers for Avatica queries #4983

Merged
jon-wei merged 8 commits into apache:master from jon-wei:router_jdbc_balance
Nov 1, 2017

Conversation

@jon-wei
Contributor

@jon-wei jon-wei commented Oct 20, 2017

All Avatica JDBC requests with a given connection ID must be routed to the same broker, since Druid brokers do not share connection state with each other.

This PR adds balancers for JDBC queries to the Router, one with consistent hashing and the other with rendezvous hashing.
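For context, rendezvous (highest-random-weight) hashing can be sketched in a few lines: hash each (server, connection ID) pair and pick the server with the highest weight. This is an illustrative sketch only, not the PR's implementation — the class name is made up, and CRC32 stands in for the Guava murmur3_128 hash the PR actually uses, just to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

public class RendezvousSketch
{
  // Pick the server whose combined hash with the connection ID is highest.
  // Every router running this logic over the same server list makes the same
  // choice, so all requests for a connection ID land on the same broker.
  public static String pick(List<String> servers, String connectionId)
  {
    String best = null;
    long bestWeight = Long.MIN_VALUE;
    for (String server : servers) {
      CRC32 crc = new CRC32();
      crc.update((server + "\0" + connectionId).getBytes(StandardCharsets.UTF_8));
      long weight = crc.getValue();
      if (best == null || weight > bestWeight) {
        bestWeight = weight;
        best = server;
      }
    }
    return best;
  }

  public static void main(String[] args)
  {
    List<String> brokers = List.of("broker1:8082", "broker2:8082", "broker3:8082");
    // The same inputs always give the same broker.
    assert pick(brokers, "conn-abc").equals(pick(brokers, "conn-abc"));
  }
}
```

Note also that if a broker is added or removed, only the connection IDs whose winning broker changes are remapped; the rest keep their assignments.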

@jon-wei jon-wei force-pushed the router_jdbc_balance branch from dc6f80f to e4941ca Compare October 20, 2017 00:36
/**
* An AvaticaConnectionBalancer balances Avatica connections across a collection of servers.
*/
@ExtensionPoint
Contributor

IMO since router is still "experimental" this should not be a formal extension point.

Making the router an official thing would be good in the future though, as it is cool.

Contributor Author

removed the annotation

@Override
public void configure(Binder binder)
{
String balancerType = props.getProperty(BALANCER_PROPERTY, "");
Contributor

Is this stuff necessary? May be simpler (& more similar to the rest of Druid) to use JsonConfigProvider. Similar to the BalancerStrategyFactory, which is defined like:

            JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);

And then the type names and defaults are annotations on the BalancerStrategyFactory interface. Then the guice module code is much simplified (just one line) and boilerplate is reduced.

Contributor Author

changed this to use JsonConfigProvider


All Avatica JDBC requests with a given connection ID must be routed to the same broker, since Druid brokers do not share connection state with each other.

To accomplish this, Druid provides two built-in balancers that use rendezvous hashing and consistent hashing of a request's connection ID respectively to assign requests to brokers.
Contributor

The docs should help educate the user on when they may want to choose different strategies. Or if there is no common good reason to use anything other than the default, they should say that, and mention that the non-default strategies are only provided as examples, or for experimentation, or for advanced users, or whatever purpose they are provided for.

(Subtext: users get confused by options when they don't understand why they should choose non-default values)

Contributor Author

Added some more info on the non-default consistent hasher

private static String getAvaticaConnectionId(byte[] requestContent, ObjectMapper objectMapper) throws IOException
{
Map<String, Object> contentMap = objectMapper.readValue(requestContent, Map.class);
return (String) contentMap.get("connectionId");
Contributor

This should do more sanity checking on contentMap, since it's potentially arbitrary user input.

If we know that the avatica protocol always requires a connectionId (I'm not sure, but could look it up) then this could throw a nicely worded exception if it's either not there, or is there but is not a string. In this case it should never return null.

If the avatica protocol might involve messages without connectionIds, then this should probably return null in the above cases, and should be annotated with @Nullable, and the calling code should handle that appropriately (probably send to a random broker).
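A minimal sketch of the validation being suggested, assuming (per the Avatica JSON reference) that every request must carry a string connectionId. The class name is hypothetical, and IllegalArgumentException stands in here for Druid's IAE:

```java
import java.util.Map;

public class ConnectionIdCheck
{
  // Validate arbitrary user input: reject a missing or non-String
  // connectionId with a clearly worded exception, never returning null.
  public static String getAvaticaConnectionId(Map<String, ?> requestMap)
  {
    Object connectionIdObj = requestMap.get("connectionId");
    if (connectionIdObj == null) {
      throw new IllegalArgumentException("Received an Avatica request without a connectionId.");
    }
    if (!(connectionIdObj instanceof String)) {
      throw new IllegalArgumentException("Received an Avatica request with a non-String connectionId.");
    }
    return (String) connectionIdObj;
  }

  public static void main(String[] args)
  {
    assert "abc".equals(getAvaticaConnectionId(Map.of("connectionId", "abc")));
  }
}
```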

Contributor Author

Looks like connectionId is always required, based on these examples:

https://calcite.apache.org/avatica/docs/json_reference.html#openconnectionrequest

Contributor Author

Added a null and string-type check here that throw exceptions


private static byte[] getRequestContent(HttpServletRequest request) throws IOException
{
int contentSize = request.getContentLength();
Contributor

HTTP requests aren't required to specify a content length (for example if they use chunked encoding). So this isn't the right way to figure out how long a request is. See the spec for the getContentLength method: it may return -1.
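The usual remedy is to read the stream until EOF rather than pre-sizing a buffer from Content-Length. A hedged sketch, with a plain InputStream standing in for HttpServletRequest.getInputStream() so the example stays self-contained:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadFully
{
  // Read the whole body regardless of whether a Content-Length header was
  // sent (e.g. chunked transfer encoding reports -1 from getContentLength).
  public static byte[] readFully(InputStream in)
  {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args)
  {
    byte[] body = "{\"connectionId\":\"abc\"}".getBytes();
    assert new String(readFully(new ByteArrayInputStream(body))).contains("connectionId");
  }
}
```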

byte[] requestContent = getRequestContent(request);
String connectionId = getAvaticaConnectionId(requestContent, objectMapper);
Server targetServer = avaticaConnectionBalancer.balance(hostFinder.getAllServers(), connectionId);
if (targetServer == null) {
Contributor

Suggest moving this logic into a QueryHostFinder method named something like findServerAvatica(String connectionId), and handling the no-brokers-found situation more similarly to how that class handles it for native Druid queries.

Contributor Author

Moved this into QueryHostFinder

import java.util.List;
import java.util.Set;

// Distributes objects across a set of node keys using rendezvous hashing
Contributor

Should be javadoc comments.

Contributor Author

Fixed

import java.util.Map;
import java.util.Set;

// Distributes objects across a set of node keys using consistent hashing
Contributor

Should be javadoc comments.

Contributor Author

Fixed

public class RendezvousHasher
{
private static final int HASH_FN_SEED = 9999;
private static final HashFunction HASH_FN = Hashing.murmur3_128(HASH_FN_SEED);
Contributor

Any reason not to use Hashing.murmur3_128()? (I think the seed is 0)

Contributor Author

No reason, changed this to not pass in a seed


// Distributes objects across a set of node keys using rendezvous hashing
// See https://en.wikipedia.org/wiki/Rendezvous_hashing
public class RendezvousHasher
Contributor

I don't see a need for this to be separate from RendezvousHashAvaticaConnectionBalancer. It doesn't seem like it will be used by anything else, and neither class is very complex.

Contributor

Possibly similar comment for the consistent hash version.

@gianm
Contributor

gianm commented Oct 20, 2017

General idea looks good to me, but have some comments on the implementation.

@jon-wei jon-wei force-pushed the router_jdbc_balance branch from 31577c4 to 5614894 Compare October 20, 2017 05:58

throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId);
}
log.info(
Contributor

Should be log.debug, not info.

Contributor Author

whoops, fixed

{
private static final HashFunction HASH_FN = Hashing.murmur3_128();

public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8);
Contributor

Should be final, if you choose to keep this field.


public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8);

public String chooseNode(Set<String> nodeIds, String key)
Contributor

Would be simpler, I think, for key serialization to be responsibility of the caller, and for this to accept byte[] (and to not have the 3-arg funnel version). But up to you if you want to keep it this way. It's not a major difference either way.

Contributor Author

I changed these to accept byte[] and dropped the funnels

private static final int REPLICATION_FACTOR = 128;
private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128(9999);

public static Funnel STRING_FUNNEL = Funnels.stringFunnel(Charsets.UTF_8);
Contributor

Should be final.

Contributor Author

fixed

previousKeys = new HashSet<>(currentKeys);
}

public String hashStr(String str)
Contributor

Same comment about API as RendezvousHasher.

Contributor

@gianm gianm left a comment

LGTM

private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host";
private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme";
private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery";
Contributor

If this is an Avatica-specific routing policy, how about doing it in a different Guice module? My question is: if we want to add similar functionality for a different tool, do I have to add the same thing here as well and add a branch? IMO this could be done by extending this class and binding it via Guice, though.

Contributor Author

Hm, I feel like making the query handling for the router into an extensible module system is beyond the scope of this PR.

For now, I'd think it's fine if you wanted to add another branch here. What use case did you have in mind?

@himanshug
Contributor

@gianm @jon-wei Do you know if MicroStrategy, Looker, etc. can take multiple broker hosts in their config and do this type of sticky routing? It appears that if we just gave a broker VIP to them, requests would get routed to different brokers and things might not work. Also, this breaks if you have multiple routers; in fact, most users don't use routers.

@jon-wei
Contributor Author

jon-wei commented Oct 25, 2017

@himanshug

Also, This breaks if you have multiple routers, in fact most users don't use routers.

What issue do you see when there are multiple routers?

@himanshug
Contributor

What issue do you see when there are multiple routers?

@jon-wei I haven't seen the code in detail, but it seems the purpose of this PR is to do sticky routing for requests in the same connection, so that all queries for the same connection are sent to the same broker. If you have multiple routers and the client sends queries for the same connection to different routers, they wouldn't necessarily go to the same broker.
Or rather, it's not an issue as long as the algorithm that decides on the broker is deterministic and takes the "connectionId" as input, which is true for the provided implementations.
However, it assumes that all routers are running with the same value for druid.router.avatica.balancer. I think we should note that in the docs.

@gianm
Contributor

gianm commented Oct 25, 2017

@himanshug The idea is it shouldn't break with multiple routers, due to the use of the same deterministic hash function across the routers.

At least the way it works today, Druid SQL JDBC requires some kind of sticky load balancer, or it requires using routers to provide stickiness in the way that this patch makes possible. It's because Avatica uses multiple HTTP calls for the same logical JDBC connection, and the state is kept on the broker and is not shared between brokers (since the state includes a query result cursor, it is impractical to share it).

Druid SQL over HTTP doesn't require stickiness and so is preferable operationally unless you really want to use JDBC.

@gianm
Contributor

gianm commented Oct 25, 2017

@jon-wei could you edit the SQL docs too, as part of this? This part should mention the router option:

Druid's JDBC server does not share connection state between brokers. This means that if you're using JDBC and have multiple Druid brokers, you should either connect to a specific broker, or use a load balancer with sticky sessions enabled.

And it should probably also mention that the SQL-over-HTTP API is stateless and so no stickiness is needed (just to be super-clear).

@himanshug @b-slim any concerns with this approach?

@jon-wei
Contributor Author

jon-wei commented Oct 25, 2017

@himanshug

Or, Its not an issue as long as algorithm to decide on the broker is deterministic and takes "connectionId" in the input, which is true for provided implementations.
However, it assumes that all routers are running with same value for druid.router.avatica.balancer . I think we should note it in the docs.

Yes, the provided implementations are deterministic and multiple routers will make the same routing decisions for a given connectionID (assuming they share the same view of the set of active brokers).

I'll add a note to the docs about making sure all routers have the same druid.router.avatica.balancer value.

@gianm Sure, I'll add those notes to the docs as well.

@himanshug
Contributor

@gianm @jon-wei I wonder if you considered the approach of making brokers smart about it? Like routers, brokers can also see all the brokers in the cluster and can have the same "active-brokers" list. The same logic could be implemented on the broker to forward/redirect a request to a peer broker if it gets a request for a connectionId that should be fulfilled by another broker.
That kind of approach would save users from deploying a router node.

@gianm
Contributor

gianm commented Oct 25, 2017

@himanshug we considered it but thought that the router was the more natural place to put this, since it already handles proxying and it was not too much additional code to add determinism for JDBC sessions. The router also has an advantage over the broker in that it can handle a lot more concurrent connections (because it uses an async model rather than a thread-per-request model like the broker). It removes the risk of taking up excessive broker http threads by having them proxy to each other.

Fwiw we were also considering using the router to proxy to the leader coordinator to avoid having to deal with redirects… which are problematic when users want to put coordinators behind load balancers. The redirects "escape" out of the load balancer and may get blocked by a firewall. If we go down that path, it would make the router the single user-facing node that anyone would have to talk to.

I could see combining the router and broker nodes in the future too, ideally along with preserving the async nature of the router proxying.

@himanshug
Contributor

himanshug commented Oct 25, 2017

@gianm @jon-wei I will review this PR soon; the conversation below is to figure out the long-term direction rather than to block this PR.
So we have two directions: one is to make the router smarter and smarter, make it non-experimental, and recommend deploying it;
or, to put the smartness in specific nodes.
For brokers, async HTTP is doable; in fact, we already do async HTTP in the historical for segment management HTTP endpoints, see https://github.com/druid-io/druid/blob/master/server/src/main/java/io/druid/server/http/SegmentListerResource.java#L140 .
For coordinators, they could also optionally switch to async HTTP (not really critical) and do forwards instead of redirects (maybe this behavior could be enabled/disabled based on some configuration).

What is the direction you have in mind?

@gianm
Contributor

gianm commented Oct 26, 2017

@himanshug in my view, the short-term direction is to add more smarts to the router, since that's the simplest path and we have some needs for a "smarter" proxy layer.

In the long run, I'm not sure, although I think integrating the broker and router is probably best, so there is just one node type that users need to worry about. (the broker, which could proxy to anything else)

throw new IAE("Received an Avatica request with a non-String connectionId.");
}

return (String) requestMap.get("connectionId");
Contributor

nit: return (String) connectionIdObj;

Contributor Author

changed to reuse previously retrieved value

previousKeys = new HashSet<>(currentKeys);
}

public String hash(byte[] obj)
Contributor

a hash(byte[]) returning String key sounds un-intuitive, can we call it something like findKey(..) ?

Contributor Author

renamed to findKey

*/
public class ConsistentHasher
{
private static final int REPLICATION_FACTOR = 128;
Contributor

It would be nice to have a comment explaining the thought process behind the magic number.
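For readers unfamiliar with the magic number: REPLICATION_FACTOR is the count of virtual nodes per server on the hash ring, and more points per server yields a more even spread of keys. An illustrative ring, not the PR's code — the class name is a placeholder and CRC32 stands in for the murmur3 hash:

```java
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import java.util.zip.CRC32;

public class ConsistentRingSketch
{
  // Each node is placed on the ring this many times; a higher factor smooths
  // the key distribution at the cost of a larger ring to search.
  private static final int REPLICATION_FACTOR = 128;
  private final TreeMap<Long, String> ring = new TreeMap<>();

  private static long hash(String s)
  {
    CRC32 crc = new CRC32();
    crc.update(s.getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }

  public void addNode(String nodeId)
  {
    for (int i = 0; i < REPLICATION_FACTOR; i++) {
      ring.put(hash(nodeId + "#" + i), nodeId);
    }
  }

  // Walk clockwise from the key's position to the next virtual node,
  // wrapping around to the start of the ring if necessary.
  public String findKey(String key)
  {
    Long point = ring.ceilingKey(hash(key));
    return ring.get(point != null ? point : ring.firstKey());
  }

  public static void main(String[] args)
  {
    ConsistentRingSketch ring = new ConsistentRingSketch();
    ring.addNode("broker1");
    ring.addNode("broker2");
    // Lookups are deterministic for a fixed node set.
    assert ring.findKey("conn-1").equals(ring.findKey("conn-1"));
  }
}
```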

Contributor Author

added a comment

weights.defaultReturnValue(null);

for (String nodeId : nodeIds) {
HashCode keyHash = HASH_FN.hashBytes(key);
Contributor

This should be computed outside of the for loop, just once.

Contributor Author

moved out of the loop

weights.put(combinedHash.asLong(), nodeId);
}

return weights.get(weights.lastLongKey());
Contributor

Given that we just need the max-weight key, we can maintain it in the for loop above without needing the sorted map.
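The suggested simplification looks roughly like this: keep a running maximum instead of populating a sorted map and reading its last entry. Names are illustrative, and String.hashCode stands in for the murmur3 combination used in the PR:

```java
import java.util.Set;

public class MaxWeightSketch
{
  // Track the highest-weight node as we go; no TreeMap needed.
  public static String chooseNode(Set<String> nodeIds, String key)
  {
    String maxNode = null;
    long maxWeight = Long.MIN_VALUE;
    for (String nodeId : nodeIds) {
      long weight = (nodeId + key).hashCode(); // placeholder combined hash
      if (maxNode == null || weight > maxWeight) {
        maxWeight = weight;
        maxNode = nodeId;
      }
    }
    return maxNode;
  }

  public static void main(String[] args)
  {
    Set<String> nodes = Set.of("a", "b", "c");
    assert chooseNode(nodes, "k").equals(chooseNode(nodes, "k"));
  }
}
```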

Contributor Author

thanks, good catch, fixed this as well

* @param connectionId Connection ID to be balanced
* @return Server that connectionId should be assigned to
*/
Server balance(Collection<Server> servers, String connectionId);
Contributor

nit: should we just call it pickServer(..) and also put in javadocs that it needs to be deterministic/sticky.

Contributor Author

renamed to pickServer, added comments about determinism/stickiness

serverMap.put(server.getHost(), server);
}

hasher.updateKeys(serverMap.keySet());
Contributor

It does not make sense to call this on each request ... brokers appear/disappear very rarely and changes could be handled via the notifications.
Also, I believe the consistent hasher would then be higher-performing overall, because the broker set doesn't change often and finding a server only involves a hash and a binary search.

Contributor Author

@jon-wei jon-wei Oct 31, 2017

Hm, I'm wary of potential bugs from managing state with missed/dropped notifications, etc.

I feel like this is fine as-is for this PR and we could enhance it later (the docs do mention that the consistent hasher is experimental). A benchmark shows that this updateKeys() call isn't particularly expensive if none of the servers are changing, and the number of brokers in a Druid cluster isn't typically very large; with 10 brokers it adds less than 1us of overhead per request:

  public Set<String> updateKeysBenchmark(Set<String> currentKeys)
  {
    Set<String> added = new HashSet<>(currentKeys);
    added.removeAll(previousKeys);

    Set<String> removed = new HashSet<>(previousKeys);
    removed.removeAll(currentKeys);

    for (String key : added) {
      addKey(key);
    }

    for (String key : removed) {
      removeKey(key);
    }

    // store a copy in case the input was immutable
    return previousKeys = new HashSet<>(currentKeys);
  }


  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void updateKeys(Blackhole blackhole) throws Exception
  {
    for (int i = 0; i < numIds; i++) {
      blackhole.consume(hasher.updateKeysBenchmark(servers));
    }
  }

Benchmark                             (numIds)  Mode  Cnt      Score      Error  Units
ConsistentHasherBenchmark.updateKeys    100000  avgt   30  89350.198 ± 2640.216  us/op

also, then, i believe consistent hasher would be higher performing overall because broker-set doesn't change often and finding server only involves a hash and a binary search.

Benchmark                       (numIds)  Mode  Cnt       Score      Error  Units
RendezvousHasherBenchmark.hash    100000  avgt   30  330105.546 ± 7494.327  us/op
ConsistentHasherBenchmark.hash    100000  avgt   30  43539.036 ± 1134.461  us/op

The ConsistentHasher does make faster hashing choices in the steady state, but neither seems to impose much overhead.

When running the RendezvousHasher benchmark on my laptop, I was getting ~3us per hashing request with 10 brokers, so I would expect the load balancing overhead to be dwarfed by the actual query processing times.

I made Rendezvous the default since it's a simpler implementation with less state to manage and less locking.

Contributor

If it has no performance impact then the overhead is alright. Thanks for the benchmark.

@himanshug
Contributor

Brief discussion from dev-sync: @gianm @jon-wei @cheddar
Eric proposed: is it possible to control the end user client so that it asks the router/coordinator for the list of brokers and then manages the balancing on the client side (using any of the hashing mechanisms)? The router already has such an endpoint, and the coordinator will have one in subsequent releases.

@jon-wei
Contributor Author

jon-wei commented Oct 31, 2017

@himanshug @cheddar @gianm

is it possible to control the end user client so that it asks router/coordinator the list of brokers and then manages the balancing on client side (using any of the hashing mechanisms)?

This isn't presently possible; the Avatica client (https://calcite.apache.org/avatica/) doesn't support this kind of client-side balancing. It could be patched to do so, but that's a separate project from Druid, and I think it would be a longer-term effort compared to implementing the balancing on the router.

@cheddar specifically also mentioned using 302 redirects instead of proxying on the Router to provide JDBC connection stickiness, and I had a discussion with @gianm on that.

We haven't tested whether 302 redirects work with the Avatica client; if they do, we think 302 redirects could be a useful alternative to proxying, but they require the brokers behind the router to be directly accessible by clients, which isn't always the case (e.g., IP routing configurations or network security policies could block such traffic).

With this patch, our goal is to provide a solution that can work without those requirements.

@himanshug
Contributor

@jon-wei I think this PR is OK in itself. Feel free to merge when you're done with any other remaining investigations. In the long run, it would be really nice for users to not require a router or other sticky load-balancing component for JDBC connections to work. Thanks.

@jon-wei
Contributor Author

jon-wei commented Nov 1, 2017

@himanshug cool, thanks for the review!

@jon-wei jon-wei merged commit 6840eab into apache:master Nov 1, 2017
gianm pushed a commit to implydata/druid-public that referenced this pull request Nov 14, 2017
* Add Router connection balancers for Avatica queries

* PR comments

* Adjust test bounds

* PR comments

* Add doc comments

* PR comments

* PR comment

* Checkstyle fix
gianm pushed a commit to implydata/druid-public that referenced this pull request Dec 5, 2017
* Add Router connection balancers for Avatica queries

* PR comments

* Adjust test bounds

* PR comments

* Add doc comments

* PR comments

* PR comment

* Checkstyle fix
@jon-wei jon-wei added this to the 0.12.0 milestone Jan 5, 2018