
Conversation

@BewareMyPower
Contributor

@BewareMyPower BewareMyPower commented Jan 5, 2022

Motivation

This PR is a C++ catch-up of #4400, which implements PIP 37.

Modifications

Changes to the interfaces:

  • Add the chunkingEnabled config to the producer so that message chunking can be enabled by setting chunkingEnabled to true and disabling batchingEnabled.
  • Add the maxPendingChunkedMessage and autoOldestChunkedMessageOnQueueFull configs to the consumer to control how chunked messages are handled (a usage sketch follows this list).
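
A minimal usage sketch of the new options is shown below. The setter names (`setChunkingEnabled`, `setMaxPendingChunkedMessage`, `setAutoOldestChunkedMessageOnQueueFull`) are assumed from the config names above and may differ slightly from the final API:

```cpp
#include <pulsar/Client.h>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    // Chunking only takes effect when batching is disabled.
    ProducerConfiguration producerConf;
    producerConf.setBatchingEnabled(false);
    producerConf.setChunkingEnabled(true);  // assumed setter for the chunkingEnabled config

    Producer producer;
    Result result = client.createProducer("my-topic", producerConf, producer);

    // Bound the number of incomplete chunked messages the consumer caches and
    // drop the oldest one when the cache is full.
    ConsumerConfiguration consumerConf;
    consumerConf.setMaxPendingChunkedMessage(10);               // assumed setter name
    consumerConf.setAutoOldestChunkedMessageOnQueueFull(true);  // assumed setter name

    Consumer consumer;
    result = client.subscribe("my-topic", "my-sub", consumerConf, consumer);

    client.close();
    return result == ResultOk ? 0 : 1;
}
```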

Producer-side implementation:

  • Refactor the sendAsync method to follow the same logic as the Java client: if canAddToBatch returns false and the message size exceeds the limit, split the large message into chunks and send them one by one (see the sketch below).
  • Fix a bug in Commands::newSend where it didn't use readableBytes() as the buffer's size, which could lead to a wrong checksum when sending chunks, because each chunk is a reference to the original large buffer and only the reader index differs.
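
The splitting step can be illustrated with this self-contained sketch (hypothetical Chunk/splitIntoChunks helpers; the real sendAsync reuses the original buffer and only advances the reader index instead of copying):

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// One chunk of a large message. Every chunk of the same message shares a UUID and
// carries its index, the total number of chunks, and the original payload size.
struct Chunk {
    std::string uuid;
    int chunkId;
    int numChunks;
    size_t totalSize;
    std::string payload;  // a copy here; the real code slices the shared buffer
};

std::vector<Chunk> splitIntoChunks(const std::string& payload, size_t maxChunkSize,
                                   const std::string& uuid) {
    const size_t totalSize = payload.size();
    const int numChunks = static_cast<int>((totalSize + maxChunkSize - 1) / maxChunkSize);
    std::vector<Chunk> chunks;
    for (int i = 0; i < numChunks; i++) {
        const size_t begin = static_cast<size_t>(i) * maxChunkSize;
        const size_t length = std::min(maxChunkSize, totalSize - begin);
        chunks.push_back({uuid, i, numChunks, totalSize, payload.substr(begin, length)});
    }
    return chunks;  // sent one by one, in order
}
```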

Consumer-side implementation:

  • Add a MapCache class that wraps a hash map together with a double-ended queue recording the UUIDs in insertion order, because when a UUID is removed, the related cache entry must be removed from both the map and the queue. The class only exposes the necessary methods so that neither of the two data structures can be left uncleaned, and it also makes it easier for tests to verify that the cache is cleared after chunks are merged (a minimal sketch follows this list).
  • For chunked messages, call processMessageChunk to cache the chunks and merge those with the same UUID into the complete message once the last chunk arrives.
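
A minimal sketch of the MapCache idea, with illustrative method names (MapCacheTest covers the real interface):

```cpp
#include <deque>
#include <unordered_map>

// Sketch only: a hash map paired with a deque that remembers insertion order, so the
// oldest entry can be evicted and the two structures are always modified together.
template <typename K, typename V>
class MapCache {
    std::unordered_map<K, V> map_;
    std::deque<K> keys_;  // keys in insertion (time) order

   public:
    V* find(const K& key) {
        auto it = map_.find(key);
        return (it != map_.end()) ? &it->second : nullptr;
    }

    // Insert a new entry; returns false if the key already exists.
    bool putIfAbsent(const K& key, V value) {
        if (map_.count(key) > 0) return false;
        map_.emplace(key, std::move(value));
        keys_.push_back(key);
        return true;
    }

    // Remove a key from both structures, e.g. after the chunks are merged.
    void remove(const K& key) {
        if (map_.erase(key) > 0) {
            for (auto it = keys_.begin(); it != keys_.end(); ++it) {
                if (*it == key) {
                    keys_.erase(it);
                    break;
                }
            }
        }
    }

    // Evict the oldest entry, e.g. when maxPendingChunkedMessage is exceeded.
    void removeOldest() {
        if (!keys_.empty()) {
            map_.erase(keys_.front());
            keys_.pop_front();
        }
    }

    size_t size() const { return map_.size(); }
};
```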

Tests:

  • Configure maxMessageSize=10240 in test-conf/standalone-ssl.conf so that tests don't need to create overly large messages.
  • Add MapCacheTest to verify the public methods of MapCache.
  • Add MessageChunkingTest to verify the basic end-to-end case with all compression types.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The REST endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

@BewareMyPower BewareMyPower added component/c++ type/PIP doc-not-needed Your PR changes do not impact docs labels Jan 5, 2022
@BewareMyPower BewareMyPower added this to the 2.10.0 milestone Jan 5, 2022
@BewareMyPower BewareMyPower changed the title [C++] PIP 37: Support large message size [WIP][C++] PIP 37: Support large message size Jan 5, 2022
@BewareMyPower
Contributor Author

It looks like some code is not compatible with the test env. I'll fix it soon.

@BewareMyPower BewareMyPower changed the title [WIP][C++] PIP 37: Support large message size [C++] PIP 37: Support large message size Jan 5, 2022
@BewareMyPower
Contributor Author

Hi @merlimat, could you review this PR?

@codelipenghui
Contributor

ping @merlimat Please help review this PR.

@merlimat merlimat merged commit de0a8bd into apache:master Jan 22, 2022
@BewareMyPower BewareMyPower deleted the bewaremypower/cpp-chunk-msg branch January 22, 2022 01:18
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In the C++ client, there is a corner case where `hasMessageAvailable` returns true when a reader's start message ID is the last message of a topic. However, it should return false because the start message ID is exclusive, so in this case `readNext` would never return a message unless new messages arrive.

The current C++ implementation of `hasMessageAvailable` is quite old and has many problems, so this PR ports the Java implementation of `hasMessageAvailable` to the C++ client.

After these modifications, `hasMessageAvailable` needs to access `startMessageId`, but it runs in a different thread from `connectionOpened`, which might modify `startMessageId`. We therefore use a common mutex, `mutexForMessageIds`, to protect access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response to the `GetLastMessageId` request. The `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced in #9652, which is compared with `last_message_id` when `startMessageId` is latest.

This change adds the tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest#testCompareLedgerAndEntryId`.
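
A simplified sketch of the comparison described above, using hypothetical types (the real code compares `MessageId` values while holding `mutexForMessageIds`):

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for a (ledgerId, entryId) message ID.
struct MsgId {
    int64_t ledgerId;
    int64_t entryId;
    bool operator<(const MsgId& o) const {
        return std::tie(ledgerId, entryId) < std::tie(o.ledgerId, o.entryId);
    }
    bool operator<=(const MsgId& o) const { return !(o < *this); }
};

// startMessageId is exclusive for a Reader: if it already equals the broker's last
// message ID, nothing is available until new messages arrive.
bool hasMessageAvailable(const MsgId& lastDequedOrStartMessageId,
                         const MsgId& lastMessageIdInBroker, bool inclusive) {
    return inclusive ? (lastDequedOrStartMessageId <= lastMessageIdInBroker)
                     : (lastDequedOrStartMessageId < lastMessageIdInBroker);
}
```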

(cherry picked from commit e50493e)

Fix the conflicts by:
- Remove ReaderImpl::getLastMessageIdAsync introduced in #11723
- Remove the getPriorityLevel() method introduced in #12076
- Revert the changes to registerConsumer from #12118
- Remove the new fields introduced in #13627
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In the C++ client, there is a corner case where `hasMessageAvailable` returns true when a reader's start message ID is the last message of a topic. However, it should return false because the start message ID is exclusive, so in this case `readNext` would never return a message unless new messages arrive.

The current C++ implementation of `hasMessageAvailable` is quite old and has many problems, so this PR ports the Java implementation of `hasMessageAvailable` to the C++ client.

After these modifications, `hasMessageAvailable` needs to access `startMessageId`, but it runs in a different thread from `connectionOpened`, which might modify `startMessageId`. We therefore use a common mutex, `mutexForMessageIds`, to protect access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response to the `GetLastMessageId` request. The `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced in #9652, which is compared with `last_message_id` when `startMessageId` is latest.

This change adds the tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest#testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)

Fix the conflicts by:
- Remove the new fields introduced in #13627
@Anonymitaet
Member

Hi @momo-jun could you please take a look at this PR? Similar to the Java client, do we need to add chunking docs for C++ client? Thanks

@momo-jun
Contributor

Hi @momo-jun could you please take a look at this PR? Similar to the Java client, do we need to add chunking docs for C++ client? Thanks

Sure. Will look into this.

BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Apr 25, 2022
Fixes apache#13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter (a rough
  sketch of the idea follows this list).
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.
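
A rough sketch of the `TimeoutProcessor` idea, with illustrative method names rather than the actual class:

```cpp
#include <chrono>

// Sketch only: track how much of an overall timeout is left after each blocking
// step, so that a chain of close(timeout) calls never exceeds the caller's budget.
class TimeoutProcessor {
    using Clock = std::chrono::steady_clock;
    std::chrono::milliseconds left_;
    Clock::time_point before_;

   public:
    explicit TimeoutProcessor(std::chrono::milliseconds total) : left_(total) {}

    std::chrono::milliseconds getLeftTimeout() const { return left_; }

    // Call before a step that consumes part of the timeout.
    void tik() { before_ = Clock::now(); }

    // Call after the step: subtract the elapsed time, never going below zero.
    void tok() {
        auto elapsed =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - before_);
        left_ = (elapsed >= left_) ? std::chrono::milliseconds(0) : left_ - elapsed;
    }
};

// Usage sketch: ClientImpl::shutdown could then close each provider with whatever
// time is still left, e.g.
//
//   TimeoutProcessor timeout(std::chrono::milliseconds(500));
//   for (auto& provider : executorProviders) {
//       timeout.tik();
//       provider->close(timeout.getLeftTimeout());  // blocks until io_service threads join
//       timeout.tok();
//   }
```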

### Verifying this change

After applying this patch, the reproducer code in apache#13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```
BewareMyPower added a commit that referenced this pull request Apr 27, 2022
* [C++] Wait until event loops terminate when closing the Client

Fixes #13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in #13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```
codelipenghui pushed a commit that referenced this pull request Apr 28, 2022
* [C++] Wait until event loops terminate when closing the Client

Fixes #13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in #13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39)
codelipenghui pushed a commit that referenced this pull request Apr 28, 2022
* [C++] Wait until event loops terminate when closing the Client

Fixes #13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in #13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39)
codelipenghui pushed a commit that referenced this pull request Apr 29, 2022
* [C++] Wait until event loops terminate when closing the Client

Fixes #13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in #13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 4, 2022
…e#15316)

* [C++] Wait until event loops terminate when closing the Client

Fixes apache#13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in apache#13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39)
(cherry picked from commit c0c67db)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 9, 2022
…e#15316)

* [C++] Wait until event loops terminate when closing the Client

Fixes apache#13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method
that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers
2. Close all connections in the pool
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes
the event loop (`io_service::run()`) running in another thread return as
soon as possible. However, there is no wait operation. If a client fails
to create a producer or consumer, the `close` method calls `shutdown`,
which closes all executors immediately, and then the application exits.
In this case, the detached event loop thread might not have exited yet,
so valgrind will report a memory leak.

This memory leak can be avoided by sleeping for a while after
`Client::close` returns, or when there is still other work to do
afterwards. However, we should still adopt the semantics that after
`Client::shutdown` returns, all event loop threads have terminated.

### Modifications
- Add a timeout parameter to the `close` method of `ExecutorService` and
  `ExecutorServiceProvider`, used as the maximum blocking timeout when it
  is non-negative.
- Add a `TimeoutProcessor` helper class to update the remaining timeout
  after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown`
  with a 500ms timeout, which should be long enough. In addition, in the
  `handleClose` method, call `shutdown` in another thread to avoid a
  deadlock.

### Verifying this change

After applying this patch, the reproducer code in apache#13627 passes the
valgrind check.

```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39)
(cherry picked from commit 6d365c9)