Skip to content

MINOR: prefix topics if internal config is set#11611

Merged
guozhangwang merged 3 commits intoapache:trunkfrom
wcarlson5:prefix_topics
Jan 11, 2022
Merged

MINOR: prefix topics if internal config is set#11611
guozhangwang merged 3 commits intoapache:trunkfrom
wcarlson5:prefix_topics

Conversation

@wcarlson5
Copy link
Copy Markdown
Contributor

In order to move a topology to another runtime without having to copy over the internal topics it would be good to have the option to not prefix the internal topics with the application ID. So this change will introduce a new config that if set will be the internal topic prefix

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@wcarlson5
Copy link
Copy Markdown
Contributor Author

@ableegoldman This should make it so we can have topics prefixed with whatever is needed

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wcarlson5 just for my understanding: what prefix we would use in practice, in order to allow the topology to be moved between runtimes?

public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"
+ ".vector.enabled__";

// Private API used to control the usage of consistency offset vectors
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems irrelevant? It's not for the offset vectors right?

}

private void initInternal(final InternalProcessorContext<?, ?> context) {
final String prefix = StreamsConfig.InternalConfig.getString(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use ProcessorContextUtils#changelogFor here as well? Ditto below

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could either be a StateStoreContext of a ProcessorContext and there are different methods for each. I am not sure why as they seem to do similar things. but I didn't want to make reaching changes here. I can just file a ticket to consolidate those methods into one maybe?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we would remove the deprecated init(final ProcessorContext context, final StateStore root) and then we only need ProcessorContextUtils#changelogFor(StateStoreContext..).

Sounds good for filing a JIRA for consolidating to the changelogFor, we can do that when we remove the deprecated init function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang
Copy link
Copy Markdown
Contributor

Though this comment is not for AK, I'd have to raise to our attention: in ksql we have hard-coded rules to detect internal topics to be cleaned when terminating a query, which is like this today:

  private static boolean isInternalTopic(final String topicName, final String applicationId) {
    final boolean prefixMatches = topicName.startsWith(applicationId + "-");
    final boolean suffixMatches = topicName.endsWith(KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX)
        || topicName.endsWith(KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX)
        || topicName.matches(KsqlConstants.STREAMS_JOIN_REGISTRATION_TOPIC_PATTERN)
        || topicName.matches(KsqlConstants.STREAMS_JOIN_RESPONSE_TOPIC_PATTERN);
    return prefixMatches && suffixMatches;
  }

With this change we have to remember updating this function otherwise the cleanup logic would break.

@wcarlson5
Copy link
Copy Markdown
Contributor Author

@guozhangwang yes that is a good point. When we go to use this we will have to set the config and we can updated the clean up logic then

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM! Re-triggering the jenkins tests.

@guozhangwang
Copy link
Copy Markdown
Contributor

@wcarlson5 could you double check if the failed tests are relevant?

@wcarlson5
Copy link
Copy Markdown
Contributor Author

@guozhangwang Yeah I needed update a few tests. It should be good now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants