Skip to content

KAFKA-16448: Unify error-callback exception handling#16745

Merged
mjsax merged 5 commits intoapache:trunkfrom
mjsax:minor-kip-1033
Aug 3, 2024
Merged

KAFKA-16448: Unify error-callback exception handling#16745
mjsax merged 5 commits intoapache:trunkfrom
mjsax:minor-kip-1033

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Jul 31, 2024

@sebastienviale @loicgreffier -- as discussed on other PR review, I think we should apply some cleanup and unification for the different handler. Would love to get your feedback.

\cc @cadonna @lucasbru @bbejeck

", processorNodeId='" + processorNodeId + '\'' +
", taskId=" + taskId +
'}';
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omitting headers on purpose, as we should not log user-data.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's late in the game to ask this, but what about a timestamp? Just a thought, I don't have a strong opinion here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Sounds like a miss... \cc @sebastienviale @loicgreffier

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment: I would not block this PR on this question, but do a follow up one, if we want to add ts.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I agree it shouldn't block, adding a timestamp in a follow on PR is fine.

globalProcessorContext.metrics()
)
),
null
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup as discussed on some other PR.

// Rethrow exceptions that should not be handled here
throw e;
} catch (final RuntimeException e) {
} catch (final RuntimeException processingException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names

keySerializer,
exception);
} catch (final Exception exception) {
} catch (final RuntimeException serializationException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names


private void recordSendError(final String topic,
final Exception exception,
final Exception productionException,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names

final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup as discussed on some other PR


try {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
} catch (final TimeoutException timeoutException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally unrelated (might actually to good to put into different PR, but want to avoid that I forget it).

Seems we forgot to handle this case entirely... we might call context.forward() inside of punctuate() and thus need to handle the same error cases as for node.process()...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a TimeoutException here? Could a punctuate potentially throw it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we forgot to handle this case entirely... we might call context.forward() inside of punctuate() and thus need to handle the same error cases as for node.process()...

We could end up in a sink node, and thus in RecordCollectorImpl / producer.send() which could throw a TimeoutException.

log.error("Fatal when handling serialization exception", e);
recordSendError(topic, e, null, context, processorNodeId);
return;
throw new FailedProcessingException("Fatal user code error in production error callback", fatalUserException);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: inside send() is seem incorrect to call recordSendError() which we only use in from the producer.send(..., Calback) which is called async. Here inside RecordCollector.send() (before we hit producer.send()) can should rather throw directly

originalException);
}

private void handleException(final String errorMessage, final Throwable originalException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an attempt to preserve the error message we create when a handler throws by itself.

private void handleException(final Throwable originalException) {
handleException(
String.format(
"Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given what we pass in originalException into `StreamsException, it seems not necessary to include the stracktrace in the error message.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to break two unit tests in ProcessingExceptionHandlerIntegrationTest while testing exception.getMessage

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! -- I did only run a few test locally, not the whole suite. Will take a look.

@sebastienviale
Copy link
Copy Markdown
Contributor

thanks for the PR, you're just ahead of me

@sebastienviale
Copy link
Copy Markdown
Contributor

I just notice a nit in ProductionExceptionHandler

 * @param record        the record that failed to serialize
 * @param exception     the exception that occurred during serialization
 * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
 */
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
                                                                        final Exception exception) {
    return ProductionExceptionHandlerResponse.FAIL;
}

should be

  • @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead.

@loicgreffier
Copy link
Copy Markdown
Contributor

loicgreffier commented Jul 31, 2024

A very little typo Use Please brought by KAFKA-16448 here:

* @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)}

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Aug 1, 2024

Updates this PR, and also extended it to align all 5 cases we have:

  • deserialzation
  • production / serialization
  • production / send
  • processing
  • punctuation

We now handle both error cases "handler throws exception" and "handler returns null" (this one I just realized while working on this PR and just added it).


@Test
public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() {
public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-purposed this test case for NULL case.

It seem redundant to test "throw exception" twice as originally in the code (one for FAIL and once for CONTINUE), because if we throw, there is no response we return at all...

public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed any longer -- we just need one handler that can throw an exception

taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to test 4 cases, so added Optional to make this possible.

assertEquals(rawRecord.timestamp(), record.timestamp());
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(rawRecord.headers(), record.headers());
try (final Metrics metrics = new Metrics()) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup. Metrics must be close to avoid resource leaks. (A few more times below.)


@Test
public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() {
public void punctuateShouldNotHandleFailProcessingExceptionAndThrowStreamsException() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming to make it readable :) (same below)

Plus some formatting changes to make the code readable

LogAndContinueProcessingExceptionHandler.class.getName()
));

assertDoesNotThrow(() ->
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this... if it would throw, the test fails anyway.

if (uncaughtException != null || !e.getMessage().contains("Injected test exception")) {
if (uncaughtException != null ||
!(e instanceof StreamsException) ||
!e.getCause().getMessage().equals("Injected test exception.")) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is required only, because the original test conditions was not "correct" anyway. The outer exception was already a StreamsException wrapping the RuntimeException the test throws. Testing the output exception for type RuntimeException does not make any sense anyway, as all non-checked exception are RuntimeException and the check passes...

The only actual change we get in this PR, is that the error message of the outer StreamsException changes -- originally, it was not set explicitly, and thus was set as "RuntimeException: Injected test exception" -- thus, even if we did check the "wrong/outer" error message the test passed. This PR now sets the StreamsException error message explicitly (what I don't consider a backward compatibility issue), but to this check, we need to check the error message of the wrapped exception (bonus, we can test for equals instead of contains).

!(e.getMessage().contains("test exception"))) {
if (++exceptionCount > 2 ||
!(e instanceof StreamsException) ||
!(e.getCause().getMessage().endsWith(" test exception."))) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. However, we cannot use equals as the use different error messages for different situation when we throw.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @mjsax overall this looks good to me, I have a couple of minor comments.

", processorNodeId='" + processorNodeId + '\'' +
", taskId=" + taskId +
'}';
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's late in the game to ask this, but what about a timestamp? Just a thought, I don't have a strong opinion here.


try {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
} catch (final TimeoutException timeoutException) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a TimeoutException here? Could a punctuate potentially throw it?

final Exception exception) {
assertInputs(context, exception);
return response;
if (response == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the response can be null should this be response.isEmpty?

Copy link
Copy Markdown
Member Author

@mjsax mjsax Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null is on purpose. It's how we setup the code (it's not something the handler return).

In the end response is "what should the handler return in this test run", and I miss use it to say "handler should throw an exception". I could also add a new boolean flag shouldThrowException instead to not miss use response for this case.

Will update the PR.

assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());
return response;
if (response == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above here

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mjsax mjsax merged commit 7c539f9 into apache:trunk Aug 3, 2024
@mjsax mjsax deleted the minor-kip-1033 branch August 3, 2024 19:41
mjsax added a commit that referenced this pull request Aug 3, 2024
Follow up code cleanup for KIP-1033.

This PR unifies the handling of both error cases for exception handlers:
 - handler throws an exception
 - handler returns null

The unification happens for all 5 handler cases:
 - deserialzation
 - production / serialization
 - production / send
 - processing
 - punctuation

Reviewers:  Sebastien Viale <sebastien.viale@michelin.com>, Loic Greffier <loic.greffier@michelin.com>, Bill Bejeck <bill@confluent.io>
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Aug 3, 2024

Merged to trunk and cherry-picked to 3.9 branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants