Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Fix flaky test: KafkaTopicConsumerManagerTest.testTopicManagerClose#846

Merged
BewareMyPower merged 1 commit intostreamnative:masterfrom
Demogorgon314:fix/flaky-test-809
Oct 26, 2021
Merged

Fix flaky test: KafkaTopicConsumerManagerTest.testTopicManagerClose#846
BewareMyPower merged 1 commit intostreamnative:masterfrom
Demogorgon314:fix/flaky-test-809

Conversation

@Demogorgon314
Copy link
Member

@Demogorgon314 Demogorgon314 commented Oct 25, 2021

Because the KafkaRequestHandler.close() is called by channelInActive, when channelInActive called, the tcp connection is already closed. We need ensure topicManager.close() is called.

channelInActive call root

We can easily reproduce by add some sleep in KafkaRequestHandler.close() like this:

@Override
protected void close() {
    if (isActive.getAndSet(false)) {
        super.close();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        topicManager.close();
        String clientHost = ctx.channel().remoteAddress().toString();
        if (currentConnectedGroup.containsKey(clientHost)){
            log.info("currentConnectedGroup remove {}", clientHost);
            currentConnectedGroup.remove(clientHost);
        }
        producePurgatory.shutdown();
        fetchPurgatory.shutdown();

        // update alive channel count stat
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
    }
}

@Demogorgon314 Demogorgon314 self-assigned this Oct 25, 2021
@Demogorgon314 Demogorgon314 marked this pull request as ready for review October 25, 2021 11:39
@BewareMyPower BewareMyPower linked an issue Oct 26, 2021 that may be closed by this pull request
@BewareMyPower BewareMyPower merged commit 9fa6e1f into streamnative:master Oct 26, 2021
@Demogorgon314 Demogorgon314 deleted the fix/flaky-test-809 branch October 26, 2021 11:40
BewareMyPower pushed a commit that referenced this pull request Oct 26, 2021
Because the `KafkaRequestHandler.close()` is called by `channelInActive`, when `channelInActive` called, the tcp connection is already closed. We need ensure `topicManager.close()` is called. 

[channelInActive call root](https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/AbstractChannel.java#L765)

We can easily reproduce by add some sleep in `KafkaRequestHandler.close()` like this:

```java
@OverRide
protected void close() {
    if (isActive.getAndSet(false)) {
        super.close();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        topicManager.close();
        String clientHost = ctx.channel().remoteAddress().toString();
        if (currentConnectedGroup.containsKey(clientHost)){
            log.info("currentConnectedGroup remove {}", clientHost);
            currentConnectedGroup.remove(clientHost);
        }
        producePurgatory.shutdown();
        fetchPurgatory.shutdown();

        // update alive channel count stat
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
    }
}
```
BewareMyPower pushed a commit that referenced this pull request Oct 29, 2021
Because the `KafkaRequestHandler.close()` is called by `channelInActive`, when `channelInActive` called, the tcp connection is already closed. We need ensure `topicManager.close()` is called. 

[channelInActive call root](https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/AbstractChannel.java#L765)

We can easily reproduce by add some sleep in `KafkaRequestHandler.close()` like this:

```java
@OverRide
protected void close() {
    if (isActive.getAndSet(false)) {
        super.close();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        topicManager.close();
        String clientHost = ctx.channel().remoteAddress().toString();
        if (currentConnectedGroup.containsKey(clientHost)){
            log.info("currentConnectedGroup remove {}", clientHost);
            currentConnectedGroup.remove(clientHost);
        }
        producePurgatory.shutdown();
        fetchPurgatory.shutdown();

        // update alive channel count stat
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
    }
}
```
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: KafkaTopicConsumerManagerTest.testTopicManagerClose

2 participants