Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,9 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq - what does this mean for users that have enabled auto topic creation? Although it's not a best practice, this seems like it could lead to unexpected behavior.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean on the server side or on the client? I agree we should consider checking for this in the user supplied configs before overriding it, but I think the reasonable default behavior is to disable it (by client config, since it's enabled server-side by default).
If users really do want it then it's on them to enable it through the main consumer config. On the other hand, prior to this it was effectively disabled permanently by the workaround of using pattern subscription. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that SGTM. I forgot that auto topic creation is the default server-side.


// add admin retries configs for creating topics
final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public StreamSourceNode(final String nodeName,
this.consumedInternal = consumedInternal;
}

public Collection<String> getTopicNames() {
public Collection<String> topicNames() {
return new ArrayList<>(topicNames);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static <K, V> TableSourceNodeBuilder<K, V> tableSourceNodeBuilder() {
@Override
@SuppressWarnings("unchecked")
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final String topicName = getTopicNames().iterator().next();
final String topicName = topicNames().iterator().next();

// TODO: we assume source KTables can only be timestamped-key-value stores for now.
// should be expanded for other types of stores as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ public class InternalTopologyBuilder {
// node factories in a topological order
private final Map<String, NodeFactory> nodeFactories = new LinkedHashMap<>();

// state factories
private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();

// built global state stores
private final Map<String, StoreBuilder> globalStateBuilders = new LinkedHashMap<>();

// built global state stores
Expand All @@ -87,10 +85,6 @@ public class InternalTopologyBuilder {
// map from sink processor names to sink topic (without application-id prefix for internal topics)
private final Map<String, String> nodeToSinkTopic = new HashMap<>();

// map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
// even if it can be matched by multiple regex patterns
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just moved since it's only used by a single class


// map from state store names to all the topics subscribed from source processors that
// are connected to these state stores
private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
Expand Down Expand Up @@ -118,12 +112,15 @@ public class InternalTopologyBuilder {

private final QuickUnion<String> nodeGrouper = new QuickUnion<>();

private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
// Used to capture subscribed topics via Patterns discovered during the partition assignment process.
private final Set<String> subscriptionUpdates = new HashSet<>();

private String applicationId = null;

private Pattern topicPattern = null;

private List<String> topicCollection = null;

private Map<Integer, Set<String>> nodeGroups = null;

public static class StateStoreFactory {
Expand Down Expand Up @@ -218,6 +215,10 @@ Processor describe() {
}
}

// Map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
// even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in a previous commit this was static and I had concerns that it wouldn't work, so I'm glad to see you changed this.


private class SourceNodeFactory extends NodeFactory {
private final List<String> topics;
private final Pattern pattern;
Expand Down Expand Up @@ -915,7 +916,7 @@ private void buildSourceNode(final Map<String, SourceNode> topicSourceMap,
final SourceNode node) {

final List<String> topics = (sourceNodeFactory.pattern != null) ?
sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
sourceNodeFactory.getTopics(subscriptionUpdates()) :
sourceNodeFactory.topics;

for (final String topic : topics) {
Expand Down Expand Up @@ -1057,24 +1058,23 @@ public synchronized Map<Integer, TopicsInfo> topicGroups() {
}

private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
final SourceNodeFactory sourceNode =
(SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(
stringPatternEntry.getKey(),
sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
if (hasSubscriptionUpdates()) {
for (final String nodeName : nodeToSourcePatterns.keySet()) {
final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(nodeName);
final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
//need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
nodeToSourceTopics.put(nodeName, sourceTopics);
sourceTopicNames.addAll(sourceTopics);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also update sourceTopicNames to keep in sync with nodeToSourceTopics

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this at this point? I guess so, as it makes sense to have sourceTopicNames match what's in nodeToSourceTopics. I'm only asking as we never had this before and I'm curious as to why.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I did switch #sourceTopicPattern and the new #sourceTopicCollection to use sourceTopicNames instead of nodeToSourceTopics.values, in which case we do still need to do this.
It shouldn't matter, although I will say the hardest part of working with this code/class was figuring out which data structures did/did not contain what contents or updates. I'm inclined to leave this in so that the two similar data structures are kept in sync, but I'm fine with removing it just to keep the changes minimal/necessary

}
log.debug("Updated nodeToSourceTopics: {}", nodeToSourceTopics);
}
}

private void setRegexMatchedTopicToStateStore() {
if (subscriptionUpdates.hasUpdates()) {
if (hasSubscriptionUpdates()) {
for (final Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
final Set<String> updatedTopicsForStateStore = new HashSet<>();
for (final String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
for (final String subscriptionUpdateTopic : subscriptionUpdates()) {
for (final Pattern pattern : storePattern.getValue()) {
if (pattern.matcher(subscriptionUpdateTopic).matches()) {
updatedTopicsForStateStore.add(subscriptionUpdateTopic);
Expand Down Expand Up @@ -1117,11 +1117,11 @@ private Pattern resetTopicsPattern(final Set<String> resetTopics,
final Set<Pattern> resetPatterns) {
final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);

return buildPatternForOffsetResetTopics(topics, resetPatterns);
return buildPattern(topics, resetPatterns);
}

private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics,
final Collection<Pattern> sourcePatterns) {
private static Pattern buildPattern(final Collection<String> sourceTopics,
final Collection<Pattern> sourcePatterns) {
final StringBuilder builder = new StringBuilder();

for (final String topic : sourceTopics) {
Expand Down Expand Up @@ -1209,37 +1209,33 @@ private String decorateTopic(final String topic) {
return applicationId + "-" + topic;
}

SubscriptionUpdates subscriptionUpdates() {
return subscriptionUpdates;
/**
 * Reports whether this builder holds any node-to-pattern mappings.
 *
 * @return {@code true} if {@code nodeToSourcePatterns} contains at least one entry,
 *         {@code false} if it is empty
 */
boolean usesPatternSubscription() {
return !nodeToSourcePatterns.isEmpty();
}

/**
 * Returns the sorted list of source topic names, computing and caching it on first use.
 *
 * <p>The list is built from {@code sourceTopicNames} passed through
 * {@code maybeDecorateInternalSourceTopics}, sorted in natural order, and stored in the
 * {@code topicCollection} field. Later calls return that same cached instance (not a copy).
 *
 * <p>The debug message is emitted on every call, before the cache check.
 *
 * @return the cached, sorted collection of source topic names
 */
synchronized Collection<String> sourceTopicCollection() {
log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer.");

// build the list only once; subsequent calls reuse the cached value
if (topicCollection == null) {
topicCollection = maybeDecorateInternalSourceTopics(sourceTopicNames);
Collections.sort(topicCollection);
}

return topicCollection;
}

synchronized Pattern sourceTopicPattern() {
log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer.");

if (topicPattern == null) {
final List<String> allSourceTopics = new ArrayList<>();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified by just using sourceTopicNames, which is identical to nodeToSourceTopics.values() but without the global topics, which we remove

if (!nodeToSourceTopics.isEmpty()) {
for (final List<String> topics : nodeToSourceTopics.values()) {
allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
}
allSourceTopics.removeAll(globalTopics);
}
final List<String> allSourceTopics = maybeDecorateInternalSourceTopics(sourceTopicNames);
Collections.sort(allSourceTopics);

topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values());
topicPattern = buildPattern(allSourceTopics, nodeToSourcePatterns.values());
}

return topicPattern;
}

// package-private for testing only
synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
final String logPrefix) {
log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)",
logPrefix, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
}

private boolean isGlobalSource(final String nodeName) {
final NodeFactory nodeFactory = nodeFactories.get(nodeName);

Expand Down Expand Up @@ -1865,42 +1861,25 @@ private static String nodeNames(final Set<TopologyDescription.Node> nodes) {
return sb.toString();
}

/**
* Used to capture subscribed topic via Patterns discovered during the
* partition assignment process.
*/
public static class SubscriptionUpdates {

private final Set<String> updatedTopicSubscriptions = new HashSet<>();

private void updateTopics(final Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
}

public Collection<String> getUpdates() {
return Collections.unmodifiableSet(updatedTopicSubscriptions);
}

boolean hasUpdates() {
return !updatedTopicSubscriptions.isEmpty();
}

@Override
public String toString() {
return String.format("SubscriptionUpdates{updatedTopicSubscriptions=%s}", updatedTopicSubscriptions);
}
/**
 * Exposes the current contents of the {@code subscriptionUpdates} set.
 *
 * @return an unmodifiable view of {@code subscriptionUpdates}; callers cannot mutate it,
 *         but it is backed by the underlying set rather than being a snapshot
 */
Set<String> subscriptionUpdates() {
return Collections.unmodifiableSet(subscriptionUpdates);
}

void updateSubscribedTopics(final Set<String> topics,
final String logPrefix) {
final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
log.debug("{}found {} topics possibly matching regex", logPrefix, topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
updateSubscriptions(subscriptionUpdates, logPrefix);
/**
 * Reports whether any topics are currently held in {@code subscriptionUpdates}.
 *
 * @return {@code true} if {@code subscriptionUpdates} is non-empty, {@code false} otherwise
 */
boolean hasSubscriptionUpdates() {
return !subscriptionUpdates.isEmpty();
}

synchronized void updateSubscribedTopics(final Set<String> topics,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SubscriptionUpdates class and various updateXXX methods were unnecessarily complex given they all boiled down to just the 4 lines of actual actions below

final String logPrefix) {
log.debug("{}found {} topics possibly matching subscription", logPrefix, topics);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing space between logPrefix and 'found'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topics is a Set.
What's your intention for the second parameter?
If you want the number of topics logged, you should use topics.size().

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spacing kept bothering me too but it's actually correct (logPrefix has a space at the end) -- we should actually clean up this class at some point to use log = new LogContext(logPrefix).logger(getClass()); rather than explicitly insert the prefix everywhere.
I didn't write this log message, just moved things around, but I think the intention was to list the actual topics not just the size. I probably could've improved the wording of it though

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since topics Set can be quite large, I doubt the intention was to show the contents.
'{} topics' reads like the count of entries should be shown.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the topics set should only include the matching topics that the Streams app is actually subscribed to, but I agree the wording suggests it should be the size/count. Since this PR is already merged do you want to submit a quick follow-up PR?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #8005

subscriptionUpdates.clear();
subscriptionUpdates.addAll(topics);

log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment as above two.

logPrefix, subscriptionUpdates);
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
}

// following functions are for test only

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ public void run() {
* @throws StreamsException if the store's change log does not contain the partition
*/
private void runLoop() {
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
subscribeConsumer();

while (isRunning()) {
try {
Expand All @@ -713,7 +713,15 @@ private void runLoop() {

private void enforceRebalance() {
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
subscribeConsumer();
}

private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
Copy link
Copy Markdown
Member Author

@ableegoldman ableegoldman Jan 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix; if the user has not themselves added pattern source topics we will go back to using regular subscription (having safely disabled auto topic creation)

consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
} else {
consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
}

protected static Set<TaskId> prepareForSubscription(final TaskManager taskManager,
final Set<String> topics,
final Set<TaskId> standbyTasks,
final RebalanceProtocol rebalanceProtocol) {
final Set<String> topics,
final Set<TaskId> standbyTasks,
final RebalanceProtocol rebalanceProtocol) {
// Any tasks that are not yet running are counted as standby tasks for assignment purposes,
// along with any old tasks for which we still found state on disk
final Set<TaskId> activeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public void updateSubscriptionsFromAssignment(final List<TopicPartition> partiti
assignedTopics.add(topicPartition.topic());
}

final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
final Collection<String> existingTopics = builder().subscriptionUpdates();
if (!existingTopics.containsAll(assignedTopics)) {
assignedTopics.addAll(existingTopics);
builder().updateSubscribedTopics(assignedTopics, logPrefix);
Expand All @@ -527,7 +527,7 @@ public void updateSubscriptionsFromAssignment(final List<TopicPartition> partiti

public void updateSubscriptionsFromMetadata(final Set<String> topics) {
if (builder().sourceTopicPattern() != null) {
final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
final Collection<String> existingTopics = builder().subscriptionUpdates();
if (!existingTopics.equals(topics)) {
builder().updateSubscribedTopics(topics, logPrefix);
}
Expand Down
Loading