Skip to content

KAFKA-10030 allow fetching a key from a single partition#8706

Merged
mjsax merged 12 commits intoapache:trunkfrom
dima5rr:KAFKA-10030
Jun 2, 2020
Merged

KAFKA-10030 allow fetching a key from a single partition#8706
mjsax merged 12 commits intoapache:trunkfrom
dima5rr:KAFKA-10030

Conversation

@dima5rr
Copy link
Copy Markdown
Contributor

@dima5rr dima5rr commented May 21, 2020

StreamThreadStateStoreProvider#stores throws exception whenever taskId is not found, which is not correct behaviour in multi-threaded env where state store partitions are distributed among several StreamTasks.

final Task task = tasks.get(keyTaskId);
if (task == null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
storeQueryParams.partition(),
storeName));
}
Reproducible with KStream number of threads more then 1

StoreQueryIntegrationTest#streamsConfiguration

config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Suggested solution is to not throw exception if at least one state store is found, which is always true when using StoreQueryParameters.withPartition

https://issues.apache.org/jira/browse/KAFKA-10030?jql=project%20%3D%20KAFKA%20AND%20component%20%3D%20streams

@mjsax @guozhangwang

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 21, 2020

Retest this please.

@mjsax mjsax added the streams label May 21, 2020
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 21, 2020

@vinothchandar @brary for review

@dima5rr dima5rr requested a review from mjsax May 21, 2020 20:50
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 23, 2020

Retest this please.

1 similar comment
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 23, 2020

Retest this please.

Copy link
Copy Markdown
Contributor

@brary brary left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. LGTM!

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding tests. Some minor follow up comments

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Retest this please.

2 similar comments
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Seems Jenkins is too busy... Will try again later

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Retest this please.

1 similar comment
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 27, 2020

Retest this please.

Copy link
Copy Markdown

@abbccdda abbccdda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style check failed

@abbccdda
Copy link
Copy Markdown

15:01:33 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:25: Using the '.*' form of import should be avoided - org.apache.kafka.streams.*. [AvoidStarImport]
15:01:33 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:56: Using the '.*' form of import should be avoided - org.hamcrest.Matchers.*. [AvoidStarImport]

cc @dima5rr

@dima5rr dima5rr requested a review from mjsax May 28, 2020 09:23
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 28, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 28, 2020

Retest this please.


@Test
public void shouldReturnKVStoreWithPartitionWhenItExists() {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to test if the right store is returned instead of just checking for not-null? For this, in before() we need to get a reference on the store we pass into addStore()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think how to validate returned store reference, QueryableStoreProvider always wraps it in CompositeReadOnlyKeyValueStore?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... Good point. Let leave it as-is. It's also covered in integration tests that the right store is returned.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 28, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 29, 2020

Retest this please.

2 similar comments
@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 29, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 29, 2020

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented May 29, 2020

Jenkins does not cooperator right now. Will try again later.

@vvcephei
Copy link
Copy Markdown
Contributor

Test this please

Copy link
Copy Markdown

@abbccdda abbccdda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mjsax mjsax merged commit ec67788 into apache:trunk Jun 2, 2020
mjsax pushed a commit that referenced this pull request Jun 2, 2020
Reviewers: Navinder Pal Singh Brar <navinder_brar@yahoo.com>, Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
mjsax pushed a commit that referenced this pull request Jun 2, 2020
Reviewers: Navinder Pal Singh Brar <navinder_brar@yahoo.com>, Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Jun 2, 2020

Thanks for the PR @dima5rr.

Merged to trunk and cherry-picked to 2.6 and 2.5 branches.

ijuma added a commit to confluentinc/kafka that referenced this pull request Jun 3, 2020
* apache-github/2.6: (32 commits)
  KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786)
  KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737)
  KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695)
  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777)
  MINOR: Remove unused variable to fix spotBugs failure (apache#8779)
  MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773)
  KAFKA-10030: Allow fetching a key from a single partition (apache#8706)
  Kafka-10064 Add documentation for KIP-571 (apache#8760)
  MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750)
  KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668)
  KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956)
  KAFKA-10074: Improve performance of `matchingAcls` (apache#8769)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants