Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Close offset consumers when channel is inactive#425

Merged
jiazhai merged 4 commits intostreamnative:masterfrom
BewareMyPower:bewaremypower/close-acker
Apr 6, 2021
Merged

Close offset consumers when channel is inactive#425
jiazhai merged 4 commits intostreamnative:masterfrom
BewareMyPower:bewaremypower/close-acker

Conversation

@BewareMyPower
Copy link
Copy Markdown
Collaborator

Fixes #229

Motivation

The consumers of OffsetAcker are used to commit offsets only. However, they would never be closed. Therefore the topics that are consumed by Kafka clients cannot be deleted because topic has active producers/subscriptions.

Modifications

  • Record all group IDs before group coordinator handles SYNC_GROUP request, which will call OffsetAcker#addOffsetsTracker to add offset consumers if success.
  • Close and remove the consumers associated with the recorded group IDs before during the close phase of channel. It's okay because OffsetAcker#getConsumer (called by ackOffsets) will recreate the consumer if it doesn't exist.
  • Add tests to delete a topic after all Kafka clients are closed.

@BewareMyPower BewareMyPower requested a review from jiazhai as a code owner April 5, 2021 15:25
@BewareMyPower BewareMyPower changed the title Close offset consumers when channel is inactive [WIP] Close offset consumers when channel is inactive Apr 5, 2021
@BewareMyPower
Copy link
Copy Markdown
Collaborator Author

Mark it as WIP since some tests failed with unknown reason, and integration tests all failed.

@BewareMyPower BewareMyPower changed the title [WIP] Close offset consumers when channel is inactive Close offset consumers when channel is inactive Apr 6, 2021
@jiazhai jiazhai merged commit 9a5c25d into streamnative:master Apr 6, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/close-acker branch April 6, 2021 04:06
jiazhai pushed a commit that referenced this pull request Apr 8, 2021
### Motivation
`testDeleteClosedTopics` is very easy to fail in CI environment because there's a race condition. #425 closes a group's offset consumers when the channel becomes inactive. However, before Kafka consumer closes, it sends a `COMMIT_OFFSET` request to commit offsets, which is the behavior of the default `enable.auto.commit=true` config. Currently, KoP acknowledges the offset's associated message id in the callback of `GroupMetadataManager#storeGroup`, see `GroupCoordinator#handleCommitOffsets`:

```java
        result.whenCompleteAsync((ignore, e) ->{
            if (e == null){
                offsetAcker.ackOffsets(groupId, offsetMetadata);
            }
        });
```

So it's asynchronous with `handleCommitOffsets` itself. There's a possibility that after the channel was closed and `OffsetAcker`'s consumer was removed and closed, `OffsetAcker#ackOffsets` would be invoked again, and `getConsumer` would be invoked so that a new offset consumer would be created to the topic.

Here're some related logs in CI tests as the evidence.

> 15:49:07.884 [pulsar-client-io-40-1:org.apache.pulsar.client.impl.ConsumerImpl@698] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-delete-closed-topics-partition-0][sub-2] Subscribing to topic on cnx [id: 0xb7       fd0eb7, L:/127.0.0.1:50468 - R:localhost/127.0.0.1:15002], consumerId 2
> 15:49:07.980 [TestNG-method=testDeleteClosedTopics-1:org.apache.kafka.clients.consumer.KafkaConsumer@2152] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-2, groupId=sub-2] Kafka consumer has been closed
> 15:49:07.986 [pulsar-client-io-40-1:org.apache.pulsar.client.impl.ConsumerImpl@698] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-delete-closed-topics-partition-0][sub-2] Subscribing to topic on cnx [id: 0xb7fd0eb7, L:/127.0.0.1:50468 - R:localhost/127.0.0.1:15002], consumerId 3
>  ...
>  org.apache.pulsar.broker.service.BrokerServiceException$TopicBusyException: Topic has 1 connected producers/consumers

We can see after the Kafka consumer was closed, a new offset consumer was created again.

### Modifications
`ackOffsets` does the acknowledgement asynchronously but may triggers the creation of an offset consumer, so this PR makes `OffsetAcker#ackOffsets` be called before `GroupMetadataManager#storeGroup` and it prevents an offset consumer being created after channel is inactive.

Besides, it adds some condition checks to the test. In CI environment, sometimes topics cannot be deleted by 404 (`Partitioned topic does not exist`).
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] the topic can not remove after the producer and consumer finished

2 participants