MINOR: remove unnecessary store info from TopologyBuilder #2338

Closed
guozhangwang wants to merge 8 commits into apache:trunk from guozhangwang:KMinor-refactor-state-to-changelogtopic

Conversation

@guozhangwang
Contributor

This PR is extracted from #2333 as an incremental fix to ease the reviewing:

  1. Removed storeToProcessorNodeMap from ProcessorTopology since it was previously used to set the context current record, and can now be replaced with the dirty entry in the named cache.

  2. Replaced sourceStoreToSourceTopic from ProcessorTopology with storeToChangelogTopic map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

  3. Modified ProcessorStateManager to rely on storeToChangelogTopic when retrieving the changelog topic; this makes the second parameter loggingEnabled in register unnecessary, so we can deprecate the old API in favor of a new one.

  4. Also fixed a minor issue in KStreamBuilder: if the storeName is not provided in the table(..) function, do not create the underlying materialized store. Modified the unit tests to cover this case.

  5. Fixed several other unit test failures exposed by this refactoring, in which the applicationId was not set correctly when constructing the mock processor topology.
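
For readers following along, here is a rough sketch of what items 2 and 3 amount to. It is not the actual Kafka code: `StateManagerSketch`, its two `register` overloads, and their return values are hypothetical names invented for illustration. The only idea taken from the description is that a map from store name to changelog topic decides whether a store is changelogged, so the boolean flag carries no information.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; the real ProcessorStateManager has a different, larger API.
final class StateManagerSketch {
    // store name -> changelog topic; stores without logging have no entry
    private final Map<String, String> storeToChangelogTopic;

    StateManagerSketch(final Map<String, String> storeToChangelogTopic) {
        this.storeToChangelogTopic = new HashMap<>(storeToChangelogTopic);
    }

    // New-style registration: the map alone decides whether a changelog is used.
    // Returns the changelog topic, or null if the store is not changelogged.
    String register(final String storeName) {
        return storeToChangelogTopic.get(storeName);
    }

    // Old-style registration kept for compatibility; the flag is ignored.
    @Deprecated
    String register(final String storeName, final boolean loggingEnabled) {
        return register(storeName);
    }
}
```

In this sketch the deprecated overload simply delegates, which is one plausible way to keep existing callers compiling while the flag is phased out.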

@guozhangwang
Contributor Author

ping @dguy @mjsax @enothereska for reviews.

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/655/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/656/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/654/
Test PASSed (JDK 7 and Scala 2.10).

Contributor

@dguy dguy left a comment

LGTM

} finally {
currNode = current;

for (StateStore stateStore : context.allStateStores().values()) {
Member

Why do we not need currNode to map to the store we are flushing any more? I don't see any change in StateStore#flush() -- or did we not need this in the first place and this is just a cleanup of unnecessary code?

Contributor Author

This is leftover work that predates this PR: previously we used topology.storeToProcessorNodeMap() to remember the processor node linked to each state store, so that we could set it while flushing and possibly forwarding to child processor nodes. When we added KIP-63, we started storing that information in DirtyEntry, so the map was no longer needed, but we forgot to remove it.

This cleanup is mainly about removing those no-longer-needed maps from the topology builder.
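
To make the point above concrete, here is a minimal sketch of the idea, under loud assumptions: `DirtyEntrySketch`, its nested `DirtyEntry`, and the `writerContext` field are hypothetical and much simpler than the real named cache. The thread does not show what the real DirtyEntry stores, so a plain string label stands in for it. The sketch only shows why a separate store-to-node map becomes unnecessary once each dirty entry remembers the context it was written under.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified cache; names and fields are illustrative only.
final class DirtyEntrySketch {
    static final class DirtyEntry {
        final String key;
        final String value;
        final String writerContext; // context that was current when the entry was written

        DirtyEntry(final String key, final String value, final String writerContext) {
            this.key = key;
            this.value = value;
            this.writerContext = writerContext;
        }
    }

    private final List<DirtyEntry> dirty = new ArrayList<>();
    private String current; // stands in for the processor context's current state

    void setCurrent(final String context) {
        this.current = context;
    }

    String current() {
        return current;
    }

    void put(final String key, final String value) {
        dirty.add(new DirtyEntry(key, value, current));
    }

    // On flush, each entry restores its own context before forwarding,
    // so no external store-to-node lookup is required.
    List<String> flush() {
        final String previous = current;
        final List<String> forwarded = new ArrayList<>();
        try {
            for (final DirtyEntry entry : dirty) {
                current = entry.writerContext;
                forwarded.add(current + ":" + entry.key + "=" + entry.value);
            }
        } finally {
            current = previous; // put the caller's context back
            dirty.clear();
        }
        return forwarded;
    }
}
```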

final KStreamBuilder builder = new KStreamBuilder();
builder.stream("topic").groupByKey().count("my-store");
- final ProcessorTopology topology = builder.build(0);
+ final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
Member

Why was a missing application ID not a problem before the change? I don't see the connection.

Contributor Author

As explained above. We now depend solely on the topology builder to tell us which state stores should be changelog enabled and what their changelog topic names are. Previously, a separate loggingEnabled parameter in register indicated that.
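
A small sketch may help show why the application ID matters once the builder owns this information. Everything here is an assumption for illustration: `ChangelogNamingSketch` and its methods are hypothetical, the `<applicationId>-<storeName>-changelog` pattern is assumed as the naming convention, and the fail-fast null check is a choice made for the sketch rather than a claim about what the real builder does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper; not part of the Kafka Streams API.
final class ChangelogNamingSketch {
    // Assumed naming convention: <applicationId>-<storeName>-changelog
    static String changelogTopic(final String applicationId, final String storeName) {
        Objects.requireNonNull(applicationId, "applicationId must be set before building the topology");
        return applicationId + "-" + storeName + "-changelog";
    }

    // Builds store name -> changelog topic, but only for stores with logging enabled.
    static Map<String, String> storeToChangelogTopic(final String applicationId,
                                                     final Map<String, Boolean> loggingEnabledByStore) {
        final Map<String, String> result = new HashMap<>();
        for (final Map.Entry<String, Boolean> entry : loggingEnabledByStore.entrySet()) {
            if (Boolean.TRUE.equals(entry.getValue())) {
                result.put(entry.getKey(), changelogTopic(applicationId, entry.getKey()));
            }
        }
        return result;
    }
}
```

Under these assumptions, a test that builds a topology without an application ID cannot produce a usable changelog topic name, which would explain why the tests had to start setting it.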

put(storeName1, storeChangelogTopicName1);
put(storeName2, storeChangelogTopicName2);
}
});
Member

I don't get this change (same below). Why do we need to provide this now?

Contributor Author

We are now passing the map from store names to their changelog topics as the last parameter into ProcessorStateManager, since we can gather that information beforehand in the topology builder (any store that is not changelog enabled won't have an entry in the map). ProcessorStateManager then only needs to consult this map to decide whether a state store needs a changelog and what the changelog topic name is.

In these tests, since we are adding two stores, persistentXX and nonPersistentXX, both of which are changelogged, we need to pass in this map.

Member

@mjsax mjsax Jan 11, 2017

Yes. But in the original version of this test we passed in two empty maps (sourceStoreToSourceTopics and storeToProcessorNodeMap) -- so I am wondering why we need to provide this information now but not before (or where this information came from before). Still confused.

Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));

- ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); // standby
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
Member

The test worked with an empty map before -- why do we need to provide the mapping now? (same below)

Contributor Author

Ditto above.

final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);

// only materialize the KTable into a state store if the storeName is not null
if (storeName != null) {
Member

I don't see a new test for this.

Contributor Author

My bad, this is not committed.
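
For illustration only, the kind of check such a test might make can be sketched without any Kafka classes. `TableBuilderSketch` below is a hypothetical stand-in for the builder, not the real KStreamBuilder; it only mirrors the `storeName != null` guard from the snippet under review.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the builder; it only tracks which stores were materialized.
final class TableBuilderSketch {
    private final Set<String> stateStores = new HashSet<>();

    // Mirrors the guard under review: materialize a store only when a name is given.
    void table(final String topic, final String storeName) {
        if (storeName != null) {
            stateStores.add(storeName);
        }
    }

    Set<String> stateStores() {
        return stateStores;
    }
}
```

A test along these lines would call `table` once with a null store name and once with a real one, then check that only the named store was registered.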

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/756/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/758/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/756/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/768/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/766/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/766/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please.

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/776/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/778/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/776/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/779/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/781/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/779/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/851/
Test FAILed (JDK 7 and Scala 2.10).

Contributor

@ewencp ewencp left a comment

@guozhangwang One trivial typo, but LGTM

private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();

- // map from state store names that are directly associated with source processors to their subscribed topics,
+ // map from state store names to ths state store's corresponding changelog topic if possible,
Contributor

nit: typo "ths"

Contributor Author

ack

@guozhangwang
Contributor Author

Merged to trunk.

@asfgit asfgit closed this in e90db3e Jan 13, 2017
@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/853/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/853/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/855/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/853/
Test FAILed (JDK 7 and Scala 2.10).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
This PR is extracted from apache#2333 as an incremental fix to ease the reviewing:

1. Removed `storeToProcessorNodeMap` from ProcessorTopology since it was previously used to set the context current record, and can now be replaced with the dirty entry in the named cache.

2. Replaced `sourceStoreToSourceTopic` from ProcessorTopology with `storeToChangelogTopic` map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

3. Modified `ProcessorStateManager` to rely on `sourceStoreToSourceTopic` when retrieving the changelog topic; this makes the second parameter `loggingEnabled` in `register` not needed any more, and we can deprecate the old API with a new one.

4. Also fixed a minor issue in `KStreamBuilder`: if the storeName is not provided in the `table(..)` function, do not create the underlying materialized store. Modified the unit tests to cover this case.

5. Fixed a bunch of other unit tests failures that are exposed by this refactoring, in which we are not setting the applicationId correctly when constructing the mocking processor topology.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Ewen Cheslack-Postava

Closes apache#2338 from guozhangwang/KMinor-refactor-state-to-changelogtopic
@guozhangwang guozhangwang deleted the KMinor-refactor-state-to-changelogtopic branch July 15, 2017 22:07