KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions#8618
KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions#8618kkonstantine merged 12 commits intoapache:trunkfrom
Conversation
* Catches exceptions from SinkTask::close call in WorkerSinkTask * Logs exception at ERROR level but does not propagate exception upward * Add unit test that throws exceptions in put and close to verify that the exception from put is propagated out of WorkerSinkTask::execute Signed-off-by: Greg Harris <gregh@confluent.io>
337699b to
1a2cdbb
Compare
This avoids swallowing exceptions that would have stopped the task in absence of an exception from start or put. Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
C0urante
left a comment
There was a problem hiding this comment.
Thanks, @gharris1727! This looks great, and the confusing exception logging here is definitely something that's bitten me in the past when debugging connectors. Just a few comments on the test; the functional bits look fine.
Signed-off-by: Greg Harris <gregh@confluent.io>
…ions Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
|
@rhauch @kkonstantine This is ready for committer review. Thanks for taking a look! |
ncliang
left a comment
There was a problem hiding this comment.
Thanks @gharris1727 . I have a question about how the suppressed exceptions are logged.
| try { | ||
| // Make sure any uncommitted data has been committed and the task has | ||
| // a chance to clean up its state | ||
| try (QuietClosable ignored = this::closePartitions) { |
There was a problem hiding this comment.
I am not sure I understand how the suppressed exception is logged and not just silently swallowed? Do we need to call getSuppressed and log those somewhere or use one of those closeQuietly methods on Utils?
There was a problem hiding this comment.
This PR is not changing any of the printing logic, and that's still handled by the caller, WorkerTask::doRun. This is roughly what a suppressed exception looks like when it gets logged (from the test setup, not a live connector):
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:569)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
<snipped>
Suppressed: java.lang.RuntimeException
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.close(<generated>)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:402)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:599)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
... 57 more
Caused by: java.lang.RuntimeException
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:68)
at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.put(<generated>)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:547)
... 60 more
Suppressed exceptions are a native Java feature, and log4j supports printing their stacktraces.
There was a problem hiding this comment.
Again naming here can be misleading. ignored is more like unused in the try block.
But also that's not the point of this idiom. It's about suppressing exceptions from finally instead of the originator.
How about suppressible? Also, unused might be even better, because ignored is untrue especially if closePartitions is the only method that throws. But I think suppressible highlights the intentions here specifically.
There was a problem hiding this comment.
I used ignored as a way to trigger the IDE to avoid highlighting the unused variable.
I can change this to suppressible since the behavior is better described (suppressible "if another exception occurs first").
There was a problem hiding this comment.
That's a nice improvement.
I have a few comments aiming to offer more clarity.
Also,
Catches exceptions from WorkerSinkTask::closePartitions call
That hasn't changed from before right? The fix now preserves all the exceptions and suppresses the ones thrown on the (former) "finally" block.
| try { | ||
| // Make sure any uncommitted data has been committed and the task has | ||
| // a chance to clean up its state | ||
| try (QuietClosable ignored = this::closePartitions) { |
There was a problem hiding this comment.
Again naming here can be misleading. ignored is more like unused in the try block.
But also that's not the point of this idiom. It's about suppressing exceptions from finally instead of the originator.
How about suppressible? Also, unused might be even better, because ignored is untrue especially if closePartitions is the only method that throws. But I think suppressible highlights the intentions here specifically.
| } | ||
|
|
||
| @Test | ||
| public void testSinkTasksHandleCloseErrors() throws Exception { |
There was a problem hiding this comment.
Can you also write a test where an exception is thrown only by sinkTask.close? Actually, we could keep the name for this new test here as is, and the new test could be named in a way that tells us that the exceptions on close are suppressed in the presence of exceptions in the main try block.
There was a problem hiding this comment.
Ive written the test as described, and verified that the exception from close is caught by WorkerTask::doRun.
However, it never gets wrapped in a ConnectException like exceptions from the other connector methods, but i'm not sure that it's in-scope to change this in this PR.
There was a problem hiding this comment.
Let me see if I understand what you are describing as wrapped.
My use case is as follows:
SinkTask#close attempts to release resources and if it fails it throws a ConnectException as we'd expect from connector developers to do (currently it throws a RuntimeException which might be less representative).
With your fix this exception can appear as suppressed when an exception happens in SinkTask#put and that's what your test is guarding against.
My point is to add the missing test case for when the exception on close is the only exception that is thrown. There is a variety of ways to do that, but I agree with you, this test is not there now. However, I don't think this necessarily makes it out of scope.
There was a problem hiding this comment.
I added that test, so please look at the diff.
By wrapping, i'm completely disregarding the behavior from the connector, I'm only discussing the additional layers of exceptions added by the framework before printing.
At the moment, the RuntimeException from put is wrapped by a ConnectException, but the RuntimeException from close is never wrapped in a ConnectException. I'm questioning whether this is the ideal behavior, and whether we should add that wrapping layer, or consider it out-of-scope for this PR.
There was a problem hiding this comment.
That's the testing I had in mind. That's great.
No need to change the exception type right now.
Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
Signed-off-by: Greg Harris <gregh@confluent.io>
|
ok to test |
|
retest this please |
|
ok to test |
kkonstantine
left a comment
There was a problem hiding this comment.
jdk11 and 14 green. jdk8 has some troubles starting recently.
LGTM
…8618) * If two exceptions are thrown the `closePartitions` exception is suppressed * Add unit tests that throw exceptions in put and close to verify that the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
…8618) * If two exceptions are thrown the `closePartitions` exception is suppressed * Add unit tests that throw exceptions in put and close to verify that the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
…8618) * If two exceptions are thrown the `closePartitions` exception is suppressed * Add unit tests that throw exceptions in put and close to verify that the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
closePartitionsexception is suppressedthe exception from put is propagated out of WorkerSinkTask::execute
Signed-off-by: Greg Harris gregh@confluent.io
Committer Checklist (excluded from commit message)