Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,27 @@ public TaskStatus runTask(final Closer closer)
log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError));
}
}

MSQResultsReport resultsReport = null;
if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully.
// get results before posting finish to the tasks.
if (resultsYielder != null) {
resultsReport = makeResultsTaskReport(
queryDef,
resultsYielder,
task.getQuerySpec().getColumnMappings(),
task.getSqlTypeNames(),
MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
);
try {
resultsYielder.close();
}
catch (IOException e) {
throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should be DruidException

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can probably refactor this part of a another PR.

}
} else {
resultsReport = null;
}
postFinishToAllTasks();
workerTaskLauncher.stop(false);
} else {
Expand Down Expand Up @@ -509,7 +527,6 @@ public TaskStatus runTask(final Closer closer)
try {
// Write report even if something went wrong.
final MSQStagesReport stagesReport;
final MSQResultsReport resultsReport;

if (queryDef != null) {
final Map<Integer, ControllerStagePhase> stagePhaseMap;
Expand Down Expand Up @@ -538,18 +555,6 @@ public TaskStatus runTask(final Closer closer)
stagesReport = null;
}

if (resultsYielder != null) {
resultsReport = makeResultsTaskReport(
queryDef,
resultsYielder,
task.getQuerySpec().getColumnMappings(),
task.getSqlTypeNames(),
MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
);
} else {
resultsReport = null;
}

final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload(
makeStatusReport(
taskStateForReport,
Expand All @@ -564,7 +569,6 @@ public TaskStatus runTask(final Closer closer)
countersSnapshot,
resultsReport
);

context.writeReports(
id(),
TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;
Expand All @@ -39,6 +41,7 @@

public class MSQResultsReport
{
private static final Logger log = new Logger(MSQResultsReport.class);
/**
* Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
* with SQL (which also allows duplicate column names in query results).
Expand Down Expand Up @@ -83,18 +86,35 @@ public static MSQResultsReport createReportAndLimitRowsIfNeeded(
MSQSelectDestination selectDestination
)
{
if (selectDestination.shouldTruncateResultsInTaskReport()) {
List<Object[]> results = new ArrayList<>();
int rowCount = 0;
while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) {
results.add(resultYielder.get());
resultYielder = resultYielder.next(null);
++rowCount;
List<Object[]> results = new ArrayList<>();
long rowCount = 0;
int factor = 1;
while (!resultYielder.isDone()) {
results.add(resultYielder.get());
resultYielder = resultYielder.next(null);
++rowCount;
if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) {
break;
}
if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) {
log.warn(
"Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. "
+ "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. "
+ "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.",
rowCount,
Limits.MAX_SELECT_RESULT_ROWS,
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLESTORAGE.getName()
);
factor = factor < 32 ? factor * 2 : 32;
}
return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone());
} else {
return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false);
}
return new MSQResultsReport(
signature,
sqlTypeNames,
Yielders.each(Sequences.simple(results)),
!resultYielder.isDone()
);
}

@JsonProperty("signature")
Expand Down