
KAFKA-16684: Remove cache in responseData #15966

Closed
johnnychhsu wants to merge 4 commits into apache:trunk from johnnychhsu:KAFKA-16684/remove_cache

Conversation

@johnnychhsu
Contributor

Context

The response data should change according to the input; however, with the current design it does not change even when the input changes. We should remove this cache logic to avoid returning wrong data.
Jira: https://issues.apache.org/jira/browse/KAFKA-16684

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@johnnychhsu johnnychhsu marked this pull request as ready for review May 15, 2024 15:19
responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition));
}
});
responseData = responseDataTmp;
Member

This is exactly my concern! The responseData could be different if the input changes.

Contributor Author

Thanks for the review!
Yes, the returned data should be calculated on the fly based on the input topic names.
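
To make the difference concrete, here is a minimal sketch of the two behaviors. It uses a hypothetical stand-in class (`SketchResponse`, with topic IDs as plain strings and one partition index per topic), not Kafka's actual `FetchResponse`, so treat the names and types as assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a fetch response; this is NOT Kafka's FetchResponse.
class SketchResponse {
    // Raw payload keyed by topic ID (a plain String here for simplicity).
    private final Map<String, Integer> partitionsByTopicId = new LinkedHashMap<>();
    // Result of the first lookup, kept around like the cache this PR removes.
    private Map<String, Integer> cached;

    void add(String topicId, int partitionIndex) {
        partitionsByTopicId.put(topicId, partitionIndex);
    }

    // Cached variant: whatever the first call computed is returned for every
    // later call, even when topicNames is different.
    Map<String, Integer> responseDataCached(Map<String, String> topicNames) {
        if (cached == null) {
            cached = resolve(topicNames);
        }
        return cached;
    }

    // On-the-fly variant: the result always reflects the topicNames passed in.
    Map<String, Integer> responseData(Map<String, String> topicNames) {
        return resolve(topicNames);
    }

    // Maps each topic ID to its name and skips IDs that cannot be resolved.
    private Map<String, Integer> resolve(Map<String, String> topicNames) {
        Map<String, Integer> result = new LinkedHashMap<>();
        partitionsByTopicId.forEach((topicId, partitionIndex) -> {
            String name = topicNames.get(topicId);
            if (name != null) {
                result.put(name, partitionIndex);
            }
        });
        return result;
    }
}
```

In this sketch, calling `responseDataCached` first with a map that resolves `id-1` and then with an empty map returns the same single entry both times, whereas `responseData` returns an empty map for the empty input.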

@johnnychhsu johnnychhsu force-pushed the KAFKA-16684/remove_cache branch from dfadc1a to 1291dd2 Compare June 6, 2024 12:41
@chia7712
Member

chia7712 commented Jun 6, 2024

@johnnychhsu This is a bug fix, so please add a test for it.

Collaborator

@TaiJuWu TaiJuWu left a comment

LGTM

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM! The only thing I am not sure about is whether there is a reason to cache the response for faster processing, i.e. how frequently is this method called with the same topicNames? If the calls are very frequent and topicNames does not change, would it be worth recomputing only when topicNames has changed?

The reason I say that is because the method call happens while fetching topicNames from the session, which will mostly remain the same.

@chia7712
Member

chia7712 commented Jul 2, 2024

The reason I say that is because the method call happens while fetching topicNames from the session, which will mostly remain the same.

That is a good point. Maybe the cache should be hosted by the session instead of the response data. The response data can be accessed by everyone, so it does not seem like a good place for a cache.
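
A rough sketch of that idea, assuming a hypothetical helper (`SessionMemo`, not an existing Kafka class) that the session would own: it keeps one derived value and recomputes it only when the topicNames passed in differ from the ones used last time. How such a helper would be wired into the fetch session is left open here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical session-side memo; the names are illustrative, not Kafka APIs.
class SessionMemo<V> {
    private Map<String, String> lastTopicNames; // snapshot of the input behind lastValue
    private V lastValue;
    int computeCount = 0; // exposed only so the sketch can show when recomputation happens

    // Recompute only when topicNames differs from the snapshot of the previous call.
    synchronized V get(Map<String, String> topicNames,
                       Function<Map<String, String>, V> compute) {
        if (lastTopicNames == null || !lastTopicNames.equals(topicNames)) {
            lastValue = compute.apply(topicNames);
            // Defensive copy, so later mutation of the caller's map is detected.
            lastTopicNames = new HashMap<>(topicNames);
            computeCount++;
        }
        return lastValue;
    }
}
```

The trade-off in this sketch is that `Map.equals` compares every entry on each call, so it only pays off when recomputing the response data costs more than that comparison.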

@apoorvmittal10
Contributor

The reason I say that is because the method call happens while fetching topicNames from the session, which will mostly remain the same.

That is a good point. Maybe the cache should be hosted by the session instead of the response data. The response data can be accessed by everyone, so it does not seem like a good place for a cache.

Sounds fair 👍

@chia7712
Member

chia7712 commented Jul 2, 2024

Sounds fair

I filed https://issues.apache.org/jira/browse/KAFKA-17065 to track it.

@m1a2st
Collaborator

m1a2st commented Jul 3, 2024

LGTM. I will add a test after this PR is merged into trunk.

@chia7712
Member

chia7712 commented Jul 3, 2024

org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not complete execution for Gradle Test Executor 100.
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:64)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:119)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:66)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at net.bytebuddy.description.type.TypeDescription$ForLoadedType.of(TypeDescription.java:8619)
	at net.bytebuddy.description.method.MethodDescription$ForLoadedMethod.getDeclaringType(MethodDescription.java:1190)
	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.isOverridden(MockMethodAdvice.java:199)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.isUnavailable(ConsumerNetworkClient.java:560)
	at org.apache.kafka.clients.consumer.internals.Fetcher.isUnavailable(Fetcher.java:87)
	at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:427)
	at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:105)
	at org.apache.kafka.clients.consumer.internals.FetcherTest.sendFetches(FetcherTest.java:246)
	at org.apache.kafka.clients.consumer.internals.FetcherTest.testFetcherConcurrency(FetcherTest.java:2943)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)

The failed test is related to this PR. The test case testFetcherConcurrency does not return the correct sessionTopicNames, so normally it should NOT see the correct response data. However, FetchResponse#responseData returns the cached data regardless of input, so the test CAN get the correct response data even though it passes empty topicNames. That is a good example of the potential bug :)

@m1a2st Could you copy the changes from this PR into a new one and fix testFetcherConcurrency according to my comment? Also, please add a new test for the change.

@johnnychhsu Sorry, I can't merge this PR as it causes the test failure. Please feel free to close this PR, as @m1a2st will build on it to complete the work :)

@m1a2st
Collaborator

m1a2st commented Jul 3, 2024

@chia7712, thanks for your comments. I will open a new PR for this issue.

@chia7712
Member

chia7712 commented Jul 4, 2024

Closing this PR; @m1a2st will file another one.

5 participants