KAFKA-9445: Allow adding changes to allow serving from a specific partition #7984

Merged
mjsax merged 11 commits into apache:trunk from brary:navinderbrar-KAFKA-9445-allow-serving-from-specific-partition on Jan 30, 2020
Conversation

@brary
Contributor

@brary brary commented Jan 19, 2020

This is the implementation of KIP-562: https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
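For readers unfamiliar with the KIP: it adds a parameter object that KafkaStreams#store accepts in place of plain storeName/type arguments, so a caller can restrict a lookup to one partition and opt in to stale (standby/restoring) stores. The class below is a minimal, self-contained model of that builder shape for illustration only -- it is not the real Kafka Streams class (the real one also carries a QueryableStoreType), and the method names here are simplified.

```java
// Minimal model of the parameter-object builder shape introduced by KIP-562.
// Illustrative stand-in only, NOT the real org.apache.kafka.streams class.
public final class StoreQueryParamsModel {
    private final String storeName;
    private final Integer partition;   // null means "query all local partitions"
    private final boolean staleStores; // false means "only active, fully-restored tasks"

    private StoreQueryParamsModel(final String storeName, final Integer partition, final boolean staleStores) {
        this.storeName = storeName;
        this.partition = partition;
        this.staleStores = staleStores;
    }

    public static StoreQueryParamsModel fromName(final String storeName) {
        return new StoreQueryParamsModel(storeName, null, false);
    }

    // Restrict the query to a single partition of the store.
    public StoreQueryParamsModel withPartition(final int partition) {
        return new StoreQueryParamsModel(storeName, partition, staleStores);
    }

    // Also allow serving from standby and restoring replicas.
    public StoreQueryParamsModel enableStaleStores() {
        return new StoreQueryParamsModel(storeName, partition, true);
    }

    public String storeName() { return storeName; }
    public Integer partition() { return partition; }
    public boolean staleStoresEnabled() { return staleStores; }

    public static void main(final String[] args) {
        final StoreQueryParamsModel params =
            StoreQueryParamsModel.fromName("word-counts").withPartition(2).enableStaleStores();
        if (!params.storeName().equals("word-counts") || params.partition() != 2 || !params.staleStoresEnabled()) {
            throw new AssertionError("builder did not capture the expected parameters");
        }
        System.out.println("ok");
    }
}
```

The immutable builder-style chaining mirrors how the PR lets callers combine "which store", "which partition", and "stale or not" in one object instead of adding more overloads.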

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@brary brary requested a review from vvcephei January 19, 2020 10:40
@brary
Contributor Author

brary commented Jan 22, 2020

@vinothchandar Can you scan through once? I am not sure why the checks are not running on this.

@mumrah
Member

mumrah commented Jan 23, 2020

@brary assuming the vote passes (which it looks like it will), do you think we can get this in for 2.5? The code freeze is Feb 12

Member

@vinothchandar vinothchandar left a comment

Made a pass. Looks about right to me. My suggestion would be to add an integration test where we filter by a partition, try to fetch a key belonging to a different partition, and make sure we are not able to fetch the value.

final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
final Set<String> sourceTopicsSet = sourceTopics.stream().collect(Collectors.toSet());
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
Member

This looks right to me. (I can't be sure unless I step through the code, though. :))

Contributor Author

I have added an integration test covering all 4 cases.

  • Fetch a key with the defaults, which queries the stores for all partitions without stale stores. Only the active returns the value; the standby returns null.

  • Fetch a key from a specific partition. If the partition is available and active, it returns the value; if not, it throws InvalidStateStoreException.

  • Fetch a key with stale stores enabled; active and standby both return the value.

  • Fetch a key with stale stores enabled and from a specific partition; the active and standby for that partition both return the value. If you query the key from the wrong partition, it returns null.
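The four cases above can be summarized as a small decision table. The sketch below is a hypothetical model of the expected outcomes, written only to make the table explicit; the enum and method names are invented and do not appear in the Kafka codebase.

```java
// Hypothetical model of the outcomes exercised by the four integration-test
// cases described above. Names are illustrative, not Kafka API.
public final class StaleStoreQueryOutcomes {
    enum Role { ACTIVE, STANDBY }
    enum Filter { NONE, KEY_PARTITION, WRONG_PARTITION }
    enum Outcome { VALUE, NULL_RESULT, INVALID_STATE_STORE_EXCEPTION }

    static Outcome outcome(final boolean staleEnabled, final Filter filter, final Role role) {
        if (!staleEnabled) {
            if (filter == Filter.NONE) {
                // Case 1: only the active host serves the key; the standby returns null.
                return role == Role.ACTIVE ? Outcome.VALUE : Outcome.NULL_RESULT;
            }
            // Case 2: a specific partition without stale stores must be active here,
            // otherwise the query throws.
            return filter == Filter.KEY_PARTITION && role == Role.ACTIVE
                ? Outcome.VALUE
                : Outcome.INVALID_STATE_STORE_EXCEPTION;
        }
        // Cases 3 and 4: with stale stores enabled, active and standby both serve,
        // but a key looked up in the wrong partition is simply not found.
        return filter == Filter.WRONG_PARTITION ? Outcome.NULL_RESULT : Outcome.VALUE;
    }

    public static void main(final String[] args) {
        if (outcome(false, Filter.NONE, Role.STANDBY) != Outcome.NULL_RESULT
            || outcome(true, Filter.KEY_PARTITION, Role.STANDBY) != Outcome.VALUE
            || outcome(true, Filter.WRONG_PARTITION, Role.ACTIVE) != Outcome.NULL_RESULT) {
            throw new AssertionError("model disagrees with the described test cases");
        }
        System.out.println("ok");
    }
}
```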

@brary
Contributor Author

brary commented Jan 24, 2020

@brary assuming the vote passes (which it looks like it will), do you think we can get this in for 2.5? The code freeze is Feb 12

Hi David, yes it will be done by then.

@brary brary changed the title [WIP: KAFKA-9445] Allow adding changes to allow serving from a specific partition KAFKA-9445: Allow adding changes to allow serving from a specific partition Jan 25, 2020
@brary
Contributor Author

brary commented Jan 27, 2020

Calling for a review @vvcephei @guozhangwang @mjsax. I hope we can ship this in 2.5.

@brary brary requested a review from guozhangwang January 28, 2020 03:38
Member

@mjsax mjsax left a comment

Thanks for the PR.

@mumrah When do you plan to cut the release branch? We would love to get this in. Overall, the PR is good enough for merging if necessary to meet the deadline and to get the public API changes in. We could do a follow-up PR to address the review comments.

@brary -- for public API changes, the feature freeze deadline is the one we need to consider (not code freeze).

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/StoreQueryParams.java Outdated
//visible for testing
public void setStoreQueryParams(final StoreQueryParams storeQueryParams) {
this.storeQueryParams = storeQueryParams;
}
Member

Why do we need this setter? Why can't we pass in StoreQueryParams in the constructor in the tests?

Contributor Author

There were four different tests in this class, with a different storeName or queryable store type for each test. So I would have to create 4 different instances of wrappingStoreProvider in the #before (one for each test) if I don't add a setter. I thought this would be easier. I can surely do that as well. LMK.

Member

Yes, it would be cleaner to create a new instance for each test -- if you test WrappingStoreProvider for a different use case, it makes total sense to create a new object. We should only poke holes into the API if it is absolutely necessary, to keep clean API abstractions.
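The suggested pattern -- constructing a fresh provider per test instead of mutating a shared field through a setter -- can be sketched as follows. QueryParams, Provider, and providerFor are hypothetical stand-ins for StoreQueryParams and WrappingStoreProvider, not the real classes.

```java
// Illustrative sketch of the review suggestion: build a fresh, immutable
// provider per test via the constructor instead of exposing a setter that is
// only there for tests. All names here are hypothetical stand-ins.
public final class FreshProviderPerTest {
    static final class QueryParams {
        final String storeName;
        QueryParams(final String storeName) { this.storeName = storeName; }
    }

    static final class Provider {
        private final QueryParams params; // set once, in the constructor
        Provider(final QueryParams params) { this.params = params; }
        String storeName() { return params.storeName; }
    }

    // Each test constructs its own provider instead of calling a setter on a
    // shared instance initialized in a @Before method.
    static Provider providerFor(final String storeName) {
        return new Provider(new QueryParams(storeName));
    }

    public static void main(final String[] args) {
        if (!providerFor("keyValueStore").storeName().equals("keyValueStore")
            || !providerFor("windowStore").storeName().equals("windowStore")) {
            throw new AssertionError("each test should see its own provider");
        }
        System.out.println("ok");
    }
}
```

Keeping the class immutable avoids test-only mutators in the production API, which is the abstraction concern raised above.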

// Assert that only active is able to query for a key by default
assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
}
Member

I am not sure I understand this test. If there are two instances and a topic with 2 partitions, each instance is hosting one task. Hence, how does this test verify that all partitions are considered for querying?

  1. there is only one partition
  2. even if there were two partitions, how could this test verify whether only one partition was queried or both?

Maybe enabling stale querying helps to verify?

Similar question about the test below. Or do I miss something?

Contributor Author

@brary brary Jan 30, 2020

Hmm, you are right. Although, my main point for writing this test was that when no partition is explicitly provided, the key fetch returns a value (and only on the machine where the active resides). I have already written tests to verify that enabling stale stores returns the value as well in shouldQueryAllStalePartitionStores(), so we can remove this test as well.

Member

If you want to verify that only the active task returns data, the test name should indicate this.

Contributor Author

Done.

@mumrah
Member

mumrah commented Jan 30, 2020

When do you plan to cut the release branch? We would love to get this in? Overall, the PR is good enough for merging if necessary to meet the deadline and to get the public API changes in. We could do a follow up PR to address the review comments.

@mjsax the plan is to cut the branch tomorrow (1/30). If you think this PR is in a good enough state to merge, perhaps we should do that and address review comments in a follow up (as suggested). Unless of course @brary fixes everything and gets this merged to trunk before I make the branch 😀

@brary
Contributor Author

brary commented Jan 30, 2020

When do you plan to cut the release branch? We would love to get this in? Overall, the PR is good enough for merging if necessary to meet the deadline and to get the public API changes in. We could do a follow up PR to address the review comments.

@mjsax the plan is to cut the branch tomorrow (1/30). If you think this PR is in a good enough state to merge, perhaps we should do that and address review comments in a follow up (as suggested). Unless of course @brary fixes everything and gets this merged to trunk before I make the branch 😀

Thanks for the review @mjsax. I get to learn so much during reviews as well. :)

I have addressed the majority of the comments. This should be good to merge. I will address the few remaining ones in a follow-up.

Member

@mjsax mjsax left a comment

Thanks for updating the PR. I am merging it to get it into the 2.5 release. We should still address the open comments in a follow-up PR. The public JavaDocs still need some work (added more comments), and not all comments from the first review have been addressed yet.

\cc @vvcephei @vinothchandar @guozhangwang

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
* and it is not restoring from the changelog.
* If true, allow queries on standbys and restoring replicas in addition to active ones.
* @param <T> return type
* @param storeQueryParams to set the optional parameters to fetch type of stores user wants to fetch when a key is queried
Member

nit: remove "optional" -- the object is used to set mandatory and optional parameters. Overall it reads a little bit complicated. Not sure atm how to improve it. Maybe @vinothchandar or @vvcephei have some ideas?

* available on that instance for that particular store name.
* It contains a partition, which for a point queries can be populated from the {@link KeyQueryMetadata}.
*/
public class StoreQueryParams<T> {
Member

As an afterthought, I am wondering why we don't call this class StoreQueryParameters -- using an abbreviation does not really make a good name, IMHO.

Contributor Author

Hmm, I can see that now. @vvcephei wdyt? Since it is a public class and was passed as part of the KIP, can I directly change it in this PR?

/**
* Represents all the query options that a user can provide to state what kind of stores it is expecting.
* The options would be whether a user would want to enable/disable stale stores
* or whether it knows the list of partitions that it specifically wants to fetch.
Member

it knows -> they know
it specifically wants -> they specifically want

"list of partitions" -- a user can only specify a single partition (this makes me wonder: does the optimization we do only apply to single-key lookups? For range queries, a user should never limit the number of partitions? -- if yes, we should explain this in detail in the JavaDocs of withPartition(...) -- one exception might be if the underlying data is range-partitioned and the user knows that the queried range is solely contained in a single partition)

Contributor Author

I agree. If a user knows exactly which partition to query, withPartition would be helpful for range() or all() or approximateNumEntries() as well, not just get() -- for example, if the data is range-partitioned, or if you specifically want the approximate entries in each partition (I do that sometimes to see skew across partitions). I will add this to the JavaDocs.
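For a point query, the caller needs the key's partition before calling withPartition(...); in the real API this can be taken from KeyQueryMetadata, as the JavaDoc under review notes. The sketch below uses a simplified hash in place of Kafka's actual default partitioner (which applies murmur2 to the serialized key bytes), just to show the shape of the lookup -- the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public final class KeyPartitionLookup {
    // Simplified stand-in for Kafka's default partitioner; the real one hashes
    // the serialized key bytes with murmur2. Illustrative only.
    static int partitionFor(final String key, final int numPartitions) {
        final byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (final byte b : bytes) {
            hash = 31 * hash + b;
        }
        // Mask off the sign bit so the modulo result is non-negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(final String[] args) {
        final int partition = partitionFor("user-42", 4);
        // This partition number would then be passed to withPartition(partition)
        // so only that store is queried instead of all local partitions.
        if (partition < 0 || partition >= 4) {
            throw new AssertionError("partition out of range");
        }
        System.out.println("partition=" + partition);
    }
}
```

The same computed partition could be reused for a skew check: calling approximateNumEntries() per partition and comparing the counts, as described above.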

for (final Task streamTask : tasks.values()) {
if (keyTaskId != null && !keyTaskId.equals(streamTask.id())) {
continue;
}
Member

Seems you missed this comment? If you disagree with my comment, can you elaborate why (maybe I am missing something)?

if (partition == null) {
return null;
}
final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
Member

Thanks @guozhangwang -- as this is internal, we can still improve on it in a follow up.

// Assert that only active is able to query for a key by default
assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
}
Member

If you want to verify that only the active task returns data, the test name should indicate this.

@mjsax mjsax merged commit 05b2361 into apache:trunk Jan 30, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 2, 2020
Conflicts and/or compiler errors due to the fact that we
temporarily reverted the commit that removes
Scala 2.11 support:

* SslAdminIntegrationTest: keep using JAdminClient,
take upstream changes otherwise.
* ReassignPartitionsClusterTest: keep using
JAdminClient, take upstream changes otherwise.
* KafkaApis: use `asScala.foreach` instead of
`forEach`.

# By Ismael Juma (3) and others
# Via GitHub
* apache-github/trunk: (22 commits)
  KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (apache#7994)
  KAFKA-9375: Add names to all Connect threads (apache#7901)
  MINOR: Introduce 2.5-IV0 IBP (apache#8010)
  KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (apache#8011)
  Add retries to release.py script (apache#8021)
  KAFKA-8162: IBM JDK Class not found error when handling SASL (apache#6524)
  MINOR: Add explicit result type in public defs/vals (apache#7993)
  KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (apache#7940)
  KAFKA-9474: Adds 'float64' to the RPC protocol types (apache#8012)
  KAFKA-9360: Allow disabling MM2 heartbeat and checkpoint emissions (apache#7887)
  KAFKA-7658: Add KStream#toTable to the Streams DSL (apache#7985)
  KAFKA-9445: Allow adding changes to allow serving from a specific partition (apache#7984)
  KAFKA-9422: Track the set of topics a connector is using (KIP-558) (apache#8017)
  KAFKA-9040; Add --all option to config command (apache#7607)
  KAFKA-4203: Align broker default for max.message.bytes with Java producer default (apache#4154)
  KAFKA-9426: Use switch instead of chained if/else in OffsetsForLeaderEpochClient (apache#7959)
  KAFKA-9405: Use Map.computeIfAbsent where applicable (apache#7937)
  KAFKA-9026: Use automatic RPC generation in DescribeAcls (apache#7560)
  MINOR: Remove unused fields in StreamsMetricsImpl (apache#7992)
  KAFKA-9460: Enable only TLSv1.2 by default and disable other TLS protocol versions (KIP-553) (apache#7998)
  ...
@brary
Contributor Author

brary commented Feb 2, 2020

Opened PR #8033 as a follow-up to address the code review comments.
