
KAFKA-6144: Allow state stores to serve stale reads during rebalance#7868

Closed
vinothchandar wants to merge 21 commits into apache:trunk from vinothchandar:kafka-6144-standby-state-serving

Conversation

@vinothchandar
Member

@vinothchandar vinothchandar commented Dec 24, 2019

KIP-535 Implementation

Summary of testing strategy

  • Unit tests for standby metadata
  • Integration test for querying standbys, during rebalance
  • Local testing
  • Unit tests around the APIs used by allLocalOffsetLags()
  • Integration test for lag APIs; LagInfo class
  • Final test coverage good; all new methods; even some love for deprecated APIs
  • Streams System tests passing (1 test failing; currently triaging)
  • Special cases like source topic optimization;
  • Update KIP with latest

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vinothchandar
Member Author

Test Result (1 failure / +1)
org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore

This is a flaky test that also fails locally. Will see if there is a quick fix; otherwise, everything seems to pass.

@vvcephei This does not have the tests for the lag API yet. But if you could make a quick pass and see if we are close, that would be awesome. Will iterate and make one final pass based on your comments.

@brary
Contributor

brary commented Dec 25, 2019

Test Result (1 failure / +1)
org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore

This is a flaky test that also fails locally. Will see if there is a quick fix; otherwise, everything seems to pass.

@vvcephei This does not have the tests for the lag API yet. But if you could make a quick pass and see if we are close, that would be awesome. Will iterate and make one final pass based on your comments.

@vinothchandar I have fixed the test. It was failing because it assumed that standby stores don't return data, so I changed the logic to differentiate between the active and the replica.

Contributor

@AlanConfluent AlanConfluent left a comment

I know this is a WIP, but made a few comments while reading it over.

final long unknownPosition = 0;

// Obtain the current positions, of all the active-restoring and standby tasks
for (final StreamThread streamThread : this.threads) {
Contributor

Do you need to do some synchronization while going through all of these threads and their metadata?

Within StreamThread and TaskManager, it doesn't seem like some of these fields are thread-safe. It might make sense to add locking and/or return an immutable copy of some of these maps.

I'm not too familiar with the code just yet, so please tell me if I'm missing something.

Contributor

@brary brary Dec 28, 2019

The information we are fetching from StreamThreads is basically the state of each task, i.e. whether an active/standby task is in the running/restoring/created state. These state maps are concurrent hash maps, so I think we should be fine. Let me know if I am missing any non-thread-safe use of these threads.

Member Author

I made another pass.. but I believe all the maps fetched are concurrent hash maps, which should support such access.

  • streamThread.allStandbyTasks() ultimately calls AssignedTasks#allTasks, that populates a new list from two concurrent maps
  • standbyTask.checkpointedOffsets() returns a new unmodifiable map as well; I will also make a couple of maps in ProcessorStateManager concurrent, to avoid future issues if some code were to call remove() on them
  • streamThread.restoringTaskIds(), streamThread.allStreamsTasks() all grab values from concurrent maps.
  • activeTask.restoredOffsets() actually does have one corner case, around suspending via ChangelogReader#remove. Will make the corresponding map concurrent as well.

If you see any other gap, or a better way than fishing around like this, let me know :)
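The thread-safety pattern discussed above can be sketched in a self-contained way. This is not the Kafka Streams code; `OffsetSnapshot`, `checkpointedOffsets`, and `snapshot` are hypothetical names. It shows why reading from a `ConcurrentHashMap` owned by another thread is safe, and why handing callers an unmodifiable copy protects them from later `remove()`/`put()` calls:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a caller thread aggregating per-task offsets that
// worker threads update concurrently. ConcurrentHashMap allows safe
// iteration without locking; copying into an unmodifiable map gives the
// caller a stable point-in-time snapshot.
class OffsetSnapshot {
    // live map, written by worker threads
    final Map<String, Long> checkpointedOffsets = new ConcurrentHashMap<>();

    // returns an immutable point-in-time copy for external callers
    Map<String, Long> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(checkpointedOffsets));
    }
}
```

The copy-on-read makes later mutations of the live map invisible to holders of the snapshot, which is the property the lag API needs when stream threads keep processing.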

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
final String storeName = streamsMetadataState.getStoreForChangelogTopic(topicPartition.topic());
final long offsetPosition;
if (activeChangelogPositions.containsKey(topicPartition)) {
// if unknown, assume it's positioned at the tail of changelog partition
Contributor

For standbys, you conservatively assume that unknowns are at the head, which means that if another standby is known, that will be picked to read from. I assume that we'll always read from actives, regardless of their lag, is that right? What's the benefit of making this less conservative assumption here?

Member Author

I am not sure we will always read from actives. For example, currently you can have an active be in the restoring state while standbys are farther ahead (just that no further processing will happen until the active comes back up). KIP-441 is changing this.
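The conservative assumption under discussion can be illustrated with a small stand-alone sketch (names like `ReplicaChooser` and `leastLagged` are hypothetical, not the PR's code): an unknown standby position is treated as the head of the changelog (offset 0), so its lag becomes the full end offset and any replica with a known position wins when choosing where to read.

```java
import java.util.Map;

// Hypothetical sketch of the "unknowns are at the head" assumption:
// a null position is treated as offset 0, maximizing the computed lag.
class ReplicaChooser {
    static long lag(long endOffset, Long position) {
        long pos = (position == null) ? 0L : position; // unknown => head
        return Math.max(0L, endOffset - pos);
    }

    // returns the replica id with the smallest lag
    static String leastLagged(long endOffset, Map<String, Long> positions) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            long l = lag(endOffset, e.getValue());
            if (l < bestLag) {
                bestLag = l;
                best = e.getKey();
            }
        }
        return best;
    }
}
```

Under this rule a standby with an unknown position is only chosen when no replica has reported a position at all, which matches the conservative behavior described above.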

}
localOffsetLags.put(storeName, partitionToOffsetLag);
});
} catch (final Exception e) {
Contributor

Do you want to move this try-catch to be just around adminClient.listOffsets(offsetSpecMap), to make it as localized as possible? You might accidentally catch another exception here (like the IllegalStateException) and produce a misleading message.

Contributor

I have added a localized catch and removed the try-catch from the lines it covered earlier, as I don't think an IllegalStateException should be wrapped in a StreamsException. Let me know if my understanding is not correct.
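The localization being discussed can be sketched generically (this is not the PR's code; `LagFetcher`, `OffsetLookup`, and `fetchEndOffset` are illustrative stand-ins for the call to `adminClient.listOffsets`): only the external lookup is wrapped, so exceptions raised by the surrounding logic propagate unchanged instead of being misreported as lookup failures.

```java
// Hypothetical sketch: guard only the external call, keep surrounding
// validation outside the catch so its exceptions are not re-wrapped.
class LagFetcher {
    interface OffsetLookup {
        long endOffset(String partition) throws Exception;
    }

    static long fetchEndOffset(OffsetLookup lookup, String partition) {
        final long end;
        try {
            end = lookup.endOffset(partition); // only the external call is guarded
        } catch (Exception e) {
            throw new RuntimeException("Unable to list end offsets for " + partition, e);
        }
        if (end < 0) { // surrounding validation stays outside the catch
            throw new IllegalStateException("negative end offset: " + end);
        }
        return end;
    }
}
```

With the broad try-catch of the earlier version, the `IllegalStateException` from the validation step would have been swallowed and re-thrown with the misleading "unable to list offsets" message.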

Contributor

@vvcephei vvcephei left a comment

Hey @vinothchandar , I've completed my high-level pass. I left comments where I had any concerns for you to follow up on.

One thing I didn't do is confirm that the public API changes match exactly what was proposed in the KIP. I assume you'll double-check that before calling for final reviews.

Can you also confirm that you've run the tests with coverage and all the new logic is being exercised by the tests? (There are a few unused methods that indicate testing gaps, but there could be more, hidden in branches). Also on this front, I'd just re-iterate that we should make sure that the lag-computation logic still works when the source-table optimization is enabled.

Thanks!

for (final StreamThread streamThread : this.threads) {
for (final StandbyTask standbyTask : streamThread.allStandbyTasks()) {
final Map<TopicPartition, Long> changelogPartitionLimits = standbyTask.checkpointedOffsets();
standbyTask.changelogPartitions().forEach(topicPartition ->
Contributor

Just something to bear in mind as you make more changes: it'd be better to just stick to regular for loops instead of using the forEach method on collections. On Streams it's more natural, but for regular collections, the for loop is just fine.

Member Author

I don't have strong opinions on this, but I'd like to understand the reasoning behind the suggestion. Is it because a lot of the code follows this style?

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Comment thread streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java Outdated
}

@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
Contributor

because the topology is empty?

Member Author

No tasks are running. The stream threads themselves are mocked at this test level; I had a hard time trying to mock the different pieces. This test only verifies that certain methods are called to compute the lag.

Real tests on lag values will happen in an integration test that brings up a real topology. Let me know if you have better suggestions/advice on testing this.

.metadataForKey(storeName, key, new StringSerializer());
if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
final KeyQueryMetadata metadata = streams
.queryMetadataForKey(storeName, key, new StringSerializer());
Contributor

should we also preserve the tests for the deprecated methods?

Member Author

I pondered this early on. It seems some deprecated methods in KafkaStreams just had their tests changed. To avoid simply duplicating tests, I could also call the deprecated metadataForKey and check its result against the new API. wdyt?

Member Author

/Users/vchandar/Code/kafka/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:352: warning: [deprecation] <K>metadataForKey(String,K,Serializer<K>) in KafkaStreams has been deprecated

ah yes, the build fails when deprecated methods are called. Let me see if there is a way to suppress this and have one call in the integration test that validates the new and old APIs against each other.
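One standard way to do this (a sketch, not the PR's actual test; `MetadataCompat` and both method bodies are stand-ins for KafkaStreams' deprecated `metadataForKey` and new `queryMetadataForKey`) is to confine the deprecated call to a single method annotated with `@SuppressWarnings("deprecation")` and assert that old and new APIs agree:

```java
// Hypothetical sketch: one compatibility check instead of duplicated tests.
class MetadataCompat {
    @Deprecated
    static String metadataForKey(String store, String key) {
        return store + "/" + key; // old API (stand-in implementation)
    }

    static String queryMetadataForKey(String store, String key) {
        return store + "/" + key; // new API (stand-in implementation)
    }

    @SuppressWarnings("deprecation")
    static boolean oldAndNewAgree(String store, String key) {
        // the annotation keeps warnings-as-errors builds from failing
        // on the deprecated call
        return metadataForKey(store, key).equals(queryMetadataForKey(store, key));
    }
}
```

Scoping the annotation to the one method (rather than the whole test class) keeps deprecation warnings live everywhere else.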

@vvcephei
Contributor

vvcephei commented Jan 8, 2020

@vinothchandar , when you're ready to call for final reviews, can you also kick off a few runs of the Streams system tests to be sure they still pass?

@vinothchandar
Member Author

@vvcephei Thanks for the review. I made a note of the pending test coverage, KIP update and system test results in the checklist above. I intend to get past that before calling the final review.

I have addressed most of your comments and pushed it up. I wanted to get a high-level pass early on, mainly to confirm that this code/implementation direction is on track and avoid surprises at the last minute, e.g. do all these maps we are accessing to get offset information look correct? Do you see anything here that needs to change a lot?

vinothchandar and others added 21 commits January 13, 2020 10:10
     - Rough initial cut, with basic HA working. Many more changes needed still
     - [TODO] Introduce a new version for AssignmentInfo & support version probing
     - [TODO] Implement API changes in KafkaStreams (deprecation, new classes, new methods)
     - [TODO] Tests using metadataForKey(), redone over new APIs. New tests with standbys involved.
     - [TODO] Tests need to be (re)written, including version probing ones
 - Reimplemented KafkaStreams#allLocalOffsetLags()
 - Moved KeyQueryMetadata to org.apache.kafka.streams so we can import publicly
 - Minor renames, whitespace fixes
… from Kafka

 - Re-implement the assignment protocol to serialize the topic index just once
 - Code cleanups, small bug fixes
 - Also cleanup non-essential code formatting changes
…l as offset lag

  - Improved javadocs for new class and KeyQueryMetadata
  - Redid the tests, based on new APIs
@vinothchandar vinothchandar force-pushed the kafka-6144-standby-state-serving branch from 810e7c2 to 98879d7 on January 13, 2020 19:24
@vinothchandar
Member Author

vinothchandar commented Jan 13, 2020

@vvcephei @brary Just updated the status in the PR description. I am currently planning to tackle the one failing system test and the source topic optimization case above, and update the KIP with the final details.

The optimization per se is not needed by ksql atm (not sure about navinder's use-case); I plan to spend some time on this to make it work. But, given my timelines/deadlines, I would like to split it out as a follow-on if that's acceptable. (The lag API will then throw an exception when the optimization is turned on, for now.)

@vinothchandar vinothchandar marked this pull request as ready for review January 13, 2020 19:34
@vinothchandar vinothchandar changed the title [WIP] KAFKA-6144: Allow state stores to serve stale reads during rebalance KAFKA-6144: Allow state stores to serve stale reads during rebalance Jan 13, 2020
Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@brary
Contributor

brary commented Jan 14, 2020

@vvcephei @brary Just updated the status in the PR description. I am currently planning to tackle the one failing system test and the source topic optimization case above, and update the KIP with the final details.

The optimization per se is not needed by ksql atm (not sure about navinder's use-case); I plan to spend some time on this to make it work. But, given my timelines/deadlines, I would like to split it out as a follow-on if that's acceptable. (The lag API will then throw an exception when the optimization is turned on, for now.)

Hi @vinothchandar, thanks, and this looks reasonable to me. This works fine for me as it is.

@vvcephei
Contributor

Hey all, to try and decrease the scope of this PR, I pulled out the part I feel most comfortable with merging as #7960 . Please take a look and make sure it still looks good to you (@vinothchandar and @brary ). I plan to kick off the system tests and merge that part of it after they pass.

@vvcephei
Contributor

vvcephei commented Jan 15, 2020

Hi all, I've factored out and merged two big parts of this PR:

  1. the ability to query standbys (KAFKA-6144: option to query restoring and standby #7962)
  2. the extensions to the cluster metadata API (KAFKA-9428: Add KeyQueryMetadata APIs to KafkaStreams #7960)

Once this PR is rebased, it should only include changes to computing and reporting the lags on local stores as proposed in KIP-535.
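The remaining lag computation can be pictured with a simplified stand-in (the real API, per KIP-535, lives in KafkaStreams and returns LagInfo objects; the `LocalStoreLags` class and `record` method here are illustrative only): per store and partition, the lag is the changelog end offset minus the locally restored position.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for per-store partition lag reporting:
// store name -> partition -> offset lag, clamped at zero.
class LocalStoreLags {
    final Map<String, Map<Integer, Long>> lags = new HashMap<>();

    void record(String store, int partition, long position, long endOffset) {
        lags.computeIfAbsent(store, s -> new HashMap<>())
            .put(partition, Math.max(0L, endOffset - position));
    }
}
```

A caller deciding whether a standby is fresh enough to serve a query would compare these lags against an application-chosen staleness threshold.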

@vinothchandar
Member Author

There is a separate PR for this already: #7961.

If everyone agrees, we can close this one and just focus on that.

@brary
Contributor

brary commented Jan 18, 2020

Since the other PRs have been merged, we can close this one.
