KAFKA-15942: Implement ConsumerInterceptor#14963
KAFKA-15942: Implement ConsumerInterceptor#14963Joker-5 wants to merge 2 commits intoapache:trunkfrom
Conversation
vamossagar12
left a comment
There was a problem hiding this comment.
Thanks for the PR @Joker-5. I looked at the ticket KAFKA-15492, the ask seems to be to implement ConsumerInterceptor in AsyncKafkaConsumer. I don't see those changes in this PR. Also, I see a test failure like testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() – org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest having this stacktrace:
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.internals.ConsumerInterceptors.onCommit(java.util.Map)" because the return value of "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.access$1000(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)" is null
Stacktrace
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.internals.ConsumerInterceptors.onCommit(java.util.Map)" because the return value of "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.access$1000(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)" is null
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$2.onSuccess(ConsumerCoordinator.java:1062)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$2.onSuccess(ConsumerCoordinator.java:1057)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.addListener(RequestFuture.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1057)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1008)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.autoCommitOffsetsAsync(ConsumerCoordinator.java:1185)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsAsync(ConsumerCoordinator.java:1202)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:720)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:472)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.prepareCoordinatorForCloseTest(ConsumerCoordinatorTest.java:3761)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout(ConsumerCoordinatorTest.java:1381)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
Do you think, that is related to the changes in this PR?
…rceptors#onCommit method
…rceptors#onCommit method
de10628 to
22d2b46
Compare
|
@vamossagar12 Thanks for reviewing the PR! At the beginning I misunderstood this ticket, now I understood and the code has already updated. Would you have time to take a look at this PR? |
|
hmm, the JDK21 build failed with this error => |
|
Thanks for the changes @Joker-5 . I will take a look this week. |
|
Hey @Joker-5, I took the ticket since your original PR seemed to only change the legacy consumer, so I thought it was just linked to the wrong ticket. I think there are some things missing here
How about we merge my PR which has the two changes and I add you in a |
Hi @lucasbru, I understand. This is the second PR which i committed to Kafka so it seems a bit confusing. Now I learned a lot from your PR, so just do it. There're some information for
|
Thanks so much for the review! |
|
@Joker-5 , makes sense and thanks for letting me know. We can close this PR then? |
Sure, I'll close this PR soon. |
|
@Joker-5 I can add it in the PR description so that I don't forget. But the tag needs to added to the final commit, once we merge it, so we cannot really add it yet |
|
There is one here: 1a54c25 |
Got it, feel so sorry to trouble you again. |
When invoking
ConsumerInterceptor#onCommitmethod in ConsumerCoordinator, there're some redundant if-nonNull branches.private final ConsumerInterceptors<?, ?> interceptorsis a non-null field because it'll be instantiate in the constructor, and this field is non-null when delivering from LegacyKafkaConsumer.Some proofs:
There're 2 constructors in LegacyKafkaConsumer, both of them will instantiate
interceptorsfield:this.interceptors = new ConsumerInterceptors<>(interceptorList);this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());And this field can not set to null because it's private and no method to modify it.
Committer Checklist (excluded from commit message)