Loadbalancer: Refactor socket/factory Map into Lists + bugfix.#112

Merged
NiteshKant merged 6 commits into master from
stevegury/loadbalancer-with-queues
Jul 1, 2016

@stevegury
Member

stevegury commented Jun 28, 2016

Problem
The load balancer internally uses two maps: one for the ReactiveSocketFactory
instances it can use to create a new ReactiveSocket, and one for the active
ReactiveSocket instances. The two maps are linked through the SocketAddress of the
associated remote server. This is clunky and requires explicit knowledge of
the SocketAddress (the result type of remote()).

Also, when we select a new factory to connect to, or when we select the slowest
ReactiveSocket to kick out, the sorting is done via the sorted() method
of the Java stream. The comparison function is actually unstable because a
TCP socket can be closed while we are sorting.
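
To illustrate the hazard, here is a minimal sketch. It is not the approach taken in this PR, and the `Sock` class and its `availability` field are hypothetical stand-ins rather than the project's types. Sorting on a value that can change mid-sort breaks the `Comparator` contract; reading each value once into an immutable pair before sorting gives the sort consistent input.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a socket whose availability can change at any time.
final class Sock {
    final String name;
    volatile double availability; // may drop to 0.0 when the TCP connection closes

    Sock(String name, double availability) {
        this.name = name;
        this.availability = availability;
    }
}

final class SnapshotSort {
    // Immutable (socket, key) pair captured before sorting.
    private static final class Keyed {
        final Sock sock;
        final double key;

        Keyed(Sock sock, double key) {
            this.sock = sock;
            this.key = key;
        }
    }

    // Reads each availability exactly once, then sorts on the frozen values,
    // so the comparison stays consistent even if a socket closes during the sort.
    static List<Sock> sortByAvailability(List<Sock> socks) {
        return socks.stream()
            .map(s -> new Keyed(s, s.availability))
            .sorted(Comparator.comparingDouble((Keyed k) -> k.key))
            .map(k -> k.sock)
            .collect(Collectors.toList());
    }
}
```

The snapshot may be slightly stale by the time the result is used, which is one reason to avoid sorting altogether when only one or two candidates are needed.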

Solution
Use two lists, one for factories and one for ReactiveSockets. A factory is
selected using the "Power of 2 Choices" technique. The WeightedSocket now holds
a reference to its associated factory, and closing it adds the factory back to the
factory list.
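
As a rough sketch of the "Power of 2 Choices" idea (not the code from this PR): pick two distinct candidates at random and keep the better one. The class name `PowerOfTwoChoices`, the `select` method, and the `score` function are assumptions made for illustration; in the load balancer the score would presumably come from something like a factory's availability.

```java
import java.util.List;
import java.util.Random;
import java.util.function.ToDoubleFunction;

final class PowerOfTwoChoices {
    // Picks two distinct random candidates and returns the one with the higher score.
    // Returns null for an empty list and the only element for a singleton list.
    static <T> T select(List<T> items, ToDoubleFunction<T> score, Random rng) {
        int size = items.size();
        if (size == 0) {
            return null;
        }
        if (size == 1) {
            return items.get(0);
        }
        int i = rng.nextInt(size);
        int j = rng.nextInt(size - 1);
        if (j >= i) {
            j++; // shift so that j != i while keeping the choice uniform
        }
        T a = items.get(i);
        T b = items.get(j);
        return score.applyAsDouble(a) >= score.applyAsDouble(b) ? a : b;
    }
}
```

Because only two elements are compared, there is no sort whose comparator could become inconsistent, and a bad candidate is chosen only when it is paired with an even worse one.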

Modification
Move the WeightedSocket class inside the LoadBalancer, no

I created a proper constructor for LoadBalancer, with Javadoc explaining all
the parameters the algorithm requires.

The LatencySubscriber inside the LoadBalancer handles some exceptions
specially:

  • TimeoutException: the latency is used for predicting the next latency
  • TransportException & ClosedChannelException: the ReactiveSocket is removed
    from the active list.
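
The dispatch described above could look roughly like the sketch below. This is not the actual LatencySubscriber code: `ErrorAction`, `ErrorPolicy`, and the local `TransportException` class are hypothetical names introduced here (the real TransportException belongs to the project), and the handling of other exception types is an assumption.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the project's TransportException.
final class TransportException extends RuntimeException {
    TransportException(String message) {
        super(message);
    }
}

// Hypothetical set of reactions to an error signal.
enum ErrorAction { RECORD_LATENCY, REMOVE_SOCKET, OTHER }

final class ErrorPolicy {
    // Maps an error to the reaction described in the list above.
    static ErrorAction classify(Throwable t) {
        if (t instanceof TimeoutException) {
            // A timeout still says something about latency, so feed it to the predictor.
            return ErrorAction.RECORD_LATENCY;
        }
        if (t instanceof TransportException || t instanceof ClosedChannelException) {
            // The connection is unusable, so drop the socket from the active list.
            return ErrorAction.REMOVE_SOCKET;
        }
        return ErrorAction.OTHER; // assumption: everything else gets default handling
    }
}
```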

Publishers.onError now also cancels the subscription.

I created a StressTest file, which stresses most parts of the client:

  • Adding/Removing new servers
  • High request concurrency
  • Dealing with failing/black-hole servers

@yschimke
Member

Apologies for some basic questions, I'm getting up to speed on the code, but will ask anyway.

  1. It doesn't look like you are replenishing activeFactories, so eventually you will start using the bad factories anyway. That is, given 100 factories, 2 of which are unhealthy (low availability), ideally those 2 are statistically rarely used unless both unhealthy factories are chosen at the same time.

  2. Doesn't Power of 2 Choices rely on those initial 2 being chosen randomly? You seem to pick the first two from the Deque.

@stevegury
Member Author

No worries about your basic questions.

  1. The activeFactories collection is replenished when we close a ReactiveSocket or when a new server arrives from the discovery mechanism. Since ReactiveSocket is a multiplexed protocol, we don't necessarily need tons of connections, and we don't need multiple connections to the same server.
    Furthermore, if 2 out of 100 connections are bad, this algorithm tends to use them only when no other options are available (although it is still possible that they sit in consecutive positions in the queue).
  2. Yes, I should have said that the algorithm is based on "the power of 2 choices" but doesn't really respect it. Right now it "ages" the factories via the Deque (picking from the front, adding back to the end), so that we guarantee that all factories are considered.
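
For readers following along, the "aging" behaviour described in the reply above might be sketched as follows. This is an illustration under assumptions, not the PR's code: `AgingQueue`, `pollBest`, and the `score` function are hypothetical names.

```java
import java.util.Deque;
import java.util.function.ToDoubleFunction;

final class AgingQueue {
    // Takes the two entries at the front of the queue, returns the better one,
    // and moves the other to the back so that every entry is eventually considered.
    static <T> T pollBest(Deque<T> queue, ToDoubleFunction<T> score) {
        T first = queue.pollFirst();
        if (first == null) {
            return null; // empty queue
        }
        T second = queue.pollFirst();
        if (second == null) {
            return first; // only one candidate available
        }
        if (score.applyAsDouble(first) >= score.applyAsDouble(second)) {
            queue.addLast(second);
            return first;
        }
        queue.addLast(first);
        return second;
    }
}
```

Unlike a true "Power of 2 Choices" pick, the two candidates here are always the front of the queue rather than random, which is why two bad entries in consecutive positions can end up being compared with each other.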

@NiteshKant NiteshKant added this to the 0.2.2 milestone Jun 29, 2016
@NiteshKant NiteshKant self-assigned this Jun 29, 2016
}

List<ReactiveSocketFactory<SocketAddress>> factories =
List<ReactiveSocketFactory<?>> factories =
Contributor

This isn't related, but do we need a type argument for ReactiveSocketFactory?

Member Author

The type argument represents the identifier of a server, which is, in our implementation, always SocketAddress.
I can refactor the code to remove it (later).

@stevegury
Member Author

Here's a summary of the last commit:

Address comments:

  • Add static named default initial values
  • Add type parameter to loadBalancer (matching ReactiveSocketFactory)
  • Move from a queue implementation to a list
  • Refactor the queue/aging into proper "Power of Two Choices"

Fix Bug:

  • Computing the server list delta (added/removed) was wrong.
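
The thread does not show the bug or its fix, so the following is only a sketch of what computing such a delta generally involves, using plain set differences. `ServerDelta` and `between` are hypothetical names.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class ServerDelta<T> {
    final Set<T> added;
    final Set<T> removed;

    private ServerDelta(Set<T> added, Set<T> removed) {
        this.added = added;
        this.removed = removed;
    }

    // added   = servers in `latest` that are not in `current`
    // removed = servers in `current` that are not in `latest`
    static <T> ServerDelta<T> between(Collection<T> current, Collection<T> latest) {
        Set<T> currentSet = new HashSet<>(current);
        Set<T> latestSet = new HashSet<>(latest);

        Set<T> added = new HashSet<>(latestSet);
        added.removeAll(currentSet);

        Set<T> removed = new HashSet<>(currentSet);
        removed.removeAll(latestSet);

        return new ServerDelta<>(added, removed);
    }
}
```

Copying both inputs before subtracting keeps the caller's collections untouched, which matters if the discovery mechanism reuses the same collection between updates.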

int n = numberOfNewSocket;
if (n > activeFactories.size()) {
n = activeFactories.size();
logger.info("addSockets(" + numberOfNewSocket
Contributor

nit: Maybe we can use templated messages here?

logger.info("addSockets({}) restricted", numberOfNewSocket)
