Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Conversation

@BewareMyPower
Copy link
Collaborator

@BewareMyPower BewareMyPower commented Jan 25, 2022

Fixes #1032

Motivation

When a consumer of multiple partitions polls empty, i.e. there is no available message a DelayedFetch instance is created and added to DelayedOperationPurgatory. The DelayedFetch instance can be triggered when the timeout (maxWaitMs) exceeds or triggered by KafkaRequestHandler#notifyPendingFetches when some messages are sent successfully.

The latter behavior was introduced from #973. However, it could cause NPE because the MessageFetchContext instance held by DelayedFetch might have already be recycled. It's because there is no way to notify the purgatory that there is a delayed fetch operation whose messageFetchContext field is null now. For a MessageFetchContext instance, if one partition has available messages, the tryComplete method will be triggered eventually. Then recycle() will be called. But the delayed fetch operation won't be removed.

Therefore, onDataWrittenToSomePartition will be called on a recycled MessageFetchContext in DelayedFetch#tryComplete, and NPE will happen because all fields of MessageFetchContext are null now.

The KoP release 2.8.2.2+ and 2.9.1.2+ are affected by the bug.

Modifications

We can simply fix this bug by adding null check in MessageFetchContext#onDataWrittenToSomePartition. However, instead of that, this PR chooses to save the DelayedFetch instances in MessageFetchContext and remove them from DelayedOperationPurgatory in recycle() method.

Compared with adding null check in onDataWrittenToSomePartition, the solution of this PR can remove invalid DelayFetch instances from the purgatory and the associated TimerTask instances from the task list.

This PR adds a test testEmptyPollWhenProduceAndConsumeConcurrently to reproduce the empty polls for a consumer of multiple partitions by increasing the maxWaitMs to 3 seconds. The test will fail if without the changes in kafka-impl module.

@Demogorgon314
Copy link
Member

I don't think this patch can fix the NPE, I rerun the benchmark by using this patch, it still has an NPE problem.

Openmessage benchmark error logs

09:48:59.112 [pool-2-thread-1] ERROR - exception occur while consuming message
java.lang.IllegalStateException: Unexpected error code 7 while fetching at offset 333183 from topic-partition test-topic-irAkc1k-0000-13
	at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1373) ~[kafka-clients-3.0.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:648) ~[kafka-clients-3.0.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304) ~[kafka-clients-3.0.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:?]
	at io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkConsumer.lambda$new$0(KafkaBenchmarkConsumer.java:58) ~[driver-kafka-0.0.1-SNAPSHOT.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

Broker logs

2022-01-26T09:48:48,281+0800 [pulsar-io-29-8] WARN  io.netty.util.concurrent.AbstractEventExecutor - A task raised an exception. Task: io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$$Lambda$1494/0x0000000800b7b440@525beb64
io.streamnative.pulsar.handlers.kop.utils.KopTopic$KoPTopicIllegalArgumentException: Invalid short topic name 'test-topic-irAkc1k-0000', it should be in the format of <tenant>/<namespace>/<topic> or <topic>
	at io.streamnative.pulsar.handlers.kop.utils.KopTopic.expandToFullName(KopTopic.java:77) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.utils.KopTopic.<init>(KopTopic.java:59) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.utils.KopTopic.toString(KopTopic.java:96) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$handleFetch$4(MessageFetchContext.java:319) ~[?:?]
	at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.handleFetch(MessageFetchContext.java:316) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.onDataWrittenToSomePartition(MessageFetchContext.java:255) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.DelayedFetch.tryComplete(DelayedFetch.java:63) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation.maybeTryComplete(DelayedOperation.java:127) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:363) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:229) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.lambda$notifyPendingFetches$32(KafkaRequestHandler.java:1007) ~[?:?]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]

@BewareMyPower
Copy link
Collaborator Author

OK. I think it's another corner case that we should take care.

@BewareMyPower BewareMyPower marked this pull request as draft January 26, 2022 01:59
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG]openmessage Rebalance failed. Unable to consume

2 participants