KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) #8720
kkonstantine merged 8 commits into apache:trunk
Conversation
levzem
left a comment
@aakashnshah looks great! excited for this feature
just got mostly nits for you
do we need this invalid config step here
I don't think it hurts to add this step.
rhauch
left a comment
Thanks, @aakashnshah! I have a few comments below, including a kind of big question: why are we not reusing the existing RetryWithToleranceOperator that already tracks the tolerance and one or more reporters?
All fields that can be final should be marked as such. This provides semantic intent to future developers and helps prevent unintentionally changing the fields in the future.
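A minimal sketch of the suggestion above, using invented class and field names for illustration: fields that are assigned only in the constructor are marked final, so the compiler rejects any later reassignment.

```java
// Hypothetical config holder; the names are not from the PR.
public class ReporterConfig {
    private final String topic;          // final: assigned once, never reassigned
    private final boolean includeHeaders;

    public ReporterConfig(String topic, boolean includeHeaders) {
        this.topic = topic;
        this.includeHeaders = includeHeaders;
    }

    public String topic() {
        return topic;
    }

    public boolean includeHeaders() {
        return includeHeaders;
    }

    public static void main(String[] args) {
        ReporterConfig config = new ReporterConfig("dlq-topic", true);
        // Any attempt to write `config.topic = ...` elsewhere now fails to compile.
        System.out.println(config.topic());
    }
}
```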
Do we really want to pass the ExecutionException to the ConnectException, or would it be better to pass that exception's cause to the ConnectException?
How about log messages here?
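The point about unwrapping: an ExecutionException is just a wrapper added by Future.get(), so passing its cause to the ConnectException keeps the stack trace one level shorter and more informative. A self-contained sketch (ConnectException is stubbed here since the real Connect class isn't on the classpath):

```java
import java.util.concurrent.ExecutionException;

public class UnwrapCauseExample {
    // Stand-in for org.apache.kafka.connect.errors.ConnectException.
    static class ConnectException extends RuntimeException {
        ConnectException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    public static void main(String[] args) {
        // What a producer future's get() would throw on failure.
        ExecutionException wrapped =
            new ExecutionException(new IllegalStateException("broker unavailable"));

        // Wrap the *cause*, not the ExecutionException itself.
        ConnectException ce =
            new ConnectException("Failed to send errant record", wrapped.getCause());
        System.out.println(ce.getCause().getMessage());
    }
}
```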
I suggested earlier about reusing the RetryWithToleranceOperator, and that doing so might require adding a produce-like method to that class that simply reports a new error. If that method took a Callback here and passed it to its producer.send(...) call, then we could provide a callback that removed the (completed) future from our list, helping to keep that list as small as possible with only the incomplete futures.
If we did that, we'd want to use a LinkedList rather than an ArrayList, since we're no longer removing futures only from the ends of the list.
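A rough sketch of the bookkeeping described above, with CompletableFuture standing in for the producer's send future. Completed futures are removed from the middle of the list by the completion callback, which is why a LinkedList is a better fit than an ArrayList here.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureTrackingSketch {
    // LinkedList because removals happen at arbitrary positions, not just the ends.
    private final List<CompletableFuture<Void>> inFlight = new LinkedList<>();

    CompletableFuture<Void> report() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        synchronized (inFlight) {
            inFlight.add(future);
        }
        // In the real reporter, completion would come from the Callback passed
        // to producer.send(...) once the errant record is acked.
        future.whenComplete((v, t) -> {
            synchronized (inFlight) {
                inFlight.remove(future);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        FutureTrackingSketch tracker = new FutureTrackingSketch();
        CompletableFuture<Void> f = tracker.report();
        System.out.println("in flight: " + tracker.inFlight.size());
        f.complete(null); // simulate the producer ack arriving
        System.out.println("in flight after ack: " + tracker.inFlight.size());
    }
}
```

This keeps the list holding only incomplete futures, so the wait at commit time scales with outstanding sends rather than with every record ever reported.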
If we ensure that the producer future is never null, then we can remove the if (future == null) kind of checks in this class' methods.
gharris1727
left a comment
Please add your new API class to the isolation whitelist, and update the corresponding tests.
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java#L130-L143
Does this test ever encounter this exception? I don't think we will be able to backport this test to < 2.6 because the method won't exist at all, much less generate the exception that is being caught here.
If anything, this generates a less informative NPE later in put, and hides the actual root cause.
Yeah, I should remove this comment. This test won't encounter this exception since it's always going to have the class and method if the test itself exists.
Does this have an unbounded waiting time? How does this interact with task.shutdown.graceful.timeout.ms? What is the delivery guarantee of these error reports?
Yes, the waiting time is unbounded. The delivery guarantee is that all errant records up to the latest offset in preCommit() will be sent to Kafka before preCommit() is invoked.
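The guarantee above amounts to blocking on every outstanding report future before committing offsets. A simplified, self-contained sketch of that wait (the list and names are illustrative, not the PR's actual code):

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AwaitErrantReportsSketch {
    public static void main(String[] args) throws Exception {
        List<CompletableFuture<Void>> errantReports = new LinkedList<>();

        CompletableFuture<Void> pending = new CompletableFuture<>();
        errantReports.add(pending);
        pending.complete(null); // simulate the producer ack arriving

        // Block until every errant-record future completes before offsets are
        // committed. With no timeout argument, this wait is unbounded, which
        // is exactly the behavior questioned above.
        for (CompletableFuture<Void> f : errantReports) {
            f.get();
        }
        System.out.println("all errant records acked; safe to commit offsets");
    }
}
```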
rhauch
left a comment
Thanks, @aakashnshah. I think this is much closer to a more straightforward implementation of the KIP.
I took another pass, and there are quite a few things to change/correct. I'll keep reviewing the test code in a subsequent review.
You can't change this public API. InternalSinkRecord needs to be in the runtime module.
When you move InternalSinkRecord to the runtime module, be sure to make this private final.
rhauch
left a comment
Made another pass, with comments inline below. Thanks, @aakashnshah.
Let's add a trace log message before and after this call.
Signed-off-by: Aakash Shah <ashah@confluent.io>
Closing to try to fix builds; will reopen shortly.
rhauch
left a comment
A few more fixes. It's getting close.
Not sure what's happening, but here are some finally-running builds from previous commit (0e40):
kkonstantine
left a comment
Thanks for the PR @aakashnshah
Looks good overall.
I left a few mostly minor comments after a quick pass, if there's time to address.
```java
@Override
public boolean equals(Object o) {
    return super.equals(o);
}

@Override
public int hashCode() {
    return super.hashCode();
}

@Override
public String toString() {
    return super.toString();
}
```
these overrides don't seem to add much.
IIUC, spotbugs complained if these were not here.
never mind then. I'll leave this to AI.
Also, topic can never be null if it comes from a parsed config value whose default value is not null. (Another way to think of it: you can't pass a null value from properties.)
```java
// delete connector
connect.deleteConnector(CONNECTOR_NAME);
```
Three green builds of commit
@aakashnshah, let's get fixes for @kkonstantine's suggestions. Thanks!
rhauch
left a comment
Thanks, @aakashnshah! LGTM, pending green builds (that are finally running!)
kkonstantine
left a comment
LGTM
Nicely done @aakashnshah
2/3 builds are green. Merging now to allow other PRs to resolve conflicts sooner rather than later, if needed.
* 'trunk' of github.com:apache/kafka (36 commits), including:
  - Remove redundant `containsKey` call in KafkaProducer (apache#8761)
  - KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  - KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  - KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  - KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  - KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  - MINOR: Log the reason for coordinator discovery failure (apache#8747)
  - KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  - MINOR: remove unnecessary timeout for admin request (apache#8738)
  - MINOR: Relax Percentiles test (apache#8748)
  - MINOR: regression test for task assignor config (apache#8743)
  - MINOR: Update documentation.html to refer to 2.6 (apache#8745)
  - MINOR: Update documentation.html to refer to 2.5 (apache#8744)
  - KAFKA-9673: Filter and Conditional SMTs (apache#8699)
  - KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720)
  - KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735)
  - MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728)
  - KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736)
  - KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731)
  - KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589)
  - ...

  Conflicts: core/src/main/scala/kafka/log/Log.scala
Implementation for KIP-610: https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

This PR adds the `ErrantRecordReporter` interface as well as its implementation, `WorkerErrantRecordReporter`. The `WorkerErrantRecordReporter` is created in `Worker` and passed up through `WorkerSinkTask` to `WorkerSinkTaskContext`. An integration test and a unit test have been added.
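To illustrate the shape of the new API, here is a self-contained sketch of how a sink task might use a reporter per KIP-610. The interface and types are stubbed locally (the real ones live in the Connect API and runtime modules); per the KIP, report(...) returns a Future that completes once the errant record has been written.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ReporterUsageSketch {
    // Local stand-ins for the Connect API types, assuming the KIP-610 shape.
    interface SinkRecord {}

    interface ErrantRecordReporter {
        Future<Void> report(SinkRecord record, Throwable error);
    }

    public static void main(String[] args) throws Exception {
        // A trivial reporter that acks immediately; the real implementation
        // (WorkerErrantRecordReporter) would produce to a DLQ topic.
        ErrantRecordReporter reporter =
            (record, error) -> CompletableFuture.completedFuture(null);

        SinkRecord bad = new SinkRecord() {};

        // Typical put(...) pattern: report the bad record instead of
        // failing the whole task.
        Future<Void> ack = reporter.report(bad, new RuntimeException("bad schema"));
        ack.get(); // optionally block until the report is durable
        System.out.println("errant record reported");
    }
}
```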