Skip to content

KAFKA-9176: Retry on getting local stores from KafkaStreams#8568

Merged
guozhangwang merged 3 commits intoapache:trunkfrom
guozhangwang:K9176-get-store-with-exception
Apr 28, 2020
Merged

KAFKA-9176: Retry on getting local stores from KafkaStreams#8568
guozhangwang merged 3 commits intoapache:trunkfrom
guozhangwang:K9176-get-store-with-exception

Conversation

@guozhangwang
Copy link
Copy Markdown
Contributor

This PR fixes and improves two major issues:

  1. When calling KafkaStreams#store we can always get an InvalidStateStoreException, and even waiting for Streams state to become RUNNING is not sufficient (this is also how OptimizedKTableIntegrationTest failed). So I wrapped all the function with a Util wrapper that captures and retries on that exception.

  2. While trouble-shooting this issue, I also realized a potential bug in test-util's produceKeyValuesSynchronously, which creates a new producer for each of the record to send in that batch --- i.e. if you are sending N records with a single call, within that call it will create N producers used to send one record each, which is very slow and costly.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor Author

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @vvcephei @ableegoldman for reviews.

time.sleep(1L);
final boolean enableTransactions) {

try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fix 2).

return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
}

public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those functions are not used anywhere, ditto below.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the cleanup


// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor fix, that we should retry this condition.

config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fix to the test itself: with caching the records are delayed sending to the sink topics.

@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the second batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore still failed on one of the builds at this line :/
But, at least we got farther into the test before it failed so I'd say this is still an improvement 😄

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! One question about why we sometimes do a null check (and sometimes don't) but otherwise LGTM. Kind of a bummer that OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore still failed on one of the builds but feel free to merge this as a first step and investigate the new failure separately

kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(replicatedStore);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have to check for null now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since previously we would just throw the exception with the un-wrapped call, here asserting it is not null is equal to make sure that the store is indeed returned.

@guozhangwang
Copy link
Copy Markdown
Contributor Author

I looked at the three failed tests:

So I think this PR is good to be merged.

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few minor comments, which you can take or leave :)

Thanks for the awesome improvement!


final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
.getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore());
.getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

Comment on lines +341 to +342
if (store == null)
return false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a huge deal, but technically, these should have brackets.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.

final Future<RecordMetadata> f = producer.send(
new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
f.get();
producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the flush at the end makes it synchronous anyway?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we wait after sending each record, here we only wait once after sending all records, so it is more efficient.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. That's what I was asking for confirmation on. I realize now the structure of my sentence was ambiguous.

I agree that the method contract is that the batch should be synchronously produced, not that each record should be synchronously produced, so this change looks good to me.

return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
}

public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the cleanup

@guozhangwang guozhangwang merged commit 11fc953 into apache:trunk Apr 28, 2020
@guozhangwang guozhangwang deleted the K9176-get-store-with-exception branch April 28, 2020 22:04
@guozhangwang
Copy link
Copy Markdown
Contributor Author

Merged to trunk.

ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
…/master`

* apache-github/trunk: (45 commits)
  MINOR: Fix broken JMX link in docs by adding missing starting double quote (apache#8587)
  KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219 (apache#8567)
  KAFKA-9922: Update demo instructions in examples README (apache#8559)
  KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (apache#8442)
  KAFKA-9875: Make integration tests more resilient (apache#8578)
  KAFKA-9932: Don't load configs from ZK when the log has already been loaded (apache#8582)
  KAFKA-9925: decorate pseudo-topics with app id (apache#8574)
  KAFKA-9832: fix attempt to commit non-running tasks (apache#8580)
  KAFKA-9127: don't create StreamThreads for global-only topology (apache#8540)
  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
  KAFKA-9176: Retry on getting local stores from KafkaStreams (apache#8568)
  KAFKA-9823: Follow-up, check state for handling commit error response (apache#8548)
  KAFKA-6145: KIP-441: Add TaskAssignor class config (apache#8541)
  MINOR: Fix partition numbering from 0 to P-1 instead of P in docs (apache#8572)
  KAFKA-9921: disable caching on stores configured to retain duplicates (apache#8564)
  Minor: remove redundant check in auto preferred leader election (apache#8566)
  MINOR: Update the link to the Raft paper in docs (apache#8560)
  MINOR: Fix typos in config properties in MM2 test (apache#8561)
  MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982)
  MINOR: document how to escape json parameters to ducktape tests (apache#8546)
  ...
ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
There was a minor conflict in gradle.properties because the default
Scala version changed upstream to Scala 2.13. I kept the upstream
change.

Related to this, I have updated Jenkinsfile to compile and validate
with Scala 2.12 in a separate stage so that we ensure we maintain
compatibility. Unlike Apache Kafka, we only run the tests with the
default Scala version, which is now 2.13.

* apache-github/trunk: (45 commits)
MINOR: Fix broken JMX link in docs by adding missing starting double
quote (apache#8587)
KAFKA-9652: Fix throttle metric in RequestChannel and request log due
to KIP-219 (apache#8567)
  KAFKA-9922: Update demo instructions in examples README (apache#8559)
KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses
(apache#8442)
  KAFKA-9875: Make integration tests more resilient (apache#8578)
KAFKA-9932: Don't load configs from ZK when the log has already been
loaded (apache#8582)
  KAFKA-9925: decorate pseudo-topics with app id (apache#8574)
  KAFKA-9832: fix attempt to commit non-running tasks (apache#8580)
KAFKA-9127: don't create StreamThreads for global-only topology
(apache#8540)
  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
  KAFKA-9176: Retry on getting local stores from KafkaStreams (apache#8568)
KAFKA-9823: Follow-up, check state for handling commit error response
(apache#8548)
  KAFKA-6145: KIP-441: Add TaskAssignor class config (apache#8541)
MINOR: Fix partition numbering from 0 to P-1 instead of P in docs
(apache#8572)
KAFKA-9921: disable caching on stores configured to retain duplicates
(apache#8564)
Minor: remove redundant check in auto preferred leader election
(apache#8566)
  MINOR: Update the link to the Raft paper in docs (apache#8560)
  MINOR: Fix typos in config properties in MM2 test (apache#8561)
MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters.
(apache#7982)
MINOR: document how to escape json parameters to ducktape tests
(apache#8546)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants