
KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic#12984

Merged
C0urante merged 9 commits into apache:trunk from yashmayya:KAFKA-14455 on Feb 2, 2023
Conversation

@yashmayya
Contributor

@yashmayya yashmayya commented Dec 13, 2022

  • Kafka Connect's POST /connectors and PUT /connectors/{connector}/config REST APIs internally simply write a message to the Connect cluster's internal config topic (which is then processed asynchronously by the herder).
  • However, no callback is passed to the producer's send method and there is no error handling in place for producer send failures (see here / here).
  • Consider one such case where the Connect worker's principal doesn't have a WRITE ACL on the cluster's config topic. Now suppose the user submits a connector's configs via one of the above two APIs. The producer send here / here won't succeed (due to a TopicAuthorizationException) but the API responses will be 201 Created success responses anyway.
  • This is a very poor user experience because the connector will never actually be created, yet the API response indicates a successful operation. Furthermore, this failure is only detectable if TRACE logs are enabled (via this log), making it nearly impossible for users to debug.
  • This PR updates the KafkaConfigBackingStore APIs to use synchronous producer sends in the non-EOS enabled case which surfaces exceptions and also brings the behavior in-line with the EOS enabled case (where the transaction commit ensures the same behavior).
  • Side quest: update the outdated Javadoc for KafkaConfigBackingStore and improve readability of its rendered form.
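The gist of the fix described above can be illustrated with a minimal, self-contained sketch. Note that `failingSend` is a hypothetical stand-in for the producer's asynchronous send, not the actual `KafkaConfigBackingStore` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SyncSendSketch {

    // Simulates an async producer send that fails, e.g. with a TopicAuthorizationException
    static Future<Void> failingSend() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new RuntimeException("Not authorized to access topics: [connect-configs]"));
        return f;
    }

    // Before the fix: fire-and-forget, so the failure is silently dropped
    public static void asyncWrite() {
        failingSend(); // no callback, no get() -- the caller sees success
    }

    // After the fix: block on the returned future so the failure propagates to the caller
    public static void syncWrite() {
        try {
            failingSend().get();
        } catch (ExecutionException e) {
            // Wrap in a Connect-level exception so the REST layer can surface it
            throw new RuntimeException("Error writing to the Kafka Connect config topic", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while writing to the config topic", e);
        }
    }

    public static void main(String[] args) {
        asyncWrite(); // returns normally despite the failed send
        try {
            syncWrite();
        } catch (RuntimeException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```

The key design point is that blocking on the future (or, in the EOS case, on the transaction commit) turns a fire-and-forget write into one whose failure can be mapped to a REST error response.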

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@yashmayya
Contributor Author

@C0urante could you please take a look at this whenever you get a chance? I'm sorry to keep pinging you for review requests, but I'm not aware of any other committers currently taking a look at Connect PRs :)

Contributor


I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:

```
Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
errors, this method will throw the last received exception immediately and the transaction will not be committed.
So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
```

I'm not sure if we're in danger of double-completing the callback, and whether semantically the callback should be completed after the commitTransaction completes or an exception is thrown from commitTransaction.
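The quoted contract can be illustrated with a simplified, hypothetical model of the transactional producer's error bookkeeping. This is a mock mirroring the documented behavior, not the real `KafkaProducer`:

```java
public class TxnCommitSketch {

    // Hypothetical stand-in for a transactional producer's error tracking
    public static class MockTxnProducer {
        private RuntimeException lastSendError;

        // Simulates Producer#send inside a transaction: a failed send records
        // its (irrecoverable) error instead of surfacing it immediately
        public void send(boolean fail, String record) {
            if (fail)
                lastSendError = new RuntimeException("send failed: " + record);
        }

        // Mirrors the documented contract: if any send in the transaction hit an
        // irrecoverable error, commitTransaction() throws the last such exception
        // and the transaction is not committed
        public void commitTransaction() {
            if (lastSendError != null)
                throw lastSendError;
        }
    }

    public static void main(String[] args) {
        MockTxnProducer producer = new MockTxnProducer();
        producer.send(false, "record-1");
        producer.send(true, "record-2"); // e.g. a TopicAuthorizationException in the real client
        try {
            producer.commitTransaction();
        } catch (RuntimeException e) {
            System.out.println("commit threw: " + e.getMessage());
        }
    }
}
```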

Contributor Author


I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:

Thanks for pointing this out!

As per https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-

When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final [commitTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--) call will fail and throw the exception from the last failed send. When this happens, your application should call [abortTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--) to reset the state and continue to send data.

So what you're saying definitely does make sense, and I tried this scenario out manually - for an EOS enabled worker, the response from the POST /connectors API when the worker's principal doesn't have a WRITE ACL on the config topic is (without the changes from this PR):

```json
{
  "error_code": 500,
  "message": "Failed to write to config topic; this may be due to a transient error and the request can be safely retried"
}
```

While the worker logs do have the exact root cause as well (the TopicAuthorizationException), I believe the REST API response in this case isn't all that helpful to the user. With the changes from this PR, the response in the same scenario looks like:

```json
{
  "error_code": 500,
  "message": "Not authorized to access topics: [connect-configs]"
}
```

I'm not sure if we're in danger of double-completing the callback

I don't believe we are in danger of double completing the callback, although you're right in that we are unnecessarily handling the error in two places in a way.

We could avoid the use of callbacks in the producer send (although I couldn't find anything that explicitly warns against doing so) when using the transactional producer and instead refactor this / this so that the final exception that is thrown is the producer exception itself rather than the (doubly) wrapped one. WDYT?

Contributor


Yes, we should certainly try to propagate the more informative error message. Maybe we can find a control flow where the callback error is propagated before calling the commitTransaction and getting the more generic error message.

Contributor Author


I believe that commitTransaction ensures that all producer records are flushed and all pending callbacks are invoked before the transaction is committed, so this ("where the callback error is propagated before calling the commitTransaction and getting the more generic error message") should already be the case with the current changes?

However, it doesn't look like using callbacks with the transactional producer offers much benefit - we could simply reword the existing exception message ("this may be due to a transient error and the request can be safely retried") to indicate that it could potentially denote a non-transient error as well. Furthermore, the specific case that this PR attempted to fix (a missing WRITE ACL on the config topic not being surfaced to users properly) is highly unlikely to go unnoticed in an EOS-enabled Connect cluster anyway: the herder thread itself will repeatedly hit this condition here in the tick loop (the worker logs will reveal the underlying TopicAuthorizationException), and request processing won't happen at all (all external requests that are run on the herder's thread will time out).

Contributor


should already be the case with the current changes

It may be the case but not for the reason you specified. The guarantee from the producer is that the record callbacks are completed before the commitTransaction returns, while I was suggesting a control flow where we ensure that record callbacks are completed before the commitTransaction is called. If you still call commitTransaction, you will get both the callback's error and the synchronous error from commitTransaction, and would still need to make sure that the correct exception is shadowed. The control over this shadowing is handled by this code and whoever is terminating the callback in the REST layer.

we could simply reword the existing exception message

This is certainly the easy way out, but I don't think we should settle for that. Vague error messages indicate that they aren't capturing and reporting the exception properly, and making them more vague to compensate doesn't help anyone diagnose the underlying issue. If we can make a control flow change that reports the errors faithfully, that is going to help someone down the line debugging it.

the worker logs will reveal the underlying TopicAuthorizationException

Is this the case? The worker is using this code path to write the session key; If we're hiding the result from the REST calls, are we not also hiding the error from the herder tick thread?

Contributor Author

@yashmayya yashmayya Jan 8, 2023


I'm not sure I follow what benefit we'd be getting here by handling both the producer callback error as well as the one thrown by commitTransaction? The control flow would be more straightforward by removing the producer callback and just relying on commitTransaction to throw exceptions, if any. The producer's Javadoc itself also suggests that callbacks need not be defined when using the transactional producer since commitTransaction will throw the error from the last failed send in a transaction.

making them more vague to compensate

We wouldn't be making it more vague. The message would state that the write to the config topic failed, which is the cause of the failure. Since the exception mapper used by Connect's REST server only writes the top-level exception's message to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top-level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible).

Note that I'm suggesting a similar change for the non-EOS enabled case as well - i.e. don't use the producer error directly here, instead wrapping it in a ConnectException which says that the write to the config topic failed. The reasoning is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, an informative Connect-specific exception message makes more sense than directly throwing the producer exception, which may or may not contain enough detail to be relevant to a Connect user.

If we're hiding the result from the REST calls, are we not also hiding the error from the herder tick thread?

Hm no, the hiding issue was only for non-EOS enabled workers. Like I've pointed out above, for workers that have EOS enabled, the REST API does return a 500 response.

Edit: Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain.
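As a rough illustration of that last option, a hypothetical helper could walk the exception chain and concatenate the messages before the mapper writes the response (the helper name and messages here are made up for the sketch; this is not Connect's actual exception mapper):

```java
public class ExceptionChainSketch {

    // Hypothetical alternative to surfacing only the top-level message:
    // build a response message from every exception in the causal chain
    public static String chainedMessage(Throwable t) {
        StringBuilder sb = new StringBuilder(t.getMessage());
        for (Throwable cause = t.getCause(); cause != null; cause = cause.getCause())
            sb.append("; caused by: ").append(cause.getMessage());
        return sb.toString();
    }

    public static void main(String[] args) {
        RuntimeException root = new RuntimeException("Not authorized to access topics: [connect-configs]");
        RuntimeException wrapped = new RuntimeException("Error writing to the config topic", root);
        System.out.println(chainedMessage(wrapped));
    }
}
```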

Contributor Author


I've removed the usage of the producer callback here since we're moving to synchronous usage of producer send in the non-EOS case as well anyway (aside from the earlier point that it doesn't really make sense to handle errors via both a callback as well as commitTransaction). The behavior of surfacing exceptions synchronously is similar in both cases now; one through calling get() on the returned future from Producer::send and the other through Producer::commitTransaction.

Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain.

Yet another option for this could be to simply append a "Check the worker logs for more details on the error" to the top level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts?

@yashmayya yashmayya requested a review from gharris1727 January 17, 2023 07:38
Contributor

@gharris1727 gharris1727 left a comment


LGTM, thanks for the fix!

Contributor

@C0urante C0urante left a comment


Thanks Yash! Left a few comments.

Contributor

@C0urante C0urante left a comment


Thanks Yash, just a couple more thoughts.

```java
 * Get the current time in milliseconds. This will return the same cached value until the timer
 * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
 *
 * <p>
```
Contributor

@C0urante C0urante Feb 1, 2023


In the future, would you mind saving nonessential improvements like this for a dedicated PR? They tend to add noise to the diff and makes things harder to review.

Contributor Author


Makes sense, but wouldn't it also be slightly strange to create a dedicated PR for such a trivial change? 😄

Contributor

@C0urante C0urante left a comment


Looking pretty good, almost there!

Contributor

@C0urante C0urante left a comment


LGTM, thanks Yash!

We can't backport this to 3.4 right now, but given the importance of this fix, I'd like to once we're beyond code freeze. Filed https://issues.apache.org/jira/browse/KAFKA-14674 to track.

@C0urante C0urante merged commit a3cf8b5 into apache:trunk Feb 2, 2023
@yashmayya
Contributor Author

Thanks Greg and Chris for the great reviews on this one!

C0urante pushed a commit that referenced this pull request Feb 8, 2023
… failures while writing to the config topic (#12984)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
C0urante pushed a commit that referenced this pull request Feb 8, 2023
… failures while writing to the config topic (#12984)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
@C0urante
Contributor

C0urante commented Feb 8, 2023

Finished backporting to 3.3 and 3.4, now that both active releases on those branches have concluded.
