From 86633f67921874a27993764ec5f4fd0453fdbbca Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 12 Mar 2024 17:53:50 +0530 Subject: [PATCH 1/3] Fix data race in getting results from MSQ select tasks. --- .../apache/druid/msq/exec/ControllerImpl.java | 34 +++++++++++-------- .../msq/indexing/report/MSQResultsReport.java | 24 +++++++------ 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index d62bcce04ddc..5da74f0a52a9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -462,9 +462,27 @@ public TaskStatus runTask(final Closer closer) log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError)); } } - + MSQResultsReport resultsReport = null; if (queryKernel != null && queryKernel.isSuccess()) { // If successful, encourage the tasks to exit successfully. + // get results before posting finish to the tasks. + if (resultsYielder != null) { + resultsReport = makeResultsTaskReport( + queryDef, + resultsYielder, + task.getQuerySpec().getColumnMappings(), + task.getSqlTypeNames(), + MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context()) + ); + try { + resultsYielder.close(); + } + catch (IOException e) { + throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e); + } + } else { + resultsReport = null; + } postFinishToAllTasks(); workerTaskLauncher.stop(false); } else { @@ -509,7 +527,6 @@ public TaskStatus runTask(final Closer closer) try { // Write report even if something went wrong. final MSQStagesReport stagesReport; - final MSQResultsReport resultsReport; if (queryDef != null) { final Map stagePhaseMap; @@ -538,18 +555,6 @@ public TaskStatus runTask(final Closer closer) stagesReport = null; } - if (resultsYielder != null) { - resultsReport = makeResultsTaskReport( - queryDef, - resultsYielder, - task.getQuerySpec().getColumnMappings(), - task.getSqlTypeNames(), - MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context()) - ); - } else { - resultsReport = null; - } - final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload( makeStatusReport( taskStateForReport, @@ -564,7 +569,6 @@ public TaskStatus runTask(final Closer closer) countersSnapshot, resultsReport ); - context.writeReports( id(), TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload)) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java index db46b420f00a..bd4f17252afb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java @@ -83,18 +83,22 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded( MSQSelectDestination selectDestination ) { - if (selectDestination.shouldTruncateResultsInTaskReport()) { - List results = new ArrayList<>(); - int rowCount = 0; - while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) { - results.add(resultYielder.get()); - resultYielder = resultYielder.next(null); - ++rowCount; + List results = new ArrayList<>(); + int rowCount = 0; + while (!resultYielder.isDone()) { + results.add(resultYielder.get()); + resultYielder = resultYielder.next(null); + ++rowCount; + if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) { + break; } - return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone()); - } else { - return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false); } + return new MSQResultsReport( + signature, + sqlTypeNames, + Yielders.each(Sequences.simple(results)), + !resultYielder.isDone() + ); } @JsonProperty("signature") From c648c9dadf8d33f659bc00dd81e2524a48c488c8 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 12 Mar 2024 23:40:50 +0530 Subject: [PATCH 2/3] Add better logging --- .../msq/indexing/report/MSQResultsReport.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java index bd4f17252afb..e6eb9681f61c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java @@ -28,8 +28,10 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; @@ -39,6 +41,7 @@ public class MSQResultsReport { + private static final Logger log = new Logger(MSQResultsReport.class); /** * Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility * with SQL (which also allows duplicate column names in query results). @@ -84,7 +87,8 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded( ) { List results = new ArrayList<>(); - int rowCount = 0; + long rowCount = 0; + int factor = 1; while (!resultYielder.isDone()) { results.add(resultYielder.get()); resultYielder = resultYielder.next(null); @@ -92,6 +96,18 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded( if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) { break; } + if (rowCount % (Math.min(32, factor) * Limits.MAX_SELECT_RESULT_ROWS) == 0) { + log.warn( + "Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. " + + "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. " + + "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.", + rowCount, + Limits.MAX_SELECT_RESULT_ROWS, + MultiStageQueryContext.CTX_SELECT_DESTINATION, + MSQSelectDestination.DURABLESTORAGE.getName() + ); + factor = factor * 2; + } } return new MSQResultsReport( signature, From 463b353d51d468cc20d0c37104a85dd9a1066a78 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 12 Mar 2024 23:43:55 +0530 Subject: [PATCH 3/3] Handling number overflow. --- .../apache/druid/msq/indexing/report/MSQResultsReport.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java index e6eb9681f61c..b96ce469145e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java @@ -96,7 +96,7 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded( if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) { break; } - if (rowCount % (Math.min(32, factor) * Limits.MAX_SELECT_RESULT_ROWS) == 0) { + if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) { log.warn( "Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. " + "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. " @@ -106,7 +106,7 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded( MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.getName() ); - factor = factor * 2; + factor = factor < 32 ? factor * 2 : 32; } } return new MSQResultsReport(