Conversation
mjsax
left a comment
There was a problem hiding this comment.
Call for review @guozhangwang @bbejeck @vvcephei
| try { | ||
| if (getTime.shouldRecord()) { | ||
| return measureLatency(() -> outerValue(inner.get(Bytes.wrap(serdes.rawKey(key)))), getTime); | ||
| return measureLatency(() -> outerValue(inner.get(keyBytes(key))), getTime); |
There was a problem hiding this comment.
Extracted into it's own method, to align with code of other stores.
bbejeck
left a comment
There was a problem hiding this comment.
Just one clarifying question, LGTM otherwise.
| this.map = new TreeMap<>(); | ||
| } | ||
|
|
||
| public KeyValueStore<K, V> enableLogging() { |
There was a problem hiding this comment.
Does this mean we won't have logging for in-memory key-value stores?
There was a problem hiding this comment.
No. This is old code before the state store refactoring via KIP-182. Both, persistent and in-memory key-value store are wrapped with ChangeLoggingKeyValueBytesStore since then, and this method is not called anywhere anymore, and thus the class is not used any longer. Compare Stores and KeyValueStoreBuilder.
There was a problem hiding this comment.
Thanks for the clarification. I noticed after I checked out the PR that the enableLogging call was not used at all. I guess I missed that during KIP-182.
vvcephei
left a comment
There was a problem hiding this comment.
LGTM for the most part! Thanks for cleaning all this up.
| joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", | ||
| junit: "junit:junit:$versions.junit", | ||
| hamcrest: "org.hamcrest:hamcrest-all:1.3", | ||
| hamcrest: "org.hamcrest:java-hamcrest:$versions.hamcrest", |
There was a problem hiding this comment.
According to http://hamcrest.org/JavaHamcrest/distributables,
it seems like this should be org:hamcrest:hamcrest:2.1.
But I haven't followed the history of the project. Can you elaborate on the choice of hamcrest-java and 2.0.0.0?
There was a problem hiding this comment.
Ah. Good catch. I was checking for hamcrast-all versions and there was no newer one available and was not aware the there is hamcrast, too.
| final Set<Pattern> sourcePatterns = new HashSet<>(); | ||
| final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors); | ||
| final Set<SourceNodeFactory> sourceNodesForPredecessor | ||
| = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors); |
There was a problem hiding this comment.
Nit: seems a little weird to put the assignment operator at the start of this line instead of the end of the previous one.
There was a problem hiding this comment.
I agree -- typically only +/- between terms of an expression start a new line
There was a problem hiding this comment.
I like it better this way, but I don't care too much either. Will update accordingly.
| if (!stateChangelogTopics.containsKey(topicName)) { | ||
| final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName); | ||
| final InternalTopicConfig internalTopicConfig | ||
| = createChangelogTopicConfig(stateFactory, topicName); |
| private void adjust(final StreamsConfig config) { | ||
| final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE); | ||
| final boolean enableOptimization20 | ||
| = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE); |
| for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { | ||
| final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); | ||
| final SourceNodeFactory sourceNode | ||
| = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); |
| final TopologyDescription.Subtopology[] sortedSubtopologies | ||
| = subtopologies.descendingSet().toArray(new Subtopology[0]); | ||
| final TopologyDescription.GlobalStore[] sortedGlobalStores | ||
| = globalStores.descendingSet().toArray(new GlobalStore[0]); |
|
|
||
| /** | ||
| * Put a key-value pair with the given timestamp into the corresponding window | ||
| * Put a key-value pair into the window with given window start timestamp |
| this.map = new TreeMap<>(); | ||
| } | ||
|
|
||
| public KeyValueStore<K, V> enableLogging() { |
| final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); | ||
| if (!globalStore.isEmpty()) { | ||
| return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName); | ||
| return queryableStoreType.create(new WrappingStoreProvider(Collections.singletonList(globalStoreProvider)), storeName); |
There was a problem hiding this comment.
this line is still a bit long... You could try a static import for singletonList.
|
lgtm. @mjsax please feel free to merge after addressing the above comments. |
|
Thanks for the review. Updates this PR. Will merge after build finished. |
|
👍 Thanks @mjsax ! |
|
Retest this please. |
|
Unrelated tests in core and connect failed. Retest this please. |
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Guozhang Wang <guozhang@confluent.io>
While working on #6044, I did code cleanup on the side. Extracted parts of those cleanups into this PR for easier review. This PR should not change any behavior, but do:
Only change is a dependency change, that I need for #6044, and will make reviewing PRs for KIP-258 simpler.