KAFKA-12965 - Graceful clean up of task error metrics #10910
C0urante merged 6 commits into apache:trunk from
Conversation
@kkonstantine can you help review the PR?
C0urante
left a comment
Thanks for identifying and fixing this issue @ramesh-muthusamy. I've left a few comments but overall this is a great improvement.
Rather than do this bookkeeping here, could we pass the ErrorHandlingMetrics instance to the WorkerTask class in its constructor, and then close it in WorkerTask::removeMetrics? It'd align nicely with the existing contract for that method, which is that it will "Remove all metrics published by this task."
Actually, I did explore that option, but unfortunately the worker task is instantiated after the error handling metrics are registered. From an object-lifecycle view, I would then need to hand the destruction of the error handling metrics off to the worker source task while still keeping their instantiation with the worker task.
Sorry, I didn't mean instantiating the ErrorHandlingMetrics object from within the WorkerTask class, but rather, accepting it as a constructor parameter so that it can be closed during removeMetrics. Something like:
abstract class WorkerTask implements Runnable {
    private final ErrorHandlingMetrics errorMetrics; // NEW

    public WorkerTask(ConnectorTaskId id,
                      TaskStatus.Listener statusListener,
                      TargetState initialState,
                      ClassLoader loader,
                      ConnectMetrics connectMetrics,
                      ErrorHandlingMetrics errorMetrics, // NEW
                      RetryWithToleranceOperator retryWithToleranceOperator,
                      Time time,
                      StatusBackingStore statusBackingStore) {
        this.id = id;
        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
        this.errorMetrics = errorMetrics; // NEW
        this.statusListener = taskMetricsGroup;
        this.loader = loader;
        this.targetState = initialState;
        this.stopping = false;
        this.cancelled = false;
        this.taskMetricsGroup.recordState(this.targetState);
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.time = time;
        this.statusBackingStore = statusBackingStore;
    }

    public void removeMetrics() {
        // Close quietly here so that we can be sure to close everything even if one attempt fails
        Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group");
        Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW
    }
}

How does that look?
@C0urante I have incorporated the review comments; could you take another look?
Nit: whitespace

    public void closeTaskErrorMetricGroup() {
Also, just curious: is there any reason we don't want to implement AutoCloseable and rename this method to close? It'd align nicely with the precedent set in #8442, for example, and would make this class easier to use with Utils::closeQuietly if we wanted to go that route in the future.
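The AutoCloseable pattern being suggested can be sketched as follows. This is a minimal, hypothetical stand-in: ErrorMetricsSketch and its closeQuietly helper are illustrative only and are not Kafka's actual ErrorHandlingMetrics or Utils classes; the helper merely mirrors the spirit of Utils::closeQuietly.

```java
// Minimal sketch of a metrics holder that implements AutoCloseable so it
// can be handed to a closeQuietly-style helper, per the review suggestion.
public class ErrorMetricsSketch implements AutoCloseable {
    private boolean closed = false;

    @Override
    public void close() {
        closed = true; // real code would deregister the task-error-metrics group here
    }

    public boolean isClosed() {
        return closed;
    }

    // Mirrors the spirit of Kafka's Utils.closeQuietly: close and swallow errors.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                System.err.println("Failed to close " + name + ": " + t);
            }
        }
    }

    public static void main(String[] args) {
        ErrorMetricsSketch metrics = new ErrorMetricsSketch();
        closeQuietly(metrics, "error handling metrics");
        System.out.println(metrics.isClosed()); // prints: true
    }
}
```

Implementing the standard interface also lets callers use try-with-resources where the lifecycle is scoped to a block.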
Good idea, I will incorporate the recommendation.
Thanks, LGTM pending unit tests to verify expected behavior and help prevent regressions in the future.
Nit:

    @Override
    public void close() {
This is resolved; I've incorporated the comment.
Any update on when this diff can be landed? In case you need some help, I am happy to help. This is a problem that we have had to deal with too.
@ramesh-muthusamy can you resolve the merge conflicts? Once that's done, I can do another round and, if everything still looks good, merge this PR.
Hi Chris,
Yes, thank you.
-Ramesh
Hi @ramesh-muthusamy, any plans to rebase this?
Hi Chris, I will get to this this weekend. Apologies for the delay.
No worries! If for some reason it's difficult to find the time to carry this across the finish line, let me know and I can either complete the remaining work myself or find another contributor who can take care of it. Barring that, looking forward to reviewing the rebase :)
…being closed from a worker
Force-pushed b4d8d6f to 62fb20e
@C0urante I have rebased the PR onto the latest trunk and resolved the merge conflicts. I have also looked at the failed tests; they seem to be related to timeouts in worker threads. Please let me know.
C0urante
left a comment
Thanks @ramesh-muthusamy, LGTM
Reviewers: Chris Egerton <chrise@aiven.io>
Co-authored-by: Ramesh <ramesh.154089@gmail.com>
Issue:
We noticed that the error metrics reported by a Kafka Connect worker persist even after the task is redistributed to another worker. As a result, over time the task-error-metrics of a worker come to contain the error metrics of every task it has ever hosted.
This is an antipattern relative to the other metrics reported by a Kafka Connect worker. The worker should only report error metrics for its current tasks and leave persistence of previous tasks' metrics to the metrics storage system consuming them.
In the example below there are only 2 active tasks running, yet error metric groups for more than 20 tasks are still exposed.
Task counter mbean:
{"request":{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"},
 "value":{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,
          "connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,
          "task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,
          "connector-count":0.0,"connector-startup-success-percentage":0.0,
          "task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,
          "task-count":2.0,"task-startup-failure-total":0.0},
 "timestamp":1623852927,"status":200}
Task Error metrics mbean:
{"request":{"mbean":"kafka.connect:connector=,task=,type=task-error-metrics","type":"read"},
 "value":{
   "kafka.connect:connector=,task=35,type=task-error-metrics":
     {"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,
      "deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,
      "total-record-errors":0.0,"total-retries":0.0},
   "kafka.connect:connector=,task=38,type=task-error-metrics":{... same all-zero fields ...},
   ... identical all-zero entries for tasks 14, 5, 0, 29, 37, 28, 25, 91, 31, 7, 74, 2, 26, 30, 53, 16, ...
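To make the mismatch above concrete, here is a toy Java sketch (the MetricLeakSketch class and its method names are hypothetical, not Kafka's real ConnectMetrics API) of how per-task metric groups pile up when reassigned tasks never close theirs:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the leak: a worker registers one metric group per task it
// hosts; without an explicit close on reassignment, stale groups remain.
public class MetricLeakSketch {
    private final Map<String, Object> groups = new HashMap<>();

    public void register(int taskId) {
        groups.put("task-error-metrics,task=" + taskId, new Object());
    }

    public void close(int taskId) {
        groups.remove("task-error-metrics,task=" + taskId);
    }

    public int groupCount() {
        return groups.size();
    }

    public static void main(String[] args) {
        MetricLeakSketch worker = new MetricLeakSketch();
        for (int task = 0; task < 20; task++) {
            worker.register(task); // every task this worker has ever hosted
        }
        System.out.println(worker.groupCount()); // prints: 20 (all groups linger)
        for (int task = 0; task < 18; task++) {
            worker.close(task); // with the fix, reassigned tasks clean up their groups
        }
        System.out.println(worker.groupCount()); // prints: 2 (only active tasks remain)
    }
}
```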
Solution:
As part of the fix for KAFKA-12965, this change gracefully cleans up the error handling metrics associated with a task when the task is stopped. This is required to avoid duplicate task metrics being reported by a worker that hosted the same task in the past.
UT: not yet covered, in progress.
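The lifecycle the fix establishes can be illustrated with a minimal sketch. All names here are simplified, hypothetical stand-ins for the real WorkerTask / ErrorHandlingMetrics wiring discussed in the review, not the actual Kafka Connect classes:

```java
// Sketch of the fix's lifecycle: the worker builds the error metrics,
// hands them to the task, and the task closes them in removeMetrics,
// so no task-error-metrics mbean outlives its task.
public class TaskLifecycleSketch {
    static class ErrorMetrics implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    static class Task {
        private final ErrorMetrics errorMetrics;

        Task(ErrorMetrics errorMetrics) {
            this.errorMetrics = errorMetrics; // injected via constructor, as suggested
        }

        void removeMetrics() {
            errorMetrics.close(); // the fix: error metrics die with the task
        }
    }

    public static void main(String[] args) {
        ErrorMetrics metrics = new ErrorMetrics();
        Task task = new Task(metrics);
        task.removeMetrics();
        System.out.println(metrics.closed); // prints: true
    }
}
```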