Skip to content

KAFKA-3522: Add internal RecordConverter interface#6150

Merged
mjsax merged 3 commits intoapache:trunkfrom
mjsax:kafka-3522-rocksdb-recordConverter
Jan 20, 2019
Merged

KAFKA-3522: Add internal RecordConverter interface#6150
mjsax merged 3 commits intoapache:trunkfrom
mjsax:kafka-3522-rocksdb-recordConverter

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Jan 15, 2019

Part of KIP-258.

Adding the proposed RecordConverter interface.

The RecordConverter is responsible to convert ConsumerRecords from a changelog topic into <byte[], byte[]> key-value-pairs that are put into the stores. This is required to move the timestamp from the ConsumerRecord into the value-part in the store. The default implementation only maps key-value to key-value, ie, it's a no-op and this PR does not change any behavior.

@mjsax mjsax added the streams label Jan 15, 2019
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 15, 2019

Call for review @guozhangwang @bbejeck @vvcephei

final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stores can implement the RecordConverter interface -- if they don't do, we use default no-op converter that maps from key-value to key-value.

final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stores can implement the RecordConverter interface -- if they don't do, we use default no-op converter that maps from key-value to key-value.

/**
* {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair.
*/
// TODO: move this class to public API
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the KIP is accepted, this interface will become part of public API.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @mjsax, just one minor comment

log.trace("Restoring state store {} from changelog topic {}", storeName, topic);

final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update the tests for setting the stateStore and recordConverter

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to add a test for GlobalStateManagerImpl -- also discovered a gap while trying to add a test for ProcessorStateManagerImpl: updating standby tasks was not covered by the converter -- added a test for this case.

However, I am not sure how I can test the converter for ProcessorStateManagerImpl restore case, because the ProcessorStateManagerImpl does not use the converter, but by the TaskManager -- however, it would be weird to test test in the TaskManagerTest because it has nothing to do with the task manager.

Thoughts?

I'll add some upgrade integration tests later, that will cover this case. Not sure if this would be sufficient?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Jan 16, 2019

test failures kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable and kafka.api.UserQuotaTest.testQuotaOverrideDelete unrelated.

retest this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two meta comments:

  1. could we run the simple benchmark and see if there's any significant perf impact?
  2. Could we add more test case to ProcessorStateManager / GlobalStateManagerTest to verify that the default / customized converter is used?

public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
stateManager.initialize();
initializeConsumer(2, 1, t1);
initializeConsumer(2, 0, t1);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is "fix" on the side -- initializeConsumer did some weird stuff.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 18, 2019

Updated this.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 18, 2019

java.lang.AssertionError: expected:<6> but was:<5>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at org.apache.kafka.streams.KafkaStreamsTest.testStateOneThreadDeadButRebalanceFinish(KafkaStreamsTest.java:211)

This test passed locally. Might be a race condition. We should monitor this. \cc @guozhangwang @bbejeck @vvcephei @ableegoldman

Retest this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. If you've verified no observable perf difference please feel free to merge

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 18, 2019

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 18, 2019

Test failures in core. Retest this please.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 19, 2019

Move RecordConverter to public API -- will merge this after Jenkins is green. Vote passed and is closed.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jan 19, 2019

Retest this please

@mjsax mjsax merged commit ed7b67d into apache:trunk Jan 20, 2019
@mjsax mjsax deleted the kafka-3522-rocksdb-recordConverter branch January 20, 2019 06:54
abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Jan 24, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* ak/trunk:
  MINOR: fix race condition in KafkaStreamsTest (apache#6185)
  KAFKA-4850: Enable bloomfilters (apache#6012)
  MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test
  KAFKA-5117: Stop resolving externalized configs in Connect REST API
  MINOR: Cleanup handling of mixed transactional/idempotent records (apache#6172)
  KAFKA-7844: Use regular subproject for generator to fix *All targets (apache#6182)
  Fix Documentation for cleanup.policy is out of date (apache#6181)
  MINOR: increase timeouts for KafkaStreamsTest (apache#6178)
  MINOR: Rejoin split ssl principal mapping rules (apache#6099)
  MINOR: Handle case where connector status endpoints returns 404 (apache#6176)
  MINOR: Remove unused imports, exceptions, and values (apache#6117)
  KAFKA-3522: Add internal RecordConverter interface (apache#6150)
  Fix Javadoc of KafkaConsumer (apache#6155)
  KAFKA-6455: Extend CacheFlushListener to forward timestamp (apache#6147)
  MINOR: Log partition info when creating new request batch in controller (apache#6145)
  KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) (apache#6134)
  MINOR: Remove the InvalidTopicException handling in InternalTopicManager (apache#6167)
  [KAFKA-7024] Rocksdb state directory should be created before opening the DB (apache#6138)
  MINOR:: Fix typos (apache#6079)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants