KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides #11272

Merged
ableegoldman merged 18 commits into apache:trunk from
ableegoldman:12648-Configs-introduce-TaskConfig-for-task-level-overrides
Nov 10, 2021

@ableegoldman
Member

Most configs that are read and used by Streams today originate from the properties passed in to the KafkaStreams constructor, which means they get applied universally across all threads, tasks, subtopologies, and so on. The only current exception to this is the topology.optimization config, which is parsed from the properties that get passed in to StreamsBuilder#build. However, there are a handful of configs that could also be scoped to the topology level, allowing users to configure each NamedTopology independently of the others where it makes sense to do so.

This PR refactors the handling of these configs by interpreting the values passed in via the KafkaStreams constructor as the global defaults, which can then be overridden for individual topologies via the properties passed in when building the NamedTopology. More topology-level configs may be added in the future, but this PR covers the following:

  1. max.task.idle.ms
  2. task.timeout.ms
  3. buffered.records.per.partition
  4. default.timestamp.extractor.class
  5. default.deserialization.exception.handler
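
To illustrate the "global defaults plus per-topology overrides" idea described above, here is a minimal sketch that uses only java.util.Properties. The class TopologyOverrideSketch and its resolve helper are hypothetical and are not the PR's TopologyConfig; ignoring override keys outside the five listed configs is an assumption made for the example, and the values are illustrative only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical illustration only; this is NOT the TopologyConfig class from the PR.
class TopologyOverrideSketch {

    // The five configs the PR description lists as overridable per topology.
    static final Set<String> TOPOLOGY_LEVEL_CONFIGS = new HashSet<>(Arrays.asList(
        "max.task.idle.ms",
        "task.timeout.ms",
        "buffered.records.per.partition",
        "default.timestamp.extractor.class",
        "default.deserialization.exception.handler"));

    // Start from the application-wide defaults, then apply any topology-level
    // override. Keys outside the list above are ignored (an assumption of this sketch).
    static Properties resolve(final Properties globalDefaults, final Properties topologyOverrides) {
        final Properties resolved = new Properties();
        resolved.putAll(globalDefaults);
        if (topologyOverrides != null) {
            for (final String key : topologyOverrides.stringPropertyNames()) {
                if (TOPOLOGY_LEVEL_CONFIGS.contains(key)) {
                    resolved.setProperty(key, topologyOverrides.getProperty(key));
                }
            }
        }
        return resolved;
    }

    public static void main(final String[] args) {
        final Properties global = new Properties();
        global.setProperty("max.task.idle.ms", "0");
        global.setProperty("task.timeout.ms", "300000");
        global.setProperty("num.stream.threads", "4");

        final Properties overrides = new Properties();
        overrides.setProperty("max.task.idle.ms", "500");   // topology-level, applied
        overrides.setProperty("num.stream.threads", "1");   // not topology-level, ignored

        final Properties resolved = resolve(global, overrides);
        System.out.println(resolved.getProperty("max.task.idle.ms"));   // prints 500
        System.out.println(resolved.getProperty("task.timeout.ms"));    // prints 300000
        System.out.println(resolved.getProperty("num.stream.threads")); // prints 4
    }
}
```

A topology built without any overrides simply sees the global values unchanged.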

@ableegoldman
Member Author

@wcarlson5 I split the config work out from the Pt. 4 branch; here's the PR as promised. I ended up refactoring things a bit while pulling the changes out, to make them easier to build on top of as we go. Let me know what you think, especially any thoughts on something we can do now to better set up the next piece: cache.max.bytes.buffering.

We should also keep an eye on KIP-770 and start thinking about the input.buffer.max.bytes config it's proposing. Might be good for you to be one of the primary reviewers, if you have time, so you can guide things to play along nicely with our work here.

Contributor

@wcarlson5 left a comment

Overall this makes a lot of sense. I just had a couple of questions/comments.

stateDirectory,
stateMgr,
inputPartitions,
config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG),

Contributor

Do we want to have the same pattern for task level configs as we do for streams configs?

Member Author

Do you mean like a separate TaskConfig public class for users to configure the same way we do for StreamsConfig? I've been going back and forth but ultimately I think we should keep all configs under StreamsConfig so that all configs for each part of the stack are handled in one place (eg it would be confusing if within the Consumer client, there were multiple sets of configs)...still I think we can clean this up eventually, when we do the KIP.

topologyConfigs = new TopologyConfig(
topologyName,
applicationConfig,
topologyProperties == null ?

Contributor

Do we use Optional in Streams? This would be a good place for it.

Member Author

Technically Optional is not supposed to be used for fields, only return values...so they say. I'm definitely a little concerned about this pattern of null fields but it's already used throughout InternalTopologyBuilder and often unavoidable due to the particular order of things in Streams. So yet another thing to clean up at a later time (think guozhang filed a ticket for this already)
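
As a rough illustration of that convention (Optional for return values, plain nullable references for fields), here is a small sketch. The class and member names are hypothetical and do not come from InternalTopologyBuilder.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical holder class, used only to illustrate the convention discussed above.
class NullableFieldSketch {

    // Stored as a plain nullable field: it stays null until overrides are supplied.
    private Properties topologyOverrides;

    void setTopologyOverrides(final Properties overrides) {
        this.topologyOverrides = overrides;
    }

    // Optional appears only at the API boundary, so callers must handle absence explicitly.
    Optional<Properties> topologyOverrides() {
        return Optional.ofNullable(topologyOverrides);
    }

    public static void main(final String[] args) {
        final NullableFieldSketch holder = new NullableFieldSketch();
        System.out.println(holder.topologyOverrides().isPresent()); // prints false

        holder.setTopologyOverrides(new Properties());
        System.out.println(holder.topologyOverrides().isPresent()); // prints true
    }
}
```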

Member

@showuon left a comment

@ableegoldman, thanks for the improvement. Left some comments. Thanks.

* topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
* {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
*/
public class TopologyConfig extends AbstractConfig {

Member

Should we mention that we currently have 5 topology-level properties (BUFFERED_RECORDS_PER_PARTITION_CONFIG, ...), and that this list should be updated if a new topology-level config is introduced in the future?

Member Author

I guess, but honestly for now it's going to be on us/me to keep track of any new configs that could be topology overrides. It's not a problem if someone introduces a config that could be scoped to a single topology and isn't, it's just a feature we can expand later. We need to work out a clean API before taking this to the KIP stage anyways

Contributor

@guozhangwang left a comment

Made a pass over the non-testing code.

One meta comment I had is about enforcing the order in which the global configs and the topology overrides are set for named topologies; the others are all minor.

*/
public synchronized Topology build(final Properties props) {
internalStreamsBuilder.buildAndOptimizeTopology(props);
internalTopologyBuilder.setTopologyProperties(props);

Contributor

nit: add a TODO here that for now we always set the overrides as the same as global application props?

Member Author

To clarify, these are the props that the user passes in when building the topology, eg needed for TOPOLOGY_OPTIMIZATION which is the only actual topology-level override today. So the overrides are only set the same as the global props if the user decides to pass the same set of configs in.

public NamedTopologyStreamsBuilder(final String topologyName) {
super();
this.topologyName = topologyName;
super(new NamedTopology(topologyName));

Contributor

Nice cleanup!

}

return this;
public synchronized final void setStreamsConfig(final StreamsConfig config) {

Contributor

This is a meta comment: I feel we can try to re-order the topology steps in a better way now. Today we have the following in order which looks awkward to me.

  1. InternalStreamsBuilder#internalTopologyBuilder(props): this is to do the logical plan -> physical plan build. The props should be per-topology overrides in the long run.
  2. InternalStreamsBuilder#rewriteTopology(StreamsConfig config): this is to do some post-plan generation modifications which, ideally, should be part of 1) as well. Here the config is at global application-level.
  3. Inside 2) we call setStreamsConfig(config); which is to set the global application configs.

I think a better order should be first setting the global configs, and then the per-topology overrides. So we should consider setting the global configs at the beginning when constructing the Topology (either from StreamsBuilder with DSL, or directly with PAPI), then the per-topology props for overrides.

Of course the reason we have this awkwardness is because for PAPI, the Topology is built without "props" passed in at all and hence we can only rely on the last step 3) to make sure the configs are finally initialized. But now with the new NamedTopology we can bypass this by requiring it to always come with a prop at the construction time as well, which would be used to set both (the func names are just for illustrations):

  • internalTopologyBuilder.setTopologyOverrides(props);
  • internalTopologyBuilder.setApplicationConfigs(config);

We would end up with the following scenarios:

  1. No named-topology, DSL: props are set at StreamsBuilder.build(), and we know there are no overrides.
  2. No named-topology, PAPI: the only pain left, as we have to wait until InternalTopologyBuilder.rewriteTopology, but we still know there are no overrides.
  3. Named-topology, DSL: props are set at the constructor, for both application-level configs and overrides.
  4. Named-topology, PAPI: currently no one should be using it this way, but in case it is supported, it is the same as 3) above.

And if in the long run we would stick with the NamedTopology where we would deprecate 1/2 above, we would end up naturally at the state where we enforce props to be set at the topology construction time.

WDYT?
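
The ordering proposed above (global configs first, then per-topology overrides) could be enforced roughly as follows. This is only a sketch: the method names reuse the illustrative setApplicationConfigs / setTopologyOverrides names from the comment, while the class itself, the IllegalStateException check, and the lookup method are assumptions rather than the actual InternalTopologyBuilder code.

```java
import java.util.Properties;

// Hypothetical builder used only to illustrate the proposed ordering.
class ConfigOrderingSketch {

    private Properties applicationConfigs; // global defaults, must be set first
    private Properties topologyOverrides;  // per-topology overrides, set second

    void setApplicationConfigs(final Properties config) {
        applicationConfigs = new Properties();
        applicationConfigs.putAll(config);
    }

    void setTopologyOverrides(final Properties props) {
        // Assumed guard: reject overrides that arrive before the global configs.
        if (applicationConfigs == null) {
            throw new IllegalStateException("application configs must be set before topology overrides");
        }
        topologyOverrides = new Properties();
        topologyOverrides.putAll(props);
    }

    // Look up a config, preferring the topology override when one exists.
    String get(final String key) {
        if (topologyOverrides != null && topologyOverrides.containsKey(key)) {
            return topologyOverrides.getProperty(key);
        }
        return applicationConfigs == null ? null : applicationConfigs.getProperty(key);
    }

    public static void main(final String[] args) {
        final Properties global = new Properties();
        global.setProperty("task.timeout.ms", "300000");
        final Properties overrides = new Properties();
        overrides.setProperty("task.timeout.ms", "60000");

        final ConfigOrderingSketch builder = new ConfigOrderingSketch();
        builder.setApplicationConfigs(global);
        builder.setTopologyOverrides(overrides);
        System.out.println(builder.get("task.timeout.ms")); // prints 60000
    }
}
```

Requiring both sets of properties at construction time, as suggested for NamedTopology, would remove the need for the runtime check entirely.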

Member Author

That all makes sense to me -- initially I was trying to make this just work with the existing public API, but ultimately figured that even the concept of "topology-level overrides" would require a KIP. If we're only dealing with named topologies then we're free to do things the right way from the start -- thanks for the suggestions :)

Member Author

@guozhangwang Played around with this a bit and was able to clean things up nicely, plus address some other awkwardness that was bugging me. Worked out great (just need to do some cleanup of the tests now...)

Member

@showuon left a comment

LGTM! Thank you.

@guozhangwang
Contributor

Checkstyle rule violations were found. :)

Contributor

@guozhangwang left a comment

Thanks for the updated PR. Its config override procedure is much cleaner now. Just one question below regarding where setTopologyOverrides is used after the refactoring; the others are all minor.

*
* @throws IllegalArgumentException if the name contains the character sequence "__"
*/
public NamedTopology newNamedTopology(final String topologyName, final Properties topologyConfigs) {

Contributor

This one seems to be unused? And even in external callers like ksql, I think we would go through newNamedTopologyBuilder().build(), right?

Member Author

Well eventually we'll want to support building a NamedTopology with the PAPI rather than the DSL, but I can remove this for the time being to keep the API minimal until we've figured out exactly what it will look like & gone through the KIP

}

@Test
public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {

Contributor

Thanks for the updated tests! They are great.

Contributor

@guozhangwang left a comment

LGTM!

@ableegoldman force-pushed the 12648-Configs-introduce-TaskConfig-for-task-level-overrides branch from 1093ad4 to c71ee29 on October 22, 2021 23:46
@ableegoldman force-pushed the 12648-Configs-introduce-TaskConfig-for-task-level-overrides branch from c71ee29 to 2bcff16 on October 29, 2021 01:29

Contributor

@wcarlson5 left a comment

LGTM

@ableegoldman merged commit 908a6d2 into apache:trunk on Nov 10, 2021
stanislavkozlovski added a commit to stanislavkozlovski/kafka that referenced this pull request Nov 11, 2021
…ntegration-11-nov

* ak/trunk: (15 commits)
  KAFKA-13429: ignore bin on new modules (apache#11415)
  KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides (apache#11272)
  KAFKA-12487: Add support for cooperative consumer protocol with sink connectors (apache#10563)
  MINOR: Log client disconnect events at INFO level (apache#11449)
  MINOR: Remove topic null check from `TopicIdPartition` and adjust constructor order (apache#11403)
  KAFKA-13417; Ensure dynamic reconfigurations set old config properly (apache#11448)
  MINOR: Adding a constant to denote UNKNOWN leader in LeaderAndEpoch (apache#11477)
  KAFKA-10543: Convert KTable joins to new PAPI (apache#11412)
  KAFKA-12226: Commit source task offsets without blocking on batch delivery (apache#11323)
  KAFKA-13396: Allow create topic without partition/replicaFactor (apache#11429)
  ...
ableegoldman added a commit to confluentinc/ksql that referenced this pull request Nov 18, 2021
…8331)

This PR prepares the necessary changes for us to finally merge apache/kafka#11272 without breaking the build. Since we were already starting up an empty KafkaStreams, the only changes we need here are to get the TopologyBuilder from this KafkaStreams
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…el overrides (apache#11272)

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Luke Chen <showuon@confluent.io>

4 participants