Conversation

@congbobo184
Contributor

cherry-pick: #17736 problem fix and run branch-2.9 test

Motivation

Documentation

  • doc-not-needed

Matching PR in forked repository

Technoboy- and others added 30 commits May 25, 2022 17:12
* [branch-2.9][fix][broker] Fix jdk API compatibility issues.

* Fix compile issue
In the current SQL implementation, if using the `JSON` schema and querying a decimal-typed field, there are two errors:

1. The data type is displayed as varchar.
2. Precision is lost because the value is displayed in scientific notation.

```
presto> select bigdecimal, typeof(bigdecimal) as decimal_type from pulsar."public/default".test_avro2;
      bigdecimal       | decimal_type
-----------------------+--------------
 1.2345678912345678E36 | varchar
 1.2345678912345678E36 | varchar
(2 rows)
```
The original data is: `1234567891234567891234567891234567.89`

- When reading the JsonNode, use `BIG_DECIMAL` instead of float and double.
- `PulsarJsonFieldDecoder` adds handling for decimal types.
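
The underlying pitfall can be illustrated with plain Jackson, which the decoder builds on; a minimal sketch, not the PR's code:

```
import java.math.BigDecimal;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BigDecimalJsonDemo {
    public static void main(String[] args) throws Exception {
        String json = "{\"bigdecimal\": 1234567891234567891234567891234567.89}";

        // Default behavior: floating-point JSON numbers become double,
        // printing in scientific notation with digits lost.
        JsonNode lossy = new ObjectMapper().readTree(json);
        System.out.println(lossy.get("bigdecimal").asText());

        // With USE_BIG_DECIMAL_FOR_FLOATS the node keeps full precision.
        ObjectMapper exact = new ObjectMapper()
                .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        BigDecimal value = exact.readTree(json).get("bigdecimal").decimalValue();
        System.out.println(value); // 1234567891234567891234567891234567.89
    }
}
```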

(cherry picked from commit 0c6e2ca)
…15663)

When trying to reproduce the problem of #15609 with the master code, it turned out that master also has this bug. The root cause:
when there is only one ledger in the ManagedLedger and the current ledger has been closed, the ledger carries a close timestamp; once that timestamp exceeds the time set by the backlog quota, producer creation fails.

The added test could reproduce this.

So when there is only one ledger, we should not exclude it.

If this patch is reverted, the added test will fail.

(cherry picked from commit 3a80458)
…atest protobuf package (#15846)

* [fix][python]Fix generated Python protobuf code not compatible with latest protobuf package
Protobuf's latest 4.21 (v21) release broke compatibility with files generated by protoc < 3.19.
This fix downgrades the protobuf Python package to 3.20.1.
See https://developers.google.com/protocol-buffers/docs/news/2022-05-06

* Limit protobuf version to 3.20.* in setup.py

Co-authored-by: Lari Hotari <lhotari@apache.org>
(cherry picked from commit 7800fbd)
(cherry picked from commit 7fb23cf)
* Switch to relying on Netty for Hostname Verification

- Add "subjectAltName = DNS:localhost, IP:127.0.0.1" to unit test certs

Co-authored-by: Lari Hotari <lhotari@apache.org>
(cherry picked from commit aa7700d)
Fixes #14529

The cursor data is managed by the ZooKeeper/etcd metadata store. As cursor data grows, its size increases and pulling it takes a lot of time. Therefore, it is necessary to add compression for the cursor, which reduces both the data size and the time needed to pull it.

- Add a `ManagedCursorInfoMetadata` message to `MLDataFormats.proto` to serve as the compression metadata
- Add `managedCursorInfoCompressionType` to `org.apache.pulsar.broker.ServiceConfiguration` and `org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig`
- This feature mirrors the existing ManagedLedgerInfo compression, so the shared code is factored out to avoid duplication
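
Assuming the new setting mirrors the existing managedLedgerInfoCompressionType field, enabling it might look like this (a hedged sketch; the setter name and accepted values are assumptions, not verified against the PR):

```
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;

public class CursorCompressionSketch {
    public static void main(String[] args) {
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        // Assumed setter for the new field, following the existing
        // managedLedgerInfoCompressionType pattern; the value should name a
        // CompressionType from MLDataFormats (e.g. ZSTD, LZ4, NONE).
        config.setManagedCursorInfoCompressionType("ZSTD");
    }
}
```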

(cherry picked from commit 4398733)
…#15063)

* [improve][tiered storage] Reduce cpu usage when offloading the ledger
---

*Motivation*

When offloading a ledger, the BlockAwareSegmentInputStreamImpl wraps
the ledger handle so that it can be consumed as a stream. JClouds then
reads the stream as the payload and uploads it to the storage.
In the JClouds implementation, the stream is read through a buffer:
https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68

In the current offload implementation, read() is called once per byte
to fill that buffer and return the data.
After implementing read(byte[] b, int off, int len), CPU usage dropped
by almost 10%.

*Modifications*

- Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl
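
The essence of the change is serving bulk reads in one call instead of InputStream's default byte-at-a-time loop; a simplified, self-contained sketch (not the actual BlockAwareSegmentInputStreamImpl):

```
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class BulkReadStream extends InputStream {
    private final ByteBuffer current; // stands in for the current entry's payload

    BulkReadStream(byte[] payload) {
        this.current = ByteBuffer.wrap(payload);
    }

    @Override
    public int read() {
        return current.hasRemaining() ? current.get() & 0xFF : -1;
    }

    // Without this override, InputStream.read(byte[], int, int) calls read()
    // once per byte; copying in bulk avoids the per-byte call overhead.
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (!current.hasRemaining()) {
            return -1;
        }
        int n = Math.min(len, current.remaining());
        current.get(b, off, n);
        return n;
    }
}
```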

(cherry picked from commit 938ab7b)
…th executor to prevent deadlock (#15971)

(cherry picked from commit bbc404b)
…15868)

The pulsar_tools_env.sh script sets the PULSAR_MEM and PULSAR_GC environment variables without allowing them to be overridden.
This change keeps the default values but allows PULSAR_MEM and PULSAR_GC to be overridden, which aligns with the
pulsar_env.sh file. This allows the memory settings to be adjusted when more memory is needed.

Co-authored-by: Jim Baugh <jim.baugh@oracle.com>

### Motivation
The pulsar_tools_env.sh script sets the PULSAR_MEM environment variable without allowing it to be overridden. When running a pulsar-admin function (e.g. running the Kafka-to-Pulsar connector) we can hit Java memory issues without a way to change the memory settings. This PR resolves that issue.

### Modifications
This change keeps the default value but allows PULSAR_MEM to be overridden which aligns with the
pulsar_env.sh file. This allows adjustments to be made to the memory settings when more memory is needed.
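
The override follows the usual "set only if unset" shell idiom already used in pulsar_env.sh; roughly (the default values shown here are illustrative):

```
# Keep the defaults but let the environment override them (illustrative defaults).
PULSAR_MEM=${PULSAR_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}
PULSAR_GC=${PULSAR_GC:-"-XX:+UseG1GC"}
```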

### Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

Check the box below or label this PR directly.

Need to update docs?
- [X] `doc-not-needed`
(Please explain why)
There is currently no documentation around the pulsar_tools_env.sh PULSAR_MEM setting. This change doesn't change the default behavior.

(cherry picked from commit fa6288e)
…try#checkReadCompletion. (#15104)

(cherry picked from commit 9376128)
* [improve] [tiered-storage] Add pure S3 provider for the offloader
---

*Motivation*

Some cloud storages are compatible with the S3
APIs, such as aliyun-oss. Other storages also use
the S3 APIs and users want to offload data into them, but
we only support AWS or Aliyun.
PR #8985 provides
the Aliyun offload provider, but it forces the
`S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS` setting. That
limitation does not apply to other storage services compatible
with the S3 APIs.
This PR provides a more general offload provider, `S3`, which uses
pure JClouds S3 metadata and allows people to override the
default JClouds properties through system properties.

*Modifications*

- Add the pure S3 offload provider
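
For example, overriding JClouds' virtual-host-bucket behavior through a system property might look like this (the property name is taken from JClouds' S3Constants; treat it as an assumption, not verified against this PR):

```
public class S3OffloadPropertySketch {
    public static void main(String[] args) {
        // With the generic "S3" provider, JClouds defaults are no longer
        // forced by the offloader and can be overridden via system properties.
        // Property name per JClouds' S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS.
        System.setProperty("jclouds.s3.virtual-host-buckets", "false");
    }
}
```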

(cherry picked from commit 047cb0e)
### Motivation

Fixes #15858

A conversion registered earlier takes priority over one registered later, so `TimestampMillisConversion` should not be registered after `TimestampMicrosConversion`.

### Modifications

Improve the registration order of `avro` conversions.
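
Registration order matters because Avro resolves a value to the first matching conversion; a sketch of the corrected ordering using Avro's stock TimeConversions (illustrative, not the PR's code):

```
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;

public class AvroConversionOrderSketch {
    public static void main(String[] args) {
        ReflectData reflectData = new ReflectData();
        // Register millis before micros so timestamp-millis fields are not
        // captured by a micros conversion registered ahead of them.
        reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
    }
}
```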

(cherry picked from commit 311fdb5)
…og (#15817)

### Motivation
See #15483
The `@Secret` annotation works well and was introduced in #8910

### Modifications
- Revert the unneeded `@JsonIgnore`
- Remove `Assert.assertFalse(s.contains("Password"));` since `Password` appears in a key name; the sensitive field's value itself is masked as `****`.

(cherry picked from commit 67361e8)
Fixes #15832

### Motivation
The transaction needs the batch size to determine whether a batch message is in the pending-ack state.

### Modifications
Return the batch size of the message ID directly.

(cherry picked from commit f87b370)
nodece and others added 28 commits November 26, 2022 15:53
…ge ack owner (#16245)

### Motivation

The broker doesn't need to go through all the consumers to get the ack owner consumer.
Instead, it should check the current consumer first. If the pending acks of the current consumer
don't contain the ack position, go through all the consumers to find the owner.
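
A hypothetical sketch of the fast path (names and signatures are illustrative, not the broker's actual code):

```
import java.util.List;

public class AckOwnerSketch {
    interface Consumer {
        boolean pendingAcksContain(long ledgerId, long entryId);
    }

    // Fast path: the consumer that sent the ack usually owns the position,
    // so check it before scanning every consumer on the subscription.
    static Consumer ackOwner(Consumer acking, List<Consumer> all, long ledgerId, long entryId) {
        if (acking.pendingAcksContain(ledgerId, entryId)) {
            return acking;
        }
        for (Consumer c : all) {
            if (c != acking && c.pendingAcksContain(ledgerId, entryId)) {
                return c;
            }
        }
        return acking; // fall back to the acking consumer, as before
    }
}
```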

(cherry picked from commit 68484f9)
…16243)

### Motivation

When creating many consumers (> 10000), the IO threads stay in the BLOCKED state for a long time, which
affects message publishing and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer has a higher priority than the last element in the list.
  This avoids the sort operation when no consumer uses priority levels (the client side always passes 0 when the priority level is absent).
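
A sketch of the idea (illustrative; the real change is in PersistentDispatcherMultipleConsumers#addConsumer):

```
import java.util.Comparator;
import java.util.List;

public class PrioritySortSketch {
    interface Consumer {
        int getPriorityLevel(); // smaller value = higher priority; clients default to 0
    }

    static void addConsumer(List<Consumer> consumerList, Consumer consumer) {
        consumerList.add(consumer);
        // Re-sort only when the newcomer outranks the previous tail; when every
        // consumer uses the default level 0, the sort never runs.
        if (consumerList.size() > 1
                && consumer.getPriorityLevel()
                        < consumerList.get(consumerList.size() - 2).getPriorityLevel()) {
            consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
        }
    }
}
```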

(cherry picked from commit 291fedc)
…xception (#17512)

* [fix][tiered-storage] Don't cleanup data when offload met BadVersion
---

*Motivation*

There are two ways offload data cleanup can be triggered: hitting an
offload conflict exception, or completeLedgerInfoForOffloaded reaching
its max retry count and throwing ZooKeeper exceptions.

We retry the ZooKeeper operation on connection-loss exceptions. We should
be careful with this exception, because we may lose data if the metadata
update actually succeeded.

When a MetaStore exception happens, we cannot tell whether the metadata
update failed or succeeded. Because we retry on connection loss, it is
possible to get a BadVersion or another exception after retrying.

So we don't clean up the data if this happens.

*Modification*

- Don't delete data if a MetaStore exception occurred

* Log an error when skipping deletion

* Improve logs
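
The resulting guard might be shaped like this (a hedged sketch with hypothetical names):

```
public class OffloadCleanupSketch {
    static class MetaStoreException extends Exception {}

    // After a connection-loss retry, a BadVersion (or any MetaStore error)
    // leaves the metadata outcome unknown: the update may in fact have
    // succeeded. Deleting the offloaded data then could lose data, so skip
    // the cleanup and only log the error.
    static boolean shouldCleanupOffloadedData(Throwable error) {
        return !(error instanceof MetaStoreException);
    }
}
```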

(cherry picked from commit c2588ba)
link #17548
### Motivation
Currently, delayed delivery and transaction messages cannot be used together.
When a transaction message is sent with a delay time and the transaction is committed, the message is received by consumers immediately.

Example:
```
    @Test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @Cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // before this fix, msg is not null here and this assertion fails
        assertNull(msg);
    }
```
This PR enables clients to send delayed messages within transactions.

### Modifications
Allow transaction messages to be put into `trackDelayedDelivery`, so clients can send delayed messages within a transaction.

It is worth noting that when the dispatcher sends transaction messages to consumers, it should follow the `MaxReadPosition` change (for background on `MaxReadPosition`, see https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of maxReadPosition, delivery of a transaction message depends on whether the previous transaction has completed. This can extend the delay time, but never shorten it.

### Verifying this change
Added the test above.

(cherry picked from commit 1246d79)
…17687)

* Fix parsing of Base64-encoded partitionedKey.

* release the buf

* fix checkstyle issue.

(cherry picked from commit f3cc107)
…nnel inactive (#17856)

### Motivation
https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297
The `pendingLookupRequestSemaphore` leaks when the channel becomes inactive: lookup permits are not released when their requests are removed from `pendingRequests`.

### Modifications
We can't easily release the semaphore in `channelInactive`, because `pendingRequests` contains more than just lookup requests. So release the semaphore when the connection exception occurs instead.
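
A simplified sketch of the intended invariant (hypothetical; the real change is in ClientCnx): every acquired lookup permit must be released on every completion path, including failures:

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class LookupSemaphoreSketch {
    private final Semaphore pendingLookupRequestSemaphore = new Semaphore(5000);

    CompletableFuture<String> newLookup() {
        pendingLookupRequestSemaphore.acquireUninterruptibly();
        CompletableFuture<String> future = new CompletableFuture<>();
        // Release on every completion path; completing the future exceptionally
        // on a connection exception then returns the permit instead of leaking it.
        future.whenComplete((v, e) -> pendingLookupRequestSemaphore.release());
        return future;
    }
}
```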

### Verifying this change
Add unit test case to cover this change

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc-required`
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed`
Bug fix, no doc needed

- [ ] `doc`
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)

(cherry picked from commit b451880)
#18219)

### Motivation
https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774
The `pendingLookupRequestSemaphore` leaks in `handleError`: lookup permits are not released when their requests are removed from `pendingRequests`.

related PR: #17856

### Modifications
We can't easily release the semaphore in `handleError`, because `pendingRequests` contains more than just lookup requests. So release the semaphore when the lookup exception occurs instead.

### Verifying this change
Add unit test case to cover this change

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc-required`
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed`
Bug fix, no doc needed

- [ ] `doc`
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)

(cherry picked from commit fad3ccc)
* Support LocalDateTime Conversion

* move `TimestampMicrosConversion` to correct line

(cherry picked from commit b31c5a6)
During a schema update, a `ledgerHandle` is created and data is written to BookKeeper; after that, the `ledgerHandle` is no longer useful and no other object holds a reference to it. The `ledgerHandle` is eventually reclaimed by GC, but it also holds external connections, which leak in the meantime.

https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456

After the schema is updated, close the `ledgerHandle`, just as the schema-read path does:

https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
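
A hedged sketch of the fix's shape using BookKeeper's WriteHandle API (illustrative, not the PR's exact code):

```
import org.apache.bookkeeper.client.api.WriteHandle;

public class SchemaLedgerCloseSketch {
    // Hypothetical shape of the fix: once the schema payload is appended,
    // close the handle so the broker does not keep BookKeeper connections
    // alive until the handle is garbage collected.
    static void writeAndClose(WriteHandle handle, byte[] payload) {
        handle.appendAsync(payload)
                .thenCompose(entryId -> handle.closeAsync())
                .exceptionally(ex -> {
                    // log and surface the error in the real code
                    return null;
                });
    }
}
```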
(cherry picked from commit 2620450)
…mes topics in one broker (#17526)

(cherry picked from commit 260f5c6)
… in `FunctionDetails` (#18111)

(cherry picked from commit 8ad7157)
…ving pending batch receives requests (#16160)

The consumer applies the default batch receive policy even if the user never uses the batch receive API.

https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61

This consumes a lot of CPU if the client has many consumers (e.g. 100k consumers).

The Pulsar perf tool can also reproduce the problem if the test is run with many consumers.

If there is no pending batch receive operation for a consumer, there is no need to trigger the
batch timeout task periodically. We can start the timeout check only after adding a batch
receive request to the pending request queue.
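
A sketch of the lazy scheduling (hypothetical names; the real code lives in the client's consumer implementation):

```
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyBatchTimeoutSketch<T> {
    private final ConcurrentLinkedQueue<CompletableFuture<List<T>>> pendingBatchReceives =
            new ConcurrentLinkedQueue<>();
    private final AtomicBoolean timeoutScheduled = new AtomicBoolean(false);

    void batchReceiveAsync(CompletableFuture<List<T>> op) {
        pendingBatchReceives.add(op);
        // Start the timeout check only once there is something to time out,
        // instead of ticking periodically for every consumer from creation.
        if (timeoutScheduled.compareAndSet(false, true)) {
            scheduleBatchReceiveTimeoutCheck();
        }
    }

    private void scheduleBatchReceiveTimeoutCheck() {
        // timer.newTimeout(...) in the real client; omitted in this sketch
    }
}
```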

Remove the lock in MultiTopicsConsumerImpl as #10352 does

Added a new test to verify that the batch receive timeout task does not start if there is no batch
receive request.

(cherry picked from commit a0ccdc9)
github-actions bot added the doc-not-needed label Nov 26, 2022