KAFKA-13654: Extend KStream process with new Processor API#11993
KAFKA-13654: Extend KStream process with new Processor API#11993vvcephei merged 26 commits intoapache:trunkfrom
Conversation
vvcephei
left a comment
There was a problem hiding this comment.
Thanks, @jeqo ! Great work, as always. I've attached some comments, but not too many. I think we should be able to merge soon.
I didn't double-check that all the APIs in this PR are exactly as represented in the KIP. I'll be sure to do that before merging.
There was a problem hiding this comment.
Hmm, why is this one named NamedInternal.empty(), but the other one (on L1515) named Named.as(builder.newProcessorName(PROCESSOR_NAME))?
We should be really sure that we get the right processor naming behavior. It's extremely difficult to change them later, so if we get it wrong, we basically have to support the wrong behavior forever with a bunch of special-case logic.
There was a problem hiding this comment.
Great catch! Do you think a new processor name is needed for processValues? e.g. KSTREAM-PROCESSVALUES-
There was a problem hiding this comment.
Yeah, that's probably a good idea, actually.
There was a problem hiding this comment.
Are we missing adding the oldProcessor to the topology here?
There was a problem hiding this comment.
Interestingly enough, it seems that the old processor is not being added to the topology builder anymore.
Internal Topology doesn't even have a #addProcessor(String, OldProcessorSupplier, ...) method anymore.
Do you think there might be some regression here?
There was a problem hiding this comment.
Hmm. I seem to recall having refactored the internal topology builder code a long time ago so that the old processors would get shimmed to the new API so that we only need one path.
I guess the task here would be to double-check that adding old processors still works without attached stores, with attached stores via the addProcessor method, and with attached stores via the old ProcessorSupplier.stores().
In addition to that, I'd also double check that this code block is actually used by the last of those code paths.
Of both of those check out, then I think we have a correct implementation here.
There was a problem hiding this comment.
Added a couple of tests to KStream to test these.
There was a problem hiding this comment.
Why not just call delegate.process(record)? Actually, why do we need to add this method here at all?
There was a problem hiding this comment.
Oh, now I see. Why not just capture the result of ProcessorAdapter.adapt(mockProcessor) in the constructor and use that for the this.mockProcessor field instead? That way it would already be the right type. That's basically what we were doing before, although I agree we don't need to have that method to expose the processor on the super class.
There was a problem hiding this comment.
Got it. Replaced the initialization with: this.mockProcessor = (MockApiProcessor<...>) ProcessorAdapter.adapt(mockProcessor);
Let me now if this look good now.
There was a problem hiding this comment.
Fallback to MockProcessor instead of MockApiProcessor as there is no need to migrate this I think.
There was a problem hiding this comment.
Oy, forgot about the scala API ... again. Do we need to add the new interfaces to the scala API? Also, the scala deprecated annotation takes parameters about when it was deprecated and what you should use instead, which we should make use of.
There was a problem hiding this comment.
Got it. Params are added. I also forgot to add processValues to the Scala API. Should be there now.
There was a problem hiding this comment.
This wasn't in the KIP... Do we need it for something?
There was a problem hiding this comment.
These are required to implement from ProcessingContext#currentSystemTimeMs() and ProcessingContext#currentStreamTimeMs(). I followed a similar approach as
There was a problem hiding this comment.
Ok, I think I remember this now. The KIP that added those time APIs forgot to add them to the new Processor API and only did the old API. I think it got missed because both KIPs were going in parallel.
Can you dig up that KIP and just notify the vote thread of it that we need to amend the KIP to address this oversight? It shouldn't require a new vote or anything, but we should update the KIP.
There was a problem hiding this comment.
I see. I have updated the KIP and the vote thread 👍🏽
|
Thank you, @vvcephei !! I have applied most suggestions, let me know your thoughts 🙌🏽 |
There was a problem hiding this comment.
Just a note: Unlike in the Java API, where you could not have assigned the void result of the method to a variable, in the Scala API, someone could actually have assigned the result of this method to a Unit-typed variable. They still wouldn't have been able to do anything with it, so I think the chances are small they actually would have captured the result, and I think that in this case, it's better to make this small change, rather than add two new methods (which would be different from the Java counterparts) for 100% compatibility.
revert test commit test commit revert test commit MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (apache#11985) Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException. Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com> MINOR: Correct Connect docs on connector/task states (apache#11914) The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state. Reviewers: Mickael Maison <mickael.maison@gmail.com> KAFKA-13828; Ensure reasons sent by the consumer are small (apache#12043) This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short. Reviewers: Bruno Cadonna <bruno@confluent.io> KAFKA-13542: Add rebalance reason in Kafka Streams (apache#12018) Reviewers: Bruno Cadonna <bruno@confluent.io>, David Jacot <djacot@confluent.io> MINOR: Verify stopReplica if broker epoch not stale (apache#12040) Verify that ReplicaManager.stopReplica is called if the stop replica request doesn't result in a stale broker epoch error. Reviewers: Mickael Maison <mimaison@users.noreply.github.com> KAFKA-13651; Add audit logging to `StandardAuthorizer` (apache#12031) This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It follows the same conventions as AclAuthorizer with a similarly formatted log message. When logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at TRACE. Reviewers: Colin P. McCabe <cmccabe@apache.org> KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode apache#11910 In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric names. We should implement this in KRaft mode. This PR also changes TopicCommandIntegrationTest to support KRaft mode. Reviewers: Colin P. McCabe <cmccabe@apache.org> KAFKA-12613: Fix inconsistent validation logic between KafkaConfig and LogConfig (apache#10472) Reviewers: Mickael Maison <mickael.maison@gmail.com> KAFKA-13823 Feature flag changes from KIP-778 (apache#12036) This PR includes the changes to feature flags that were outlined in KIP-778. Specifically, it changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because FeatureLevelRecord was unused previously, we do not need to introduce a new version. The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade, downgrade, and disable sub-commands. Refer to [KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more details on the new command structure. Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com> MINOR: Update LICENSE-binary (apache#12051) Updates the license file. Reviewer: Bill Bejeck <bbejeck@apache.org> MINOR: Remove redundant conditional judgments in Selector.clear() (apache#12048) Condition 'sendFailed' is always 'false' when reached. Reviewers: Guozhang Wang <wangguoz@gmail.com> MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (apache#12041) Following PR apache#11940, ignore unused config when ConsumerCoordinator is not constructed. Reviewers: Guozhang Wang <wangguoz@gmail.com> [MINOR] Update upgrade documentation for 3.2 (apache#12055) Reviewer: Bruno Cadonna <cadonna@apache.org> MINOR: Move some TopicCommand and ConfigCommand integration tests to unit tests (apache#12024) Move some TopicCommand and ConfigCommand integration tests to unit tests to speed up the tests Reviewers: Luke Chen <showuon@gmail.com> MINOR: Improve the description of principal under different mechanisms of sasl (apache#11947) Reviewers: Mickael Maison <mickael.maison@gmail.com> MINOR: update comment in LocalLog.replaceSegments() (apache#12054) Reviewers: Luke Chen <showuon@gmail.com> MINOR: Make link in quickstart dynamic (apache#12057) Reviewer: Matthias J. Sax <mjsax@apache.org> KAFKA-13769: Explicitly route FK join results to correct partitions (apache#11945) Prior to this commit FK response sink routed FK results to SubscriptionResolverJoinProcessorSupplier using the primary key. There are cases, where this behavior is incorrect. For example, if KTable key serde differs from the data source serde which might happen without a key changing operation. Instead of determining the resolver partition by serializing the PK this patch includes target partition in SubscriptionWrapper payloads. Default FK response-sink partitioner extracts the correct partition from the value and routes the message accordingly. Reviewers: Matthias J. Sax <matthias@confluent.io> KAFKA-13807: Fix incrementalAlterConfig and refactor some things (apache#12033) Ensure that we can set log.flush.interval.ms at the broker or cluster level via IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the second synonym rather than the first. Add a regression test to DynamicConfigChangeTest. Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to pass through information like the deadline (if there is one) and the Kafka principal which is making the request (in the future we will want to log this information). In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat request interval, to avoid heartbeats piling up on the controller queue. This should have been done previously, but we overlooked it. Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal with a lot of churn (especially in test code) whenever a new constructor parameter gets added for one of these. In ControllerConfigurationValidator, create a separate function for when we just want to validate that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the validation code for IncrementalAlterConfigs, but this was messy. Split out the replica placement code into a separate package and reorganize it a bit. Reviewers: David Arthur <mumrah@gmail.com MINOR: Correct spelling errors in KafkaRaftClient (apache#12061) Correct spelling errors in KafkaRaftClient Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com> MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (apache#12027) The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spit out the same information as the first one. Instead, we should just run the validation at the end. Reviewers: Jason Gustafson <jason@confluent.io> KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date (apache#11681) The reason why this test is flaky is because we have race condition at the beginning of the test, when brokers are staring up, and the adminClient is requesting for brokers metadata. Once the adminClient only got partial metadata, the test will fail, because in these tests, brokers will be shutdown to test leader election. Fix this issue by explicitly waiting for metadata cache up-to-date in waitForReadyBrokers, and let admin client get created after waitForReadyBrokers. Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com> KAFKA-13832: Fix flaky testAlterAssignment (apache#12060) In KRaft mode the metadata is not propagate in time, so we should should wait for it before make assertions. Reviewers: Luke Chen <showuon@gmail.com> KAFKA-13654: Extend KStream process with new Processor API (apache#11993) Updates the KStream process API to cover the use cases of both process and transform, and deprecate the KStream transform API. Implements KIP-820 Reviewer: John Roesler <vvcephei@apache.org> KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (apache#12063) Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a broker configuration is changed. This is backwards. This function must be called only for broker configs, and never for topic configs or cluster configs. The second bug is that there were several configurations such as max.connections which are related to broker listeners, but which do not involve changing the registered listeners. We should support these configurations in KRaft. This PR fixes the configuration change validation to support this case. Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com> KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (apache#12004) Reviewers: Mickael Maison <mickael.maison@gmail.com> MINOR: Scala cleanups in core (apache#12058) Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com> KAFKA-13799: Improve documentation for Kafka zero-copy (apache#12052) Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer. Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Guozhang Wang <wangguoz@gmail.com> KAFKA-13785: [5/N][emit final] cache for time ordered window store (apache#12030) A new cache for RocksDBTimeOrderedWindowStore. Need this because RocksDBTimeOrderedWindowStore's key ordering is different from CachingWindowStore which has issues for MergedSortedCacheWindowStoreIterator Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com> KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names (apache#11703) Reviewers: Guozhang Wang <wangguoz@gmail.com> HOTFIX: fix broken trunk due to conflicting and overlapping commits (apache#12074) Reviewers: Victoria Xia <victoria.xia@confluent.io>, David Arthur <mumrah@gmail.com> MINOR: cleanup for postProcessAndValidateIdempotenceConfigs method (apache#12069) Reviewers: Luke Chen <showuon@gmail.com> KAFKA-13834: fix drain batch starving issue (apache#12066) In drainBatchesForOneNode method, there's possibility causing some partitions in a node will never get picked. Fix this issue by maintaining a drainIndex for each node. Reviewers: Luke Chen <showuon@gmail.com>, RivenSun <91005273+RivenSun2@users.noreply.github.com> MINOR: Improve Gradle Caching and Fix Deprecations (apache#12003) * Fix UP-TO-DATE check in `create*VersionFile` tasks `create*VersionFile` tasks explicitly declared output UP-TO-DATE status as being false. This change properly sets the inputs to `create*VersionFile` tasks to the `commitId` and `version` values and sets `receiptFile` locally rather than in an extra property. * Enable output caching for `process*Messages` tasks `process*Messages` tasks did not have output caching enabled. This change enables that caching, as well as setting a property name and RELATIVE path sensitivity. * Fix existing Gradle deprecations Replaces `JavaExec#main` with `JavaExec#mainClass` Replaces `Report#destination` with `Report#outputLocation` Adds a `generator` configuration to projects that need to resolve the `generator` project (rather than referencing the runtimeClasspath of the `generator` project from other project contexts. Reviewers: Mickael Maison <mickael.maison@gmail.com> MINOR; Fix partition change record noop check (apache#12073) When LeaderRecoveryState was added to the PartitionChangeRecord, the check for being a noop was not updated. This commit fixes that and improves the associated test to avoid this oversight in the future. Reviewers: Colin Patrick McCabe <cmccabe@apache.org> MINOR: revert back to 60s session timeout for static membership test (apache#11881) Reviewers: Guozhang Wang <wangguoz@gmail.com> KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (apache#12075) This PR fixes a case where we were unable to place on fenced brokers In KRaft mode. Specifically, if we had a broker registration in the metadata log, but no associated heartbeat, previously the HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the metadata log replay path in ClusterControlManager. Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
KIP-820 implementation.
Changes:
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)