Skip to content

Conversation

@codelipenghui
Copy link
Contributor

Motivation

Improve Key_Shared subscription message dispatching performance.

Modifications

  1. Reduce make hash for the message key
  2. Reduce select consumer

Verifying this change

Performance improves.

On my laptop, publish messages with rate 150k/s and create 1500 consumers. The consumption rate can't catchup the message publish rate.

After this change, 3000 consumers still can catchup the message publish rate.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui self-assigned this Apr 1, 2020
@codelipenghui codelipenghui added this to the 2.6.0 milestone Apr 1, 2020
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

4 similar comments
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui force-pushed the key_shared_performance_improve branch from 09db1b0 to 7f8af69 Compare April 2, 2020 03:53
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

4 similar comments
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui force-pushed the key_shared_performance_improve branch from 7f8af69 to b69d0e8 Compare April 5, 2020 00:36
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

4 similar comments
@tuteng
Copy link
Member

tuteng commented Apr 5, 2020

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

3 similar comments
@tuteng
Copy link
Member

tuteng commented Apr 5, 2020

/pulsarbot run-failure-checks

@tuteng
Copy link
Member

tuteng commented Apr 5, 2020

/pulsarbot run-failure-checks

@tuteng
Copy link
Member

tuteng commented Apr 5, 2020

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit c555eb7 into apache:master Apr 5, 2020
tuteng pushed a commit that referenced this pull request Apr 6, 2020
* Improve Key_Shared subscription message dispatching performance.

* Fix unit tests.

* Remove system.out.println

(cherry picked from commit c555eb7)
tuteng pushed a commit that referenced this pull request Apr 13, 2020
* Improve Key_Shared subscription message dispatching performance.

* Fix unit tests.

* Remove system.out.println

(cherry picked from commit c555eb7)
@merlimat
Copy link
Contributor

Hi @codelipenghui. I'm failing to see a practical difference in the code here compared to the previous implementation.

I think the only difference is when the % is done, but there's no performance or efficiency difference.

@codelipenghui
Copy link
Contributor Author

@merlimat

The difference is I hashed the key twice before, this change is reduced one hash calculation in the selector.

Another difference is, I group the messages by the message key, but when users use random keys, the dispatcher can get benefit from the group operation. So the new way is grouped by hash(key) % ranges, so that we can also get group benefit for random keys. The benefit is we can reduce operates of the LinkedHashMap.

I have analyzed by yourkit, the managed ledger worker thread in high CPU workload with random keys and non-batched messages.

@codelipenghui codelipenghui deleted the key_shared_performance_improve branch April 22, 2020 08:07
@merlimat
Copy link
Contributor

The difference is I hashed the key twice before, this change is reduced one hash calculation in the selector.

I think it's actually the reverse, the % operation is done for each message instead of for each "key"

Another difference is, I group the messages by the message key, but when users use random keys, the dispatcher can get benefit from the group operation. So the new way is grouped by hash(key) % ranges, so that we can also get group benefit for random keys. The benefit is we can reduce operates of the LinkedHashMap.

I think this only matters if rangeSize is small. With rangeSize = 65K or bigger there's no practical grouping benefit. If rangeSize=4, of course, this approach would be more efficient.

I believe a better approach here, instead of exposing the index internals, would be to just group by consumers instead of key or index. Basically, construct Map<Consumer, List<Entry>> and then dispatch. This would work for any rangeSize. I'll make this change after #6791.

@codelipenghui
Copy link
Contributor Author

I think it's actually the reverse, the % operation is done for each message instead of for each "key"

I think this only matters if rangeSize is small. With rangeSize = 65K or bigger there's no practical grouping benefit. If rangeSize=4, of course, this approach would be more efficient.

Totally agree.

I believe a better approach here, instead of exposing the index internals, would be to just group by consumers instead of key or index. Basically, construct Map<Consumer, List> and then dispatch. This would work for any rangeSize. I'll make this change after #6791.

If I understand correctly, this also needs to find a consumer for a key right? Can we group the messages by the consumer without getting a consumer from the Map?

@merlimat
Copy link
Contributor

If I understand correctly, this also needs to find a consumer for a key right?

Yes, instead of saying: "what is the index for this key?" we say "what is the consumer for this key?".

Grouping by consumer will lead to the smallest possible cardinality.

Can we group the messages by the consumer without getting a consumer from the Map?

I don't understand the question here.

@codelipenghui
Copy link
Contributor Author

Yes, instead of saying: "what is the index for this key?" we say "what is the consumer for this key?".

Grouping by consumer will lead to the smallest possible cardinality.

Sorry, I'm lost here. How can we map a key to a consumer? I just know currently we map them by a hash range or a hash ring. So we need to get a consumer for a key from the hash range or the hash ring. The key purpose is to reduce the get consumer operation from the hash range or the hash ring. Too many get consumer operations to result in high CPU usage of the thread.

I don't know how to map a key to a consumer without getting a consumer from a hash range or a hash ring. If I missed something, please point out, thanks.

jiazhai pushed a commit to jiazhai/pulsar that referenced this pull request May 18, 2020
…che#6647)

* Improve Key_Shared subscription message dispatching performance.

* Fix unit tests.

* Remove system.out.println
(cherry picked from commit c555eb7)
sijie pushed a commit that referenced this pull request Jun 1, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on #6647 (comment). I'm happy to discuss more about it.
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 1, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 12, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…che#6647)

* Improve Key_Shared subscription message dispatching performance.

* Fix unit tests.

* Remove system.out.println
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide they hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves. 

There are few problems with the current approach:

 1. When adding a new consumer, the bigger range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double in size compared to the other 2 consumers and consequently it will receive twice the traffic. This is not terrible, but not ideal either. 

 2. When removing consumers, the range for the removed consumer will always be assigned to the next consumer. The new hash distribution really depends on the sequence upon which the consumers are removed. If one is unlucky, the traffic will be very heavily skewed having situations where 1 consumer is getting >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored current unit test. The test are currently duplicating the logic of the implementation and checking the a messages is placed on the bucket for one consumer. Of course it works, since it's the same code executed on both sides. But, instead, the test should focus on the contract of the feature: message should arrive in order, there should be "decent" sharing of load across consumers.
  * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on apache#6647 (comment). I'm happy to discuss more about it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants