Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@
import org.apache.druid.msq.indexing.WorkerCount;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
Expand Down Expand Up @@ -1828,9 +1826,9 @@ private static QueryDefinition makeQueryDefinition(
);

return builder.build();
} else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
} else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) {
return queryDef;
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
} else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) {

// attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
Expand Down Expand Up @@ -2933,12 +2931,12 @@ private void startQueryResultsReader()

final InputChannelFactory inputChannelFactory;

if (queryKernelConfig.isDurableStorage() || MSQControllerTask.writeResultsToDurableStorage(querySpec)) {
if (queryKernelConfig.isDurableStorage() || MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) {
inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation(
queryId(),
MSQTasks.makeStorageConnector(context.injector()),
closer,
MSQControllerTask.writeResultsToDurableStorage(querySpec)
MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)
);
} else {
inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds);
Expand Down
Comment thread
adarshsanjeev marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
Expand Down Expand Up @@ -305,16 +306,38 @@ public Optional<Resource> getDestinationResource()
return querySpec.getDestination().getDestinationResource();
}

/**
 * Returns true when the given spec targets a Druid datasource, i.e. the task
 * performs an ingestion rather than returning results to the caller.
 */
public static boolean isIngestion(final MSQSpec querySpec)
{
  final MSQDestination destination = querySpec.getDestination();
  return destination instanceof DataSourceMSQDestination;
}

/**
 * Returns true when the given spec writes its results to external files (an export query).
 */
public static boolean isExport(final MSQSpec querySpec)
{
  final MSQDestination destination = querySpec.getDestination();
  return destination instanceof ExportMSQDestination;
}

/**
 * Returns true when the given spec denotes an async query whose final-stage
 * results are written as frame files into durable storage.
 */
public static boolean writeFinalStageResultsToDurableStorage(final MSQSpec querySpec)
{
  final MSQDestination destination = querySpec.getDestination();
  return destination instanceof DurableStorageMSQDestination;
}

/**
 * Checks whether the task writes its final results into the task report
 * (as opposed to durable storage or a datasource).
 */
public static boolean writeFinalResultsToTaskReport(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof TaskReportMSQDestination;
}

/**
* Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail
* instead of reading any unused segments to ensure that old data is not read.
Expand All @@ -330,11 +353,6 @@ public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)
}
}

public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;
}

@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ public QueryDefinition makeQueryDefinition(
);

if (doLimitOrOffset) {
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
Comment thread
adarshsanjeev marked this conversation as resolved.
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(null) // no shuffling should be required after a limit processor.
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
Expand Down Expand Up @@ -224,12 +225,13 @@ public QueryDefinition makeQueryDefinition(
);
if (doLimitOrOffset) {
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(null)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.msq.querykit.DataSourcePlan;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
Expand Down Expand Up @@ -111,69 +112,77 @@ public QueryDefinition makeQueryDefinition(
final ScanQuery queryToRun = originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final RowSignature scanSignature = getAndValidateSignature(queryToRun, jsonMapper);
final ShuffleSpec shuffleSpec;
final RowSignature signatureToUse;
final boolean hasLimitOrOffset = queryToRun.isLimited() || queryToRun.getScanRowsOffset() > 0;

final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature);
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final List<KeyColumn> clusterByColumns = new ArrayList<>();

// Add regular orderBys.
for (final ScanQuery.OrderBy orderBy : queryToRun.getOrderBys()) {
clusterByColumns.add(
new KeyColumn(
orderBy.getColumnName(),
orderBy.getOrder() == ScanQuery.Order.DESCENDING ? KeyOrder.DESCENDING : KeyOrder.ASCENDING
)
);
}

// We ignore the resultShuffleSpecFactory in case:
// 1. There is no cluster by
// 2. There is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec
if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset) {
shuffleSpec = MixShuffleSpec.instance();
signatureToUse = scanSignature;
} else {
final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature);
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final List<KeyColumn> clusterByColumns = new ArrayList<>();

// Add regular orderBys.
for (final ScanQuery.OrderBy orderBy : queryToRun.getOrderBys()) {
clusterByColumns.add(
new KeyColumn(
orderBy.getColumnName(),
orderBy.getOrder() == ScanQuery.Order.DESCENDING ? KeyOrder.DESCENDING : KeyOrder.ASCENDING
)
);
}

// Update partition by of next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
break;
}
}
if (addShuffle) {
clusterByColumns.addAll(windowClusterBy.getColumns());
// Update partition by of next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
break;
}
} else {
// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}
if (addShuffle) {
clusterByColumns.addAll(windowClusterBy.getColumns());
}
} else {
// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}

final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(clusterBy, false);

final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
shuffleSpec = resultShuffleSpecFactory.build(clusterBy, false);
signatureToUse = QueryKitUtils.sortableSignature(
QueryKitUtils.signatureWithSegmentGranularity(signatureBuilder.build(), segmentGranularity),
clusterBy.getColumns()
);
final RowSignature signatureToUse = QueryKitUtils.sortableSignature(
QueryKitUtils.signatureWithSegmentGranularity(signatureBuilder.build(), segmentGranularity),
clusterBy.getColumns()
);

ShuffleSpec scanShuffleSpec;
if (!hasLimitOrOffset) {
// If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected.
scanShuffleSpec = finalShuffleSpec;
} else {
// If there is a limit spec, check if there are any non-boost columns to sort in.
boolean requiresSort = clusterByColumns.stream()
Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to see if we are sorting on the non-boost column? Isn't that automatically added by the ScanQueryKit et al? Maybe we can simplify the condition by checking if the orderBy is non-empty (before adding any boosting)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a if-branch for window functions above this bit of code, which adds its own cluster bys, which are not dependent on the orderBy.

.anyMatch(keyColumn -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(keyColumn.columnName()));
if (requiresSort) {
// If yes, do a sort into a single partition.
scanShuffleSpec = ShuffleSpecFactories.singlePartition().build(clusterBy, false);
} else {
// If the only clusterBy column is the boost column, we just use a mix shuffle to avoid unused shuffling.
// Note that we still need the boost column to be present in the row signature, since the limit stage would
// need it to be populated to do its own shuffling later.
Comment on lines +175 to +176
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that the limit factory couldn't partition boost. Am I mistaken?

Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otoh, if there isn't any partition boosting, then even the original code would have run into similar problem if there wasn't a cluster key right - too large partitions

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit factory does not increment the partition boosting at regular intervals. The value would be all 0 if it was not added to the row signature at the scan stage.

even the original code would have run into similar problem

The original code would have mix shuffle speced. It would always make a single partition, so yes, it should have been too large.

scanShuffleSpec = MixShuffleSpec.instance();
}
}

queryDefBuilder.add(
StageDefinition.builder(Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()))
.inputs(dataSourcePlan.getInputSpecs())
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
.shuffleSpec(shuffleSpec)
.shuffleSpec(scanShuffleSpec)
.signature(signatureToUse)
.maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount)
.processorFactory(new ScanQueryFrameProcessorFactory(queryToRun))
Expand All @@ -185,7 +194,7 @@ public QueryDefinition makeQueryDefinition(
.inputs(new StageInputSpec(firstStageNumber))
.signature(signatureToUse)
.maxWorkerCount(1)
.shuffleSpec(null) // no shuffling should be required after a limit processor.
.shuffleSpec(finalShuffleSpec) // Apply the final shuffling after limit spec.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a lot of shuffling. Is there any way we can avoid reshuffling the data by the same cluster by, and just repartition? Perhaps not without any supersorter changes, but I wanted to confirm.

Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well maybe we can preserve the optimisation that we had earlier - if there's no orderBy and if doLimitOrOffset == true, we don't need to partition boost the intermediate shuffleSpec.

Actually, we don't need to partitionBoost the intermediate shuffle spec in any case (i.e. the shuffleSpec for the scan stage if there's a limit present) - Since it's all going into a single partition anyway, the partitionBoost won't have any use.
i.e. If there's a limit present, only the final stage should have partition boosting.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial ScanFP is the one who increments the boost column, if we do not apply the boost column at that stage, the limit processor output would have boost columns of 0, which can't be split.
I guess additional changes would be needed to allow boosting to work with LimitProcessors before this optimization can be made.

Comment thread
adarshsanjeev marked this conversation as resolved.
.processorFactory(
new OffsetLimitFrameProcessorFactory(
queryToRun.getScanRowsOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,52 @@ private List<String> readResultsFromFile(File resultFile) throws IOException
}
}

@Test
// Verifies that an export query with a LIMIT splits its output across one CSV
// file per result page when rowsPerPage is set to 1.
public void testExportWithLimit() throws IOException
{
// Signature of the rows produced by the source query.
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();

File exportDir = newTempFolder("export");

// One row per page, so each of the 3 exported rows should land in its own partition file.
Map<String, Object> queryContext = new HashMap<>(DEFAULT_MSQ_CONTEXT);
queryContext.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 1);

final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select cnt, dim1 from foo limit 3", exportDir.getAbsolutePath());

// Export queries write files rather than segments, hence the empty expected
// segment set and result rows.
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
.setQueryContext(queryContext)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
.verifyResults();

// Each partition file should contain the CSV header plus exactly one data row.
Assert.assertEquals(
ImmutableList.of(
"cnt,dim1",
"1,"
),
readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition0.csv"))
);
Assert.assertEquals(
ImmutableList.of(
"cnt,dim1",
"1,10.1"
),
readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition1.csv"))
);
Assert.assertEquals(
ImmutableList.of(
"cnt,dim1",
"1,2"
),
readResultsFromFile(new File(exportDir, "query-test-query-worker0-partition2.csv"))
);
}

private void verifyManifestFile(File exportDir, List<File> resultFiles) throws IOException
{
final File manifestFile = new File(exportDir, ExportMetadataManager.MANIFEST_FILE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
Expand Down Expand Up @@ -291,7 +292,7 @@ public void testInsertWithTooManyPartitions() throws IOException
{
Map<String, Object> context = ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put("rowsPerSegment", 1)
.put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 1)
.build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ public void testInsertOffsetThrowsException(String contextName, Map<String, Obje
+ "SELECT __time, m1 "
+ "FROM foo "
+ "LIMIT 50 "
+ "OFFSET 10"
+ "OFFSET 10 "
+ "PARTITIONED BY ALL TIME")
.setExpectedValidationErrorMatcher(
invalidSqlContains("INSERT and REPLACE queries cannot have an OFFSET")
Expand All @@ -1464,6 +1464,44 @@ public void testInsertOffsetThrowsException(String contextName, Map<String, Obje
.verifyPlanningErrors();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnFoo1WithLimit(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.
{
Map<String, Object> queryContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2)
.build();

List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946771200000L, "10.1", 1L},
new Object[]{978307200000L, "1", 1L},
new Object[]{946857600000L, "2", 1L},
new Object[]{978480000000L, "abc", 1L}
);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.build();

testIngestQuery().setSql(
"insert into foo1 select __time, dim1, cnt from foo where dim1 != '' limit 4 partitioned by all clustered by dim1")
.setExpectedDataSource("foo1")
.setQueryContext(queryContext)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0), SegmentId.of("foo1", Intervals.ETERNITY, "test", 1)))
.setExpectedResultRows(expectedRows)
.setExpectedMSQSegmentReport(
new MSQSegmentReport(
NumberedShardSpec.class.getSimpleName(),
"Using NumberedShardSpec to generate segments since the query is inserting rows."
)
)
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit(String contextName, Map<String, Object> context) throws IOException
Expand Down
Loading