KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks#13276

Merged
C0urante merged 3 commits into apache:trunk from yashmayya:KAFKA-14732
Feb 28, 2023
Conversation

@yashmayya
Contributor

  • Kafka Connect in distributed mode currently retries infinitely with a fixed retry backoff (250 ms) in case of errors arising during connector task reconfiguration.
  • Tasks can be "reconfigured" during connector startup (to get the initial task configs from the connector), on connector resume, or when a connector explicitly requests it via its context.
  • Task reconfiguration essentially entails requesting a connector instance's task configs and writing them to the Connect cluster's config storage (if a change in task configs is detected).
  • A fixed retry backoff of 250 ms leads to very aggressive retries. Consider a Debezium connector that attempts to initiate a database connection in its taskConfigs method: if the connection fails due to something like an invalid login, the Connect worker will essentially spam connection attempts frequently and indefinitely (until the connector config or database-side configs are fixed).
  • An exponential backoff retry mechanism seems better suited for the DistributedHerder::reconfigureConnectorTasksWithRetry method. The initial retry backoff is retained at 250 ms, with a chosen maximum backoff of 60000 ms.
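The capped exponential backoff schedule described above can be sketched as follows. This is a minimal illustration of the retry-delay progression only, not the actual Kafka `ExponentialBackoff` utility (which also applies jitter to avoid thundering-herd retries); the class and method names here are hypothetical:

```java
// BackoffSketch.java: illustrative capped exponential backoff schedule
// (initial 250 ms, doubling per attempt, capped at 60000 ms).
public class BackoffSketch {
    static final long INITIAL_MS = 250L;   // initial retry backoff
    static final long MAX_MS = 60_000L;    // chosen maximum backoff

    // Delay before retry number `attempt` (0-based), doubling each
    // time and capped at MAX_MS.
    static long backoffMs(int attempt) {
        double candidate = INITIAL_MS * Math.pow(2, attempt);
        return (long) Math.min(candidate, MAX_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 9; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoffMs(attempt) + " ms");
        }
    }
}
```

With these parameters the delay sequence is 250, 500, 1000, ... ms, reaching the 60-second cap on the ninth attempt, instead of hammering the connector's taskConfigs method every 250 ms indefinitely.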

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@mukkachaitanya mukkachaitanya left a comment


Thanks, @yashmayya! The intent seems right. I had a couple of comments/suggestions.

Contributor


I am curious if there is a way to not do infinite retries. If we are actually retrying infinitely, especially in the startConnector phase, then the connector just doesn't have tasks. Is it possible to somehow bubble up errors as part of the connector (not task) status?

Contributor Author


Hm, that's an interesting idea and I don't see the harm in limiting the number of retries to some reasonable value and then marking the connector as failed after that (we could include the last exception's trace in the connector's status).

Is it possible to somehow bubble up errors as part of connector (not task) status?

The AbstractHerder (DistributedHerder's parent class) implements the ConnectorStatus.Listener interface and so we should be able to update the connector's status to failed via

@Override
public void onFailure(String connector, Throwable cause) {
    statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
            trace(cause), workerId, generation()));
}

For instance, here we use the onFailure hook to update a connector's status as failed if there is an exception thrown during startup.

@yashmayya
Contributor Author

Hi @C0urante, could you please take a look?

PS - I'd be happy to move the minor Javadoc improvements to a separate PR if you'd like (we never got to a conclusion here 🙂 )

Contributor

@C0urante C0urante left a comment


Thanks Yash, this looks great.

RE non-infinite retries: probably a good idea, needs a KIP though IMO.

RE Javadoc comment updates: a separate PR would be great. Unless they're required by a functional change, they should be kept separate, which makes them easier to review and merge.

Contributor


This test is great. I think it'd be worth it to perform a third and fourth tick. The third can be used to simulate successfully generating task configs after the two failed attempts, and the fourth can be used to ensure that we don't retry any further.

It's also worth noting that we're only testing the case where Connector::taskConfigs (or really, Worker::connectorTaskConfigs) fails, but the logic that's being added here applies if intra-cluster communication fails as well (which may happen if the leader of the cluster is temporarily unavailable, for example). It'd be nice if we could have test coverage for that too, but I won't block this PR on that.

Contributor Author

@yashmayya yashmayya Feb 28, 2023


Thanks, I've updated the test to add another herder tick which runs a successful task reconfiguration request (I skipped adding a fourth tick because the "no further retries" behavior is already verified by the poll timeout at the end of the previous tick).

Regarding the test case for the task reconfiguration REST request to the leader: I did consider that initially, but while trying to add one I ran into some timing-related complications arising from the use of the forwardRequestExecutor, at which point it felt like more trouble than it was worth. However, your comment made me revisit it, and I've made some changes to drop in a simple mock executor service which runs requests synchronously (on the same thread as the caller). Let me know what you think?
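A synchronous executor of the kind described can be sketched like this. This is a hypothetical stand-in, not the actual test code from this PR: it satisfies the ExecutorService contract but runs every submitted task on the caller's thread, so forwarded requests complete deterministically before the test proceeds.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Runs every task synchronously on the submitting thread; useful for
// replacing an asynchronous request-forwarding executor in tests.
public class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        command.run(); // same thread as the caller, no queuing
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }
}
```

Because AbstractExecutorService implements submit(...) on top of execute(...), any Future returned by this executor is already completed by the time submit returns, which sidesteps the timing issues mentioned above.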

Contributor


Ah, fair point about the fourth tick!

I don't love using a synchronous executor here since it diverges significantly from the non-testing behavior of the herder. But, I can't think of a better way to test this without going overboard in complexity, and it does give us decent coverage.

So, good enough 👍

…check extra herder tick after task configs generated successfully; Add testTaskReconfigurationRetriesWithLeaderRequestForwardingException; Revert Javadoc changes to ExponentialBackoff
Contributor

@C0urante C0urante left a comment


LGTM, thanks Yash!
