
Conversation

@Demogorgon314

apache/pulsar#12600 changed the behavior. Before that change, only the tenant admin was allowed to list topics; after apache/pulsar#12600, the consume permission also allows listing topics.

Ting Yuan and others added 25 commits December 29, 2021 22:41
…ive#972)

In AdminManager there are some places where we are not using CompletableFuture correctly.
This patch fixes deletePartitionedTopic and a couple of other spots.

This patch is a trivial rework, so there is no need to add tests.
…#973)

When the Kafka client issues a Fetch and sets a maxWait time, we already schedule a DelayedFetch, but there is no way to trigger that Fetch early, so it always waits for the full timeout.
This causes latency spikes on the Kafka consumer.

With this patch we trigger any pending DelayedFetch whenever a record is written to one of the partitions the Fetch is interested in.

This is only a first implementation; in the future we can improve it so that it does not trigger on the first record but waits for more records to arrive.
With this implementation the Fetch result will usually contain only 1 record, but this is enough to let the Kafka client start a new Fetch cycle instead of wasting time doing nothing (waiting for maxWait).

Changes:
- trigger pending Fetches while producing to the topic
- add a new metric, WAITING_FETCHES_TRIGGERED
- add DelayedOperation#wakeup, meaning the operation should wake up due to some trigger (in this case the production of records to the topic); a minimal sketch of the idea follows this list
- add a new test that would fail without this patch (because the test asserts that there is no idle cycle in the consumer loop)
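A minimal, self-contained sketch of the wakeup idea; the class and method names here are illustrative, not the actual KoP `DelayedFetch`/`DelayedOperation` API:

```java
import java.util.Map;
import java.util.concurrent.*;

// Illustrative only: a pending fetch completes either when maxWait expires or as soon
// as a record is produced to the partition it is waiting on, whichever happens first.
class DelayedFetchSketch {
    private final Map<String, CompletableFuture<Void>> pendingFetches = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // Called when a Fetch arrives and no data is available yet: wait at most maxWaitMs.
    CompletableFuture<Void> scheduleFetch(String partition, long maxWaitMs) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        pendingFetches.put(partition, pending);
        timer.schedule(() -> {
            pendingFetches.remove(partition, pending);
            pending.complete(null); // fall back to the original maxWait behavior
        }, maxWaitMs, TimeUnit.MILLISECONDS);
        return pending;
    }

    // Called from the produce path: wake the fetch up instead of letting it idle until maxWait.
    void wakeup(String partition) {
        CompletableFuture<Void> pending = pendingFetches.remove(partition);
        if (pending != null) {
            pending.complete(null);
        }
    }
}
```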
streamnative#908
### Motivation
In the current implementation, list topic and list offset might get the wrong response, because the authorization future has not finished yet. We need to make sure the authorization future finishes correctly.

### Modifications
* Ensure all futures have finished before sending the response
### Motivation
The current implementation of authorization uses Pulsar's default authorization provider. For future maintenance, we should use the `AuthorizationService` instead, because not all users want to use Pulsar's default authorization provider.

### Modifications
1. Split CREATE, DELETE and ALTER.
2. Use `AuthorizationService` instead of a self-implemented authorization (see the sketch below).
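For illustration, a hedged sketch of delegating a check to the pluggable `AuthorizationService`; the variable names and the exact overload/operation used here are assumptions and may differ across Pulsar versions and from what KoP actually calls:

```java
// Sketch only: delegate the permission check to the pluggable AuthorizationService
// instead of a self-implemented provider. topicName, role and authenticationData are
// illustrative names for values KoP already has at hand.
CompletableFuture<Boolean> allowedFuture = authorizationService.allowTopicOperationAsync(
        topicName, TopicOperation.PRODUCE, role, authenticationData);
allowedFuture.thenAccept(allowed -> {
    if (!allowed) {
        // respond to the Kafka client with an authorization error
    }
});
```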

(cherry picked from commit c0c1e48)
Fixes streamnative#908

### Motivation
In the current implementation, list topic and list offset might get the wrong response, because the authorization future has not finished yet. We need to make sure the authorization future finishes correctly.

The previous fix only checked the size of `allTopicMap`, but we actually traverse `allTopicMap`'s value lists, so the same case could still fail.

### Modifications
* Ensure all futures have finished before sending the response (a sketch of the idea follows)
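A general sketch of the idea, assuming hypothetical helpers `authorizeTopic` and `sendResponse`: collect one future per topic across every value list of `allTopicMap` and only build the response once all of them have completed.

```java
// Sketch: wait for every per-topic authorization future (across all value lists of
// allTopicMap), not just one per map entry, before sending the response.
List<CompletableFuture<Boolean>> authorizeFutures = new ArrayList<>();
allTopicMap.values().forEach(topics ->
        topics.forEach(topic -> authorizeFutures.add(authorizeTopic(topic))));

CompletableFuture.allOf(authorizeFutures.toArray(new CompletableFuture[0]))
        .whenComplete((ignore, error) -> sendResponse());
```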
…streamnative#1011)

With this patch, when you configure kafkaAdvertisedListeners you can use the special placeholder `advertisedAddress`, which picks up the value configured for `advertisedAddress` in broker.conf.

Something like:
`kafkaAdvertisedListeners=PLAINTEXT://advertisedAddress:9092`

Without this placeholder you have to inject the value by other external means, such as adding `status.podIP` through the Helm chart, which is not trivial.
…eamnative#1016)

In PR streamnative#845, we added conversion metrics for the number of messages; metrics for the message conversion time are still very useful.
…treamnative#1021)

### Motivation

Currently, when the metrics related to Kafka requests are updated, a `PrometheusStatsLogger` is created each time a request arrives.

```java
            requestStats.getStatsLogger()
                    .scopeLabel(KopServerStats.REQUEST_SCOPE, apiName)
```

The `StatsLogger#scopeLabel` method calls `scope` method:

```java
    public StatsLogger scope(String name) {
        return new PrometheusStatsLogger(provider, completeName(name), labels);
    }
```

The newly created `PrometheusStatsLogger` object is nearly the same as the caller itself except for the name. The shared `provider` field is responsible for maintaining the cache of Prometheus stats.

Therefore, there is no need to create another object if the name stays the same. This PR avoids the redundant small-object creation of `PrometheusStatsLogger`.

### Modifications

Maintain a `ConcurrentHashMap` for each channel. The key is the `ApiVersions` object that represents a Kafka request's type, and the value is the associated `StatsLogger` object.

In addition, since a `RequestStats` instance is associated with only one `StatsLogger` instance, this PR creates the `RequestStats` instance at the beginning and avoids creating an instance for each connection. A sketch of the caching idea follows.
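For illustration, a sketch of the caching idea, keyed here by the API name for simplicity (the actual key type and field names in the PR may differ):

```java
// Cache one scoped StatsLogger per request type so a new PrometheusStatsLogger is not
// allocated for every incoming request.
private final Map<String, StatsLogger> statsLoggerCache = new ConcurrentHashMap<>();

private StatsLogger scopedStatsLogger(String apiName) {
    return statsLoggerCache.computeIfAbsent(apiName, name ->
            requestStats.getStatsLogger()
                    .scopeLabel(KopServerStats.REQUEST_SCOPE, name));
}
```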
### Motivation

There is a thread-safety issue with `MessageFetchContext`: an instance of this class can be accessed by different threads. For example, I added some logs to each method and saw the following output:

| Thread name                                 | Method name             |
| ------------------------------------------- | ----------------------- |
| pulsar-io-18-12                             | `MessageFetchContext`   |
| pulsar-io-18-12                             | `handleFetch`           |
| pulsar-io-18-12                             | `handlePartitionData`   |
| pulsar-io-18-12                             | `checkOffsetOutOfRange` |
| BookKeeperClientWorker-OrderedExecutor-10-0 | `readEntries`           |
| BookKeeperClientWorker-OrderedExecutor-10-0 | `handleEntries`         |
| metadata-store-19-1                         | `tryComplete`           |
| metadata-store-19-1                         | `complete`              |
| metadata-store-19-1                         | `recycle`               |

Though the fields are never modified between initialization and the call to `recycle()`, the visibility of these fields across threads cannot be guaranteed without `volatile`.

### Modifications
- Add the `volatile` keyword to fields of `MessageFetchContext`, except the atomic variables and thread-safe containers.
- For atomic variables and thread-safe containers, make them `final` and initialize them in the constructor. In the `recycle()` method, instead of resetting them with `null` values, just reset them to the default state.

This PR can also improve performance, because even though `MessageFetchContext` is managed by the Netty object pool, each time an instance was allocated from `MessageFetchContext#get` the atomic variables and thread-safe containers were still allocated from heap memory. A sketch of the resulting field layout follows.
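For illustration only; the field names and types below are hypothetical, not the real members of `MessageFetchContext`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class MessageFetchContextSketch {
    // Plain fields read by other threads: volatile for visibility.
    private volatile String namespacePrefix;

    // Atomic variables and thread-safe containers: final, created once, and only reset
    // to their default state in recycle() instead of being set to null.
    private final AtomicLong highWaterMark = new AtomicLong(0);
    private final ConcurrentHashMap<String, Long> fetchOffsets = new ConcurrentHashMap<>();

    void recycle() {
        namespacePrefix = null;
        highWaterMark.set(0);
        fetchOffsets.clear();
        // hand the instance back to the Netty Recycler here
    }
}
```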
Fixes streamnative#957 streamnative#1007

## Motivation
Currently, KoP uses Kafka's implementation to check for duplicate messages, but it is hard to support `MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION > 1` that way.

Instead, we should reuse Pulsar's message deduplication check in KoP by mapping `baseSequence` to `sequenceId` and `lastSequence` to `highestSequenceId`.

Pulsar uses the producer name to persist sequence ids; in KoP we want to follow the Kafka behavior, so we need a naming rule to build the producer name.

Kafka reuses the PID when the transactional id is the same but increases the producer epoch, so we need to ensure the producer names do not collide.

So the producer naming rule is `PID_PREFIX-{producerId}-{producerEpoch}`, as sketched below.
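A minimal sketch of that naming rule; the method name is illustrative and `pidPrefix` stands for whatever constant prefix is used:

```java
// Include the epoch so a reused PID with a bumped epoch maps to a different Pulsar
// producer name, keeping Pulsar's deduplication state separate per epoch.
static String kafkaProducerName(String pidPrefix, long producerId, short producerEpoch) {
    return pidPrefix + "-" + producerId + "-" + producerEpoch;
}
```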

## Modifications
Support `MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION > 1`, and reuse Pulsar's message deduplication.

(cherry picked from commit 1841b82)
…Topic (streamnative#1057)

Fixes: streamnative#1056

### Motivation

The `testMultiBrokerProduceAndConsumeOnePartitionedTopic` test method fails sporadically because deleting the topic sometimes times out. Related PR: apache/pulsar#14060

### Modifications

* Remove the call to `deletePartitionedTopic`

(cherry picked from commit 8c7afcd)
Fixes streamnative#1053

### Motivation

Currently, the idempotent producer test is not stable because consumers do not guarantee ordered consumption. We should add a test to verify idempotent produce and remove the consumption-order check.

### Modifications

* Add a test for an idempotent producer in KafkaApiTest
* Remove the consumption-order check

(cherry picked from commit da8d3ce)
…treamnative#1054)

Fixes streamnative#1050

### Motivation

Currently `ReplicaManager#getPartitionLog` converts exceptions thrown from `PartitionLog#appendRecords` into Kafka error codes by calling `Errors.forException` and treats an unknown error as `KAFKA_STORAGE_ERROR`. However, the exceptions are thrown from the broker side:
- Control records are sent via `ManagedLedger#asyncAddEntry`; a `ManagedLedgerException` is thrown on failure.
- Normal records are sent via `PersistentTopic#publishMessages`; a `BrokerServiceException` that wraps a `ManagedLedgerException` is thrown on failure.

After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will fail with `ManagedLedgerAlreadyClosedException`. However, if the Kafka producer receives an error other than an `InvalidMetadataException`, it won't update its metadata; it will send records to the same broker and fail again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647

We should return the error code associated with `InvalidMetadataException` to notify the Kafka producer that it's time to find the new leader of the partition.

### Modifications

- When the managed ledger's state is wrong, complete the future with `NotLeaderForPartitionException`, which implements `InvalidMetadataException` (sketched below).
- Add a test, `testIllegalManagedLedger`, to verify this error code is received when the managed ledger is closed.
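For illustration, a hedged sketch of the mapping; `appendFuture`, `managedLedgerIsClosed` and `topicPartition` are assumed names, while `NotLeaderForPartitionException` and `Errors` come from the Kafka client library:

```java
// Complete the pending append exceptionally so Errors.forException maps it to an
// InvalidMetadataException error code and the producer refreshes its metadata.
if (managedLedgerIsClosed) {
    appendFuture.completeExceptionally(new NotLeaderForPartitionException(
            "Managed ledger of " + topicPartition + " is already closed"));
} else {
    // other unknown broker-side failures still surface as KAFKA_STORAGE_ERROR
    appendFuture.completeExceptionally(Errors.KAFKA_STORAGE_ERROR.exception());
}
```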

(cherry picked from commit 3d48a3c)
This is a preliminary refactor that makes ProducerIdManager an interface.
The plan is to contribute a new implementation of ProducerIdManager that does not use the MetadataStore service (ZooKeeper) but stores the ids on a Pulsar topic.

Using Pulsar as the backing store for generating the ids is better when you use this feature in a multi-tenant environment and want to fully isolate the data of each tenant. A hypothetical shape of the interface is sketched below.
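This sketch is hypothetical, not the real KoP interface; the idea is that both the MetadataStore-backed and the Pulsar-topic-backed implementation would sit behind it:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only: method names in KoP may differ.
public interface ProducerIdManager {
    // Prepare the backing store (MetadataStore/ZooKeeper today, a Pulsar topic in the future).
    CompletableFuture<Void> initialize();

    // Hand out the next unique producer id.
    CompletableFuture<Long> generateProducerId();

    void shutdown();
}
```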

(cherry picked from commit b07222d)
…ive#1064)

### Motivation
The `isMarkerMessage` method is used for `ReplicatedSubscriptions`, so we should not override `isMarkerMessage` in KoP.

### Modifications
Do not override the `isMarkerMessage` method in KoP's MessagePublishContext.

(cherry picked from commit 19d3a3a)
…ative#1069)

- streamnative#1049 changed the behavior, therefore this change is needed

(cherry picked from commit bf2331a)
…with kafkaListeners (streamnative#1092)

This PR is a follow-up bug fix of streamnative#1042 for branch-2.8.2. It also migrates some util methods and tests from streamnative#864.
)

Fixes  streamnative#1090

### Motivation
Currently, even if we configure kafkaTransactionCoordinatorEnabled=false, the Kafka transaction coordinator is still started.
This happens when the broker handles a consumer fetch.


### Modifications
When `kafkaTransactionCoordinatorEnabled = false`, ensure the transaction coordinator does not start.
…treamnative#1103)

Fixes streamnative#1068

### Motivation

Currently, we use a lookup to authorize the list-topics permission. We should follow the Pulsar way by using `validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);` to check the permission.

### Modifications

Make listing topics use Pulsar's GET_TOPICS permission (a sketch follows).
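A hedged sketch of the namespace-level check; the exact `AuthorizationService` overload used here is an assumption and may differ from what KoP calls inside the broker:

```java
// Authorize listing topics as a namespace-level GET_TOPICS operation rather than
// performing a lookup per topic; variable names here are illustrative.
CompletableFuture<Boolean> canListTopics = authorizationService.allowNamespaceOperationAsync(
        namespaceName, NamespaceOperation.GET_TOPICS, role, authenticationData);
```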

(cherry picked from commit 5e7b674)
**Modifications**
- use a shared threadpool
- lazily start the drainQueuedTransactionMarkers activity only when needed
- set period to 100ms (instead of 1ms)

**Result**
If a tenant never uses transactions, the background activity is never started; we are also able to scale with the number of active tenants.

A future improvement may be to shut down the activity when a tenant has been idle for a while. A sketch of the lazy-start idea follows.
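A minimal sketch of the lazy-start idea; the class and method names are illustrative, not the actual KoP code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class MarkerDrainSchedulerSketch {
    // One shared scheduler for all tenants instead of one thread pool per tenant.
    private static final ScheduledExecutorService SHARED_SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    private volatile ScheduledFuture<?> drainTask;

    // Called the first time a transaction marker is queued for this tenant.
    synchronized void startDrainIfNeeded(Runnable drainQueuedTransactionMarkers) {
        if (drainTask == null) {
            // 100 ms period instead of 1 ms, and never started for tenants
            // that do not use transactions at all.
            drainTask = SHARED_SCHEDULER.scheduleAtFixedRate(
                    drainQueuedTransactionMarkers, 100, 100, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void shutdown() {
        if (drainTask != null) {
            drainTask.cancel(false);
            drainTask = null;
        }
    }
}
```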

(cherry picked from commit 12cf707)
…ad (streamnative#1116)

**Motivation**
During OMB testing @dave2wave noticed that the BookKeeperClientScheduler-OrderedScheduler-X-Y thread pool is busy doing decoding work (from Pulsar -> Kafka), and this may limit the efficiency of the system because that thread pool handles all the BookKeeper activities.

**Modifications**
With this patch we perform the "decoding" from Pulsar -> Kafka (on the consumer code path) in a thread pool that is not the BookKeeper ordered executor pool (BookKeeperClientScheduler-OrderedScheduler-X-Y).

**Results**
With this patch @dave2wave reported far better performance using OMB:
> the test results show the BookKeeperClientWorker-OrderedExecutor is working much less than before. It is about 40-45% cpu in our OMB tests now. The equivalent Pulsar workload is about 20% cpu. Before this fix the process blocked at 95-100%

With this patch we are able to push OMB to 1 million messages/second even with `entryFormat:pulsar`; before this patch that was possible in that environment only with `entryFormat:kafka`. A sketch of the offloading idea follows.
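For illustration, a hedged sketch of the offloading idea; `readEntriesFuture`, `entryFormatter`, `decodeExecutor`, and `completeFetch` are assumed names, not the actual KoP members:

```java
// Perform the Pulsar -> Kafka decoding on a dedicated executor instead of the
// BookKeeper ordered executor thread that completed the read.
readEntriesFuture
        .thenApplyAsync(entries -> entryFormatter.decode(entries), decodeExecutor)
        .thenAccept(records -> completeFetch(records));
```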

(cherry picked from commit 51563fe)