From ac018f13db848d1ec69e764a77a5aaad8ec63784 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Sep 2024 01:03:51 -0700 Subject: [PATCH 1/2] Remove workerId parameter from postWorkerError. It was redundant to MSQErrorReport#getTaskId. --- .../java/org/apache/druid/msq/exec/ControllerClient.java | 5 +---- .../src/main/java/org/apache/druid/msq/exec/WorkerImpl.java | 2 +- .../druid/msq/indexing/client/IndexerControllerClient.java | 4 ++-- .../msq/indexing/error/MSQWarningReportLimiterPublisher.java | 2 +- .../org/apache/druid/msq/test/MSQTestControllerClient.java | 2 +- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java index 428ce59cd8fa..f56b752133f6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java @@ -83,10 +83,7 @@ void postResultsComplete( /** * Client side method to inform the controller that the error has occured in the given worker. */ - void postWorkerError( - String workerId, - MSQErrorReport errorWrapper - ) throws IOException; + void postWorkerError(MSQErrorReport errorWrapper) throws IOException; /** * Client side method to inform the controller about the warnings generated by the given worker. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 74e3850c6e96..418d58772d1b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -203,7 +203,7 @@ public void run() log.warn("%s", logMessage); if (controllerAlive) { - controllerClient.postWorkerError(context.workerId(), errorReport); + controllerClient.postWorkerError(errorReport); } if (t != null) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java index 1e31de71a8ac..1a420d69b6c9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java @@ -125,11 +125,11 @@ public void postResultsComplete(StageId stageId, int workerNumber, @Nullable Obj } @Override - public void postWorkerError(String workerId, MSQErrorReport errorWrapper) throws IOException + public void postWorkerError(MSQErrorReport errorWrapper) throws IOException { final String path = StringUtils.format( "/workerError/%s", - StringUtils.urlEncode(workerId) + StringUtils.urlEncode(errorWrapper.getTaskId()) ); doRequest( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java index 9a8b3f79f6d2..352d4fa1310d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java @@ -97,7 +97,7 @@ public void publishException(int stageNumber, Throwable e) // Send the warning as an error if it is disallowed altogether if (criticalWarningCodes.contains(errorCode)) { try { - controllerClient.postWorkerError(workerId, MSQErrorReport.fromException(workerId, host, stageNumber, e)); + controllerClient.postWorkerError(MSQErrorReport.fromException(workerId, host, stageNumber, e)); } catch (IOException postException) { throw new RE(postException, "Failed to post the worker error [%s] to the controller", errorCode); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java index 4c7ca61be023..3791be4f309e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java @@ -75,7 +75,7 @@ public void postResultsComplete(StageId stageId, int workerNumber, @Nullable Obj } @Override - public void postWorkerError(String workerId, MSQErrorReport errorWrapper) + public void postWorkerError(MSQErrorReport errorWrapper) { controller.workerError(errorWrapper); } From 63582e7c383e713b129073c4f7f35e48897ef826 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Sep 2024 20:58:59 -0700 Subject: [PATCH 2/2] Fix javadoc. --- .../src/main/java/org/apache/druid/msq/exec/Controller.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java index d2370b057935..d316b9b6b0b7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java @@ -84,7 +84,7 @@ void updatePartialKeyStatisticsInformation( * taskId, not by query/stage/worker, because system errors are associated * with a task rather than a specific query/stage/worker execution context. * - * @see ControllerClient#postWorkerError(String, MSQErrorReport) + * @see ControllerClient#postWorkerError(MSQErrorReport) */ void workerError(MSQErrorReport errorReport);