From d8bdc2a2e5e4727b6ada67cc9895fee8024147b0 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 8 Jul 2024 15:07:44 +0530 Subject: [PATCH 01/13] Add a segmentMorphFactory to MSQ. --- .../apache/druid/msq/exec/ControllerImpl.java | 64 ++++++++++++++++++- .../destination/DataSourceMSQDestination.java | 40 +++++++++++- .../druid/msq/sql/MSQTaskQueryMaker.java | 3 +- .../msq/exec/MSQParseExceptionsTest.java | 4 +- .../msq/indexing/MSQControllerTaskTest.java | 3 +- .../resources/SqlStatementResourceTest.java | 1 + .../util/SqlStatementResourceHelperTest.java | 1 + 7 files changed, 106 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index ad37c5380c56..f992be2c7846 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -146,6 +146,7 @@ import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.input.stage.StageInputSpecSlicer; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.StageDefinition; @@ -1193,8 +1194,35 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( { if (MSQControllerTask.isIngestion(querySpec) && stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) { - // noinspection unchecked,rawtypes - return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate); + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + if (!destination.doesSegmentMorphing()) { + // noinspection unchecked,rawtypes 
+ return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate); + } else { + // worker info is the new lock version + if (destination.getReplaceTimeChunks().size() != 1) { + throw new ISE( + "Must have single interval in replaceTimeChunks, but got[%s]", + destination.getReplaceTimeChunks() + ); + } + try { + final List locks; + locks = context.taskActionClient().submit(new LockListAction()); + if (locks.size() == 1) { + final Int2ObjectMap retVal = new Int2ObjectAVLTreeMap<>(); + for (int worker : workerInputs.workers()) { + retVal.put(worker, locks.get(0).getVersion()); + } + return retVal; + } else { + throw new ISE("Got number of locks other than one: [%s]", locks); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } } else { return null; } @@ -1810,6 +1838,35 @@ private static QueryDefinition makeQueryDefinition( } } + // Possibly add a segment morpher stage. + if (((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) { + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + final FrameProcessorFactory segmentMorphFactory = destination.getSegmentMorphFactory(); + + if (!destination.isReplaceTimeChunks()) { + throw new MSQException(UnknownFault.forMessage("segmentMorphFactory requires replaceTimeChunks")); + } + + builder.add( + StageDefinition.builder(queryDef.getNextStageNumber()) + .inputs( + new TableInputSpec( + destination.getDataSource(), + destination.getReplaceTimeChunks(), + null, + null + ), + new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()) + ) + .broadcastInputs(IntSet.of(1)) + .maxWorkerCount(tuningConfig.getMaxNumWorkers()) + .processorFactory(segmentMorphFactory) + ); + + // If there was a segment morpher, return immediately; don't add a segment-generation stage. + return builder.build(); + } + // Then, add a segment-generation stage. 
final DataSchema dataSchema = makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); @@ -2723,7 +2780,8 @@ private void startStages() throws IOException, InterruptedException for (final StageId stageId : newStageIds) { // Allocate segments, if this is the final stage of an ingestion. if (MSQControllerTask.isIngestion(querySpec) - && stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()) { + && stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber() + && !((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) { populateSegmentsToGenerate(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java index ea3072bfe45a..78df189cc658 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java @@ -20,12 +20,14 @@ package org.apache.druid.msq.indexing.destination; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.server.security.Resource; @@ -49,18 +51,24 @@ public class DataSourceMSQDestination implements 
MSQDestination @Nullable private final List replaceTimeChunks; + @Nullable + @SuppressWarnings("rawtypes") + private final FrameProcessorFactory segmentMorphFactory; + @JsonCreator public DataSourceMSQDestination( @JsonProperty("dataSource") String dataSource, @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("segmentSortOrder") @Nullable List segmentSortOrder, - @JsonProperty("replaceTimeChunks") @Nullable List replaceTimeChunks + @JsonProperty("replaceTimeChunks") @Nullable List replaceTimeChunks, + @JsonProperty("segmentMorphFactory") @Nullable FrameProcessorFactory segmentMorphFactory ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity"); this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList(); this.replaceTimeChunks = replaceTimeChunks; + this.segmentMorphFactory = segmentMorphFactory; if (replaceTimeChunks != null) { // Verify that if replaceTimeChunks is provided, it is nonempty. @@ -98,6 +106,30 @@ public String getDataSource() return dataSource; } + /** + * Returns the segment morph factory, if one is present, else null. + *

+ * The segment morph factory if present, is a way to tell the MSQ task to funnel the results at the final stage to + * the {@link FrameProcessorFactory} instead of a segment generation stage. + */ + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public FrameProcessorFactory getSegmentMorphFactory() + { + return segmentMorphFactory; + } + + /** + * Checks if the destination uses a segmentMorphFactory. If one is present, that means that the query would modify + * existing segments instead of generating new ones. + */ + @JsonIgnore + public boolean doesSegmentMorphing() + { + return segmentMorphFactory != null; + } + @JsonProperty public Granularity getSegmentGranularity() { @@ -158,13 +190,14 @@ public boolean equals(Object o) return Objects.equals(dataSource, that.dataSource) && Objects.equals(segmentGranularity, that.segmentGranularity) && Objects.equals(segmentSortOrder, that.segmentSortOrder) - && Objects.equals(replaceTimeChunks, that.replaceTimeChunks); + && Objects.equals(replaceTimeChunks, that.replaceTimeChunks) + && Objects.equals(segmentMorphFactory, that.segmentMorphFactory); } @Override public int hashCode() { - return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks); + return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, segmentMorphFactory); } @Override @@ -175,6 +208,7 @@ public String toString() ", segmentGranularity=" + segmentGranularity + ", segmentSortOrder=" + segmentSortOrder + ", replaceTimeChunks=" + replaceTimeChunks + + (segmentMorphFactory != null ? 
", segmentMorphFactory=" + segmentMorphFactory : "") + '}'; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index c6396c0b3060..7af34a1eb55b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -246,7 +246,8 @@ public QueryResponse runQuery(final DruidQuery druidQuery) targetDataSource.getDestinationName(), segmentGranularityObject, segmentSortOrder, - replaceTimeChunks + replaceTimeChunks, + null ); MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, dataSourceMSQDestination.isReplaceTimeChunks()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java index 330f1cdbbe6d..bc8d517ffba2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java @@ -225,7 +225,7 @@ public void testIngestWithSanitizedNullByte() throws IOException new ColumnMapping("v1", "agent_category") ) )) - .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null)) + .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null)) .tuningConfig(MSQTuningConfig.defaultConfig()) .build()) .setQueryContext(DEFAULT_MSQ_CONTEXT) @@ -318,7 +318,7 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce new ColumnMapping("agent_category", "agent_category") ) )) - .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null)) + .destination(new 
DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null)) .tuningConfig(MSQTuningConfig.defaultConfig()) .build()) .setQueryContext(runtimeContext) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 9de14610f19c..bfb307449768 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -58,7 +58,8 @@ public class MSQControllerTaskTest "target", Granularities.DAY, null, - INTERVALS + INTERVALS, + null )) .query(new Druids.ScanQueryBuilder() .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index 2da5fd42caf1..89df8180a5c6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase "test", Granularities.DAY, null, + null, null )) .tuningConfig( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index 1966d1e5b10a..58856adf3664 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -375,6 +375,7 @@ public void testEmptyCountersForDataSourceDestination() "test", Granularities.DAY, null, + null, null ) ); From a11d9c31317b25c02608957126e8f65a75630ca3 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 10 Jul 2024 16:25:54 +0530 Subject: [PATCH 02/13] Add test --- .../apache/druid/msq/exec/ControllerImpl.java | 9 +- .../druid/msq/sql/MSQTaskQueryMaker.java | 10 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 19 +- .../msq/sql/SegmentMorphFactoryCreator.java | 30 +++ .../apache/druid/msq/exec/MSQReplaceTest.java | 4 - .../druid/msq/exec/MSQSegmentMorphTest.java | 90 +++++++++ ...stSegmentMorpherFrameProcessorFactory.java | 186 ++++++++++++++++++ .../apache/druid/msq/test/MSQTestBase.java | 7 +- .../MSQTestDelegateDataSegmentPusher.java | 2 +- .../test/TestSegmentMorphFactoryCreator.java | 41 ++++ 10 files changed, 381 insertions(+), 17 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index f992be2c7846..66cf2f986d6c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1201,10 +1201,9 @@ private Int2ObjectMap 
makeWorkerFactoryInfosForStage( } else { // worker info is the new lock version if (destination.getReplaceTimeChunks().size() != 1) { - throw new ISE( - "Must have single interval in replaceTimeChunks, but got[%s]", - destination.getReplaceTimeChunks() - ); + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Must have single interval in replaceTimeChunks, but got [%s]", destination.getReplaceTimeChunks()); } try { final List locks; @@ -1216,7 +1215,7 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( } return retVal; } else { - throw new ISE("Got number of locks other than one: [%s]", locks); + throw DruidException.defensive("Got number of locks other than one: [%s]", locks); } } catch (IOException e) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 7af34a1eb55b..0995a20f31f2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -92,6 +92,7 @@ public class MSQTaskQueryMaker implements QueryMaker private final PlannerContext plannerContext; private final ObjectMapper jsonMapper; private final List> fieldMapping; + private final SegmentMorphFactoryCreator segmentMorphFactoryCreator; MSQTaskQueryMaker( @@ -99,7 +100,8 @@ public class MSQTaskQueryMaker implements QueryMaker final OverlordClient overlordClient, final PlannerContext plannerContext, final ObjectMapper jsonMapper, - final List> fieldMapping + final List> fieldMapping, + final SegmentMorphFactoryCreator segmentMorphFactoryCreator ) { this.targetDataSource = targetDataSource; @@ -107,6 +109,7 @@ public class MSQTaskQueryMaker implements QueryMaker this.plannerContext = 
Preconditions.checkNotNull(plannerContext, "plannerContext"); this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping"); + this.segmentMorphFactoryCreator = Preconditions.checkNotNull(segmentMorphFactoryCreator, "segmentMorphFactoryCreator"); } @Override @@ -247,7 +250,10 @@ public QueryResponse runQuery(final DruidQuery druidQuery) segmentGranularityObject, segmentSortOrder, replaceTimeChunks, - null + segmentMorphFactoryCreator == null ? null : segmentMorphFactoryCreator.createSegmentMorphFactory( + druidQuery, + plannerContext + ) ); MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, dataSourceMSQDestination.isReplaceTimeChunks()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index e3baa058b41a..2b886ca1e0c8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -82,15 +82,26 @@ public class MSQTaskSqlEngine implements SqlEngine private final OverlordClient overlordClient; private final ObjectMapper jsonMapper; + private final SegmentMorphFactoryCreator segmentMorphFactoryCreator; - @Inject public MSQTaskSqlEngine( final OverlordClient overlordClient, final ObjectMapper jsonMapper ) + { + this(overlordClient, jsonMapper, null); + } + + @Inject + public MSQTaskSqlEngine( + final OverlordClient overlordClient, + final ObjectMapper jsonMapper, + final SegmentMorphFactoryCreator segmentMorphFactoryCreator + ) { this.overlordClient = overlordClient; this.jsonMapper = jsonMapper; + this.segmentMorphFactoryCreator = segmentMorphFactoryCreator; } @Override @@ -159,7 +170,8 @@ public QueryMaker buildQueryMakerForSelect( overlordClient, plannerContext, 
jsonMapper, - relRoot.fields + relRoot.fields, + segmentMorphFactoryCreator ); } @@ -193,7 +205,8 @@ public QueryMaker buildQueryMakerForInsert( overlordClient, plannerContext, jsonMapper, - relRoot.fields + relRoot.fields, + segmentMorphFactoryCreator ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java new file mode 100644 index 000000000000..42021df1ca9b --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.sql; + +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidQuery; + +public interface SegmentMorphFactoryCreator +{ + @SuppressWarnings("rawtypes") + FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext); +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 7d7f4e310c62..6378b384956b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -942,7 +942,6 @@ public void testReplaceTimeChunksLargerThanData(String contextName, Map= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' " + "PARTITIONED BY MONTH") .setExpectedDataSource("foo") - .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedRowSignature(rowSignature) .setQueryContext(context) .setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2002-01-01T"))) @@ -998,7 +997,6 @@ public void testReplaceAllOverEternitySegment(String contextName, Map= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' " + "PARTITIONED BY MONTH") .setExpectedDataSource("foo") - .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedRowSignature(rowSignature) .setQueryContext(context) .setExpectedDestinationIntervals(Collections.singletonList(Intervals.ETERNITY)) @@ -1045,7 +1043,6 @@ public void testReplaceOnFoo1Range(String contextName, Map conte "REPLACE INTO foo1 OVERWRITE ALL " + "select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1") .setExpectedDataSource("foo1") - .setQueryContext(DEFAULT_MSQ_CONTEXT) 
.setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedMSQSegmentReport( new MSQSegmentReport( @@ -1093,7 +1090,6 @@ public void testReplaceOnFoo1RangeClusteredBySubset(String contextName, Map= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-04'" + + "SELECT __time, m1 " + + "FROM foo " + + "PARTITIONED BY DAY ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("1970-01-01T00:00:00.000Z/2001-01-03T00:00:00.001Z"))) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedResultRows(ImmutableList.of()) + .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of("2000-01-01T/2000-01-04T"))) + .verifyResults(); + } + + @Test + public void testSegmentMorphFactoryWithMultipleReplaceTimeChunks() + { + testSegmentMorphFactoryCreator.setFrameProcessorFactory(new TestSegmentMorpherFrameProcessorFactory()); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("m1", ColumnType.FLOAT) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-04' OR __time >= TIMESTAMP '2001-01-01' AND __time < TIMESTAMP '2001-01-04'" + + "SELECT __time, m1 " + + "FROM foo " + + "PARTITIONED BY DAY ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedExecutionErrorMatcher( + CoreMatchers.allOf( + CoreMatchers.instanceOf(ISE.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Must have single interval in replaceTimeChunks, " + + "but got [[2000-01-01T00:00:00.000Z/2000-01-04T00:00:00.000Z, " + + "2001-01-01T00:00:00.000Z/2001-01-04T00:00:00.000Z]]") + ) + ) + ) + .verifyExecutionError(); + } +} diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java new file mode 100644 index 000000000000..7f1d92d02752 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.exec; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableMap; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.processor.manager.ProcessorManager; +import org.apache.druid.frame.processor.manager.ProcessorManagers; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.msq.input.ReadableInputs; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.kernel.ExtraInfoHolder; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.test.MSQTestTaskActionClient; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.Assert; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import 
java.util.function.Consumer; + +/** + * A segment morpher which does nothing but updates the version of the segment by touching it. + */ +@JsonTypeName("testSegmentMorpherFrameProcessorFactory") +public class TestSegmentMorpherFrameProcessorFactory implements FrameProcessorFactory, Object> +{ + @Override + public ProcessorsAndChannels> makeProcessors( + StageDefinition stageDefinition, + int workerNumber, + List inputSlices, + InputSliceReader inputSliceReader, + @Nullable Object extra, + OutputChannelFactory outputChannelFactory, + FrameContext frameContext, + int maxOutstandingProcessors, + CounterTracker counters, + Consumer warningPublisher, + boolean removeNullBytes + ) + { + if (inputSlices.get(0) instanceof NilInputSlice) { + return new ProcessorsAndChannels<>( + ProcessorManagers.of(Sequences.empty()) + .withAccumulation(new HashSet<>(), (acc, segment) -> acc), + OutputChannels.none() + ); + } + + final SegmentsInputSlice segmentsSlice = (SegmentsInputSlice) inputSlices.get(0); + final ReadableInputs segments = + inputSliceReader.attach(0, segmentsSlice, counters, warningPublisher); + + Sequence sequence = Sequences.simple(segments) + .map(segment -> { + final RichSegmentDescriptor descriptor = segment.getSegment() + .getDescriptor(); + final SegmentId sourceSegmentId = SegmentId.of( + segmentsSlice.getDataSource(), + descriptor.getFullInterval(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + ); + + final DataSegment newDataSegment = new DataSegment( + sourceSegmentId.getDataSource(), + sourceSegmentId.getInterval(), + MSQTestTaskActionClient.VERSION, + ImmutableMap.of(), + Collections.emptyList(), + Collections.emptyList(), + new NumberedShardSpec(0, 1), + null, + 0 + ); + return new MorphFrameProcessor(newDataSegment); + }); + + final ProcessorManager> processorManager = + ProcessorManagers.of(sequence) + .withAccumulation( + new HashSet<>(), + (set, segment) -> { + set.add(segment); + return set; + } + ); + return new 
ProcessorsAndChannels<>(processorManager, OutputChannels.none()); + } + + @Nullable + @Override + public TypeReference> getResultTypeReference() + { + return new TypeReference>() {}; + } + + @Override + public Set mergeAccumulatedResult(Set accumulated, Set otherAccumulated) + { + return Collections.emptySet(); + } + + @Override + public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra) + { + Assert.assertEquals(MSQTestTaskActionClient.VERSION, extra); + return new ExtraInfoHolder(extra) + { + }; + } + + private static class MorphFrameProcessor implements FrameProcessor + { + private final DataSegment segment; + + public MorphFrameProcessor(DataSegment segment) + { + this.segment = segment; + } + + @Override + public List inputChannels() + { + return Collections.emptyList(); + } + + @Override + public List outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait runIncrementally(IntSet readableInputs) + { + return ReturnOrAwait.returnObject(segment); + } + + @Override + public void cleanup() + { + } + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index f7c9b3296cab..a81dce42a899 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -86,6 +86,7 @@ import org.apache.druid.msq.exec.DataServerQueryHandler; import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.exec.ResultsContext; +import org.apache.druid.msq.exec.TestSegmentMorpherFrameProcessorFactory; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.guice.MSQDurableStorageModule; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; @@ -208,7 +209,6 @@ import javax.annotation.Nonnull; 
import javax.annotation.Nullable; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -328,6 +328,7 @@ public class MSQTestBase extends BaseCalciteQueryTest protected List loadedSegmentsMetadata = new ArrayList<>(); // Mocks the return of data from data servers protected DataServerQueryHandler dataServerQueryHandler = mock(DataServerQueryHandler.class); + protected TestSegmentMorphFactoryCreator testSegmentMorphFactoryCreator = new TestSegmentMorphFactoryCreator(); private MSQTestSegmentManager segmentManager; private SegmentCacheManager segmentCacheManager; @@ -557,7 +558,8 @@ public String getFormatString() final SqlEngine engine = new MSQTaskSqlEngine( indexingServiceClient, - qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()) + qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()), + testSegmentMorphFactoryCreator ); PlannerFactory plannerFactory = new PlannerFactory( @@ -746,6 +748,7 @@ public static ObjectMapper setupObjectMapper(Injector injector) DruidSecondaryModule.setupJackson(injector, mapper); mapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); + mapper.registerSubtypes(new NamedType(TestSegmentMorpherFrameProcessorFactory.class, "testSegmentMorpher")); // This should be reusing guice instead of using static classes InsertLockPreemptedFaultTest.LockPreemptedHelper.preempt(false); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java index 61a4389e93d9..73fca53682c6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java @@ -40,7 +40,7 @@ public MSQTestDelegateDataSegmentPusher( 
MSQTestSegmentManager segmentManager ) { - delegate = dataSegmentPusher; + this.delegate = dataSegmentPusher; this.segmentManager = segmentManager; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java new file mode 100644 index 000000000000..38ed3c7723eb --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.test; + +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.sql.SegmentMorphFactoryCreator; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidQuery; + +public class TestSegmentMorphFactoryCreator implements SegmentMorphFactoryCreator +{ + private FrameProcessorFactory frameProcessorFactory; + + public void setFrameProcessorFactory(FrameProcessorFactory frameProcessorFactory) + { + this.frameProcessorFactory = frameProcessorFactory; + } + + @Override + public FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext) + { + return frameProcessorFactory; + } +} From 51a619b459343b47c05fe045468302a876f1d43b Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 10 Jul 2024 23:34:38 +0530 Subject: [PATCH 03/13] Make argument nullable --- .../main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 0995a20f31f2..547e60c04b15 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -101,7 +101,7 @@ public class MSQTaskQueryMaker implements QueryMaker final PlannerContext plannerContext, final ObjectMapper jsonMapper, final List> fieldMapping, - final SegmentMorphFactoryCreator segmentMorphFactoryCreator + @Nullable final SegmentMorphFactoryCreator segmentMorphFactoryCreator ) { this.targetDataSource = targetDataSource; @@ -109,7 +109,7 @@ public class MSQTaskQueryMaker implements QueryMaker this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext"); 
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping"); - this.segmentMorphFactoryCreator = Preconditions.checkNotNull(segmentMorphFactoryCreator, "segmentMorphFactoryCreator"); + this.segmentMorphFactoryCreator = segmentMorphFactoryCreator; } @Override From c2af5b88addac913d4071743536696b0df47b62d Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 11 Jul 2024 13:06:50 +0530 Subject: [PATCH 04/13] Fix Guice issues --- .../apache/druid/msq/guice/MSQSqlModule.java | 18 ++++++++++++++++++ .../druid/msq/sql/MSQTaskQueryMaker.java | 4 ++-- .../msq/sql/SegmentMorphFactoryCreator.java | 9 +++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java index ea6eb364cece..516573adb5aa 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java @@ -27,12 +27,17 @@ import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.initialization.DruidModule; import org.apache.druid.metadata.input.InputSourceModule; +import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.msq.sql.SegmentMorphFactoryCreator; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.guice.SqlBindings; +import javax.annotation.Nullable; import java.util.List; /** @@ -54,6 +59,19 @@ public void configure(Binder binder) // We want this 
module to bring InputSourceModule along for the ride. binder.install(new InputSourceModule()); + // Currently, there are no supported segment morph factories, so bind an implementation which always + // returns an empty morph factory. + binder.bind(SegmentMorphFactoryCreator.class).toInstance( + new SegmentMorphFactoryCreator() + { + @Nullable + @Override + public FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext) + { + return null; + } + }); + binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class); // Set up the EXTERN macro. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 547e60c04b15..bc51bf28d862 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -101,7 +101,7 @@ public class MSQTaskQueryMaker implements QueryMaker final PlannerContext plannerContext, final ObjectMapper jsonMapper, final List> fieldMapping, - @Nullable final SegmentMorphFactoryCreator segmentMorphFactoryCreator + final SegmentMorphFactoryCreator segmentMorphFactoryCreator ) { this.targetDataSource = targetDataSource; @@ -250,7 +250,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) segmentGranularityObject, segmentSortOrder, replaceTimeChunks, - segmentMorphFactoryCreator == null ? 
null : segmentMorphFactoryCreator.createSegmentMorphFactory( + segmentMorphFactoryCreator.createSegmentMorphFactory( druidQuery, plannerContext ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java index 42021df1ca9b..516e9deb0699 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java @@ -23,8 +23,17 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; +import javax.annotation.Nullable; + public interface SegmentMorphFactoryCreator { + /** + * Creates a {@link FrameProcessorFactory} which acts as a final stage, if the {@link DruidQuery} requires one. This + * is generally the case with ingest queries which do not want to create new segments, but rather modify existing + * segments in some way. + * If no segment morphing stage is required, returns null. 
+ */ + @Nullable @SuppressWarnings("rawtypes") FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext); } From 0df3bc68d09a0d8367cf0bf4b1e44d85371a8ae1 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sun, 14 Jul 2024 22:58:48 +0530 Subject: [PATCH 05/13] Merge with master --- .../org/apache/druid/msq/indexing/MSQCompactionRunner.java | 3 ++- .../apache/druid/msq/indexing/MSQCompactionRunnerTest.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index ac43e7c864b8..c01ec1d838c8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -234,7 +234,8 @@ private static DataSourceMSQDestination buildMSQDestination( dataSchema.getDataSource(), dataSchema.getGranularitySpec().getSegmentGranularity(), null, - ImmutableList.of(replaceInterval) + ImmutableList.of(replaceInterval), + null ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 6f1a4396ada9..aceb9b0d2d22 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -275,7 +275,8 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce DATA_SOURCE, SEGMENT_GRANULARITY.getDefaultGranularity(), null, - Collections.singletonList(COMPACTION_INTERVAL) + 
Collections.singletonList(COMPACTION_INTERVAL), + null ), actualMSQSpec.getDestination() ); @@ -340,7 +341,8 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess DATA_SOURCE, SEGMENT_GRANULARITY.getDefaultGranularity(), null, - Collections.singletonList(COMPACTION_INTERVAL) + Collections.singletonList(COMPACTION_INTERVAL), + null ), actualMSQSpec.getDestination() ); From afb88e0721f9f0d65f3b4c641d5bb100f2cf846d Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 15 Jul 2024 20:03:21 +0530 Subject: [PATCH 06/13] Remove extra information --- .../apache/druid/msq/exec/ControllerImpl.java | 26 +------------------ 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 4b003fa824d1..dc9a62eb8503 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1197,33 +1197,9 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( if (!destination.doesSegmentMorphing()) { // noinspection unchecked,rawtypes return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate); - } else { - // worker info is the new lock version - if (destination.getReplaceTimeChunks().size() != 1) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build("Must have single interval in replaceTimeChunks, but got [%s]", destination.getReplaceTimeChunks()); - } - try { - final List locks; - locks = context.taskActionClient().submit(new LockListAction()); - if (locks.size() == 1) { - final Int2ObjectMap retVal = new Int2ObjectAVLTreeMap<>(); - for (int worker : workerInputs.workers()) { - retVal.put(worker, 
locks.get(0).getVersion()); - } - return retVal; - } else { - throw DruidException.defensive("Got number of locks other than one: [%s]", locks); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } } - } else { - return null; } + return null; } @SuppressWarnings("rawtypes") From 6591107a3530f9b3b28f5604c635eb7266022594 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 15 Jul 2024 21:10:26 +0530 Subject: [PATCH 07/13] Fix tests --- .../druid/msq/exec/MSQSegmentMorphTest.java | 34 ------------------- ...stSegmentMorpherFrameProcessorFactory.java | 1 - 2 files changed, 35 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java index 71bf18cedb23..22979e36c371 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java @@ -21,13 +21,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.hamcrest.CoreMatchers; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.jupiter.api.Test; public class MSQSegmentMorphTest extends MSQTestBase @@ -56,35 +53,4 @@ public void testSegmentMorphFactory() .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of("2000-01-01T/2000-01-04T"))) .verifyResults(); } - - @Test - public void testSegmentMorphFactoryWithMultipleReplaceTimeChunks() - { - testSegmentMorphFactoryCreator.setFrameProcessorFactory(new TestSegmentMorpherFrameProcessorFactory()); - - RowSignature 
rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("m1", ColumnType.FLOAT) - .build(); - - testIngestQuery().setSql(" REPLACE INTO foo " - + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-04' OR __time >= TIMESTAMP '2001-01-01' AND __time < TIMESTAMP '2001-01-04'" - + "SELECT __time, m1 " - + "FROM foo " - + "PARTITIONED BY DAY ") - .setExpectedDataSource("foo") - .setExpectedRowSignature(rowSignature) - .setQueryContext(DEFAULT_MSQ_CONTEXT) - .setExpectedExecutionErrorMatcher( - CoreMatchers.allOf( - CoreMatchers.instanceOf(ISE.class), - ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString("Must have single interval in replaceTimeChunks, " - + "but got [[2000-01-01T00:00:00.000Z/2000-01-04T00:00:00.000Z, " - + "2001-01-01T00:00:00.000Z/2001-01-04T00:00:00.000Z]]") - ) - ) - ) - .verifyExecutionError(); - } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java index 7f1d92d02752..39378ea9a056 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java @@ -145,7 +145,6 @@ public Set mergeAccumulatedResult(Set accumulated, Set @Override public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra) { - Assert.assertEquals(MSQTestTaskActionClient.VERSION, extra); return new ExtraInfoHolder(extra) { }; From 021a52754413e9c0d3e651e2c72f1a67177440ba Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 16 Jul 2024 11:39:32 +0530 Subject: [PATCH 08/13] Create a utils class --- .../apache/druid/msq/exec/ControllerImpl.java | 117 +------ .../destination/SegmentGenerationUtils.java | 330 
++++++++++++++++++ ...stSegmentMorpherFrameProcessorFactory.java | 1 - 3 files changed, 338 insertions(+), 110 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index dc9a62eb8503..f147a5a23ebe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -41,7 +41,6 @@ import org.apache.druid.data.input.StringTuple; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.BrokerClient; import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; @@ -82,7 +81,6 @@ import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -102,6 +100,7 @@ import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.ExportMSQDestination; +import org.apache.druid.msq.indexing.destination.SegmentGenerationUtils; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.FaultsExceededChecker; @@ -178,7 +177,6 @@ import 
org.apache.druid.query.QueryContext; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.DimensionHandlerUtils; @@ -188,7 +186,6 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -196,7 +193,6 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.timeline.CompactionState; @@ -1844,8 +1840,13 @@ private static QueryDefinition makeQueryDefinition( } // Then, add a segment-generation stage. 
- final DataSchema dataSchema = - makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); + final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion( + querySpec, + querySignature, + queryClusterBy, + columnMappings, + jsonMapper + ); builder.add( StageDefinition.builder(queryDef.getNextStageNumber()) @@ -1931,108 +1932,6 @@ private static String getDataSourceForIngestion(final MSQSpec querySpec) return ((DataSourceMSQDestination) querySpec.getDestination()).getDataSource(); } - private static DataSchema makeDataSchemaForIngestion( - MSQSpec querySpec, - RowSignature querySignature, - ClusterBy queryClusterBy, - ColumnMappings columnMappings, - ObjectMapper jsonMapper - ) - { - final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); - final boolean isRollupQuery = isRollupQuery(querySpec.getQuery()); - - final Pair, List> dimensionsAndAggregators = - makeDimensionsAndAggregatorsForIngestion( - querySignature, - queryClusterBy, - destination.getSegmentSortOrder(), - columnMappings, - isRollupQuery, - querySpec.getQuery() - ); - - return new DataSchema( - destination.getDataSource(), - new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), - new DimensionsSpec(dimensionsAndAggregators.lhs), - dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]), - makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper), - new TransformSpec(null, Collections.emptyList()) - ); - } - - private static GranularitySpec makeGranularitySpecForIngestion( - final Query query, - final ColumnMappings columnMappings, - final boolean isRollupQuery, - final ObjectMapper jsonMapper - ) - { - if (isRollupQuery) { - final String queryGranularityString = - query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, ""); - - if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && 
!queryGranularityString.isEmpty()) { - final Granularity queryGranularity; - - try { - queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY); - } - return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY); - } else { - return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY); - } - } - - /** - * Checks that a {@link GroupByQuery} is grouping on the primary time column. - *

- * The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the - * output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the - * presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter - * {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions - * is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field - * from {@link ColumnMappings}, are the same. - */ - private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings) - { - final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME); - - if (positions.size() == 1) { - final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0)); - return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD)); - } else { - return false; - } - } - - /** - * Whether a native query represents an ingestion with rollup. - *

- * Checks for three things: - *

- * - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and - * aggregations. - * - The query must not finalize aggregations, because rollup requires inserting the intermediate type of - * complex aggregations, not the finalized type. (So further rollup is possible.) - * - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because - * groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time - * (rollup expects multi-value dimensions to be treated as arrays). - */ - private static boolean isRollupQuery(Query query) - { - return query instanceof GroupByQuery - && !MultiStageQueryContext.isFinalizeAggregations(query.context()) - && !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true); - } - /** * Compute shard columns for {@link DimensionRangeShardSpec}. Returns an empty list if range-based sharding * is not applicable. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java new file mode 100644 index 000000000000..d17919e45e35 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.util.ArrayIngestMode; +import org.apache.druid.msq.util.DimensionSchemaUtils; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.msq.util.PassthroughAggregatorFactory; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; 
+import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.rel.DruidQuery; +import org.apache.druid.utils.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SegmentGenerationUtils +{ + private static final Logger log = new Logger(SegmentGenerationUtils.class); + + public static DataSchema makeDataSchemaForIngestion( + MSQSpec querySpec, + RowSignature querySignature, + ClusterBy queryClusterBy, + ColumnMappings columnMappings, + ObjectMapper jsonMapper + ) + { + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + final boolean isRollupQuery = isRollupQuery(querySpec.getQuery()); + + final Pair, List> dimensionsAndAggregators = + makeDimensionsAndAggregatorsForIngestion( + querySignature, + queryClusterBy, + destination.getSegmentSortOrder(), + columnMappings, + isRollupQuery, + querySpec.getQuery() + ); + + return new DataSchema( + destination.getDataSource(), + new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), + new DimensionsSpec(dimensionsAndAggregators.lhs), + dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]), + makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper), + new TransformSpec(null, Collections.emptyList()) + ); + } + + private static GranularitySpec makeGranularitySpecForIngestion( + final Query query, + final ColumnMappings columnMappings, + final boolean isRollupQuery, + final ObjectMapper 
jsonMapper + ) + { + if (isRollupQuery) { + final String queryGranularityString = + query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, ""); + + if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) { + final Granularity queryGranularity; + + try { + queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY); + } + return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY); + } else { + return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY); + } + } + + + /** + * Checks that a {@link GroupByQuery} is grouping on the primary time column. + *

+ * The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the + * output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the + * presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter + * {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions + * is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field + * from {@link ColumnMappings}, are the same. + */ + private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings) + { + final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME); + + if (positions.size() == 1) { + final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0)); + return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD)); + } else { + return false; + } + } + + private static Pair, List> makeDimensionsAndAggregatorsForIngestion( + final RowSignature querySignature, + final ClusterBy queryClusterBy, + final List segmentSortOrder, + final ColumnMappings columnMappings, + final boolean isRollupQuery, + final Query query + ) + { + // Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to + // deprecation and removal in future + if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) { + log.warn( + "%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as " + + "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. 
To be " + + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " + + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " + + "out multi-value string dimensions using ARRAY_TO_MV. " + + "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.", + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE + ); + } + + final List dimensions = new ArrayList<>(); + final List aggregators = new ArrayList<>(); + + // During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want + // this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in + // that order. + + // Start with segmentSortOrder. + final Set outputColumnsInOrder = new LinkedHashSet<>(segmentSortOrder); + + // Then the query-level CLUSTERED BY. + // Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected. + // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows + // within an individual segment. + for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) { + final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()); + for (final int outputColumn : outputColumns) { + outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn)); + } + } + + // Then all other columns. + outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames()); + + Map outputColumnAggregatorFactories = new HashMap<>(); + + if (isRollupQuery) { + // Populate aggregators from the native query when doing an ingest in rollup mode. 
+ for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { + for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) { + final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); + if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { + throw new ISE("There can only be one aggregation for column [%s].", outputColumn); + } else { + outputColumnAggregatorFactories.put( + outputColumnName, + aggregatorFactory.withName(outputColumnName).getCombiningFactory() + ); + } + } + } + } + + // Each column can be of either time, dimension, aggregator. For this method. we can ignore the time column. + // For non-complex columns, If the aggregator factory of the column is not available, we treat the column as + // a dimension. For complex columns, certains hacks are in place. + for (final String outputColumnName : outputColumnsInOrder) { + // CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require + // that output names be unique. 
+ final int outputColumn = CollectionUtils.getOnlyElement( + columnMappings.getOutputColumnsByName(outputColumnName), + xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs) + ); + final String queryColumn = columnMappings.getQueryColumnName(outputColumn); + final ColumnType type = + querySignature.getColumnType(queryColumn) + .orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName)); + + if (!outputColumnName.equals(ColumnHolder.TIME_COLUMN_NAME)) { + + if (!type.is(ValueType.COMPLEX)) { + // non complex columns + populateDimensionsAndAggregators( + dimensions, + aggregators, + outputColumnAggregatorFactories, + outputColumnName, + type, + query.context() + ); + } else { + // complex columns only + if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { + dimensions.add( + DimensionSchemaUtils.createDimensionSchema( + outputColumnName, + type, + MultiStageQueryContext.useAutoColumnSchemas(query.context()), + MultiStageQueryContext.getArrayIngestMode(query.context()) + ) + ); + } else if (!isRollupQuery) { + aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); + } else { + populateDimensionsAndAggregators( + dimensions, + aggregators, + outputColumnAggregatorFactories, + outputColumnName, + type, + query.context() + ); + } + } + } + } + + return Pair.of(dimensions, aggregators); + } + + /** + * If the output column is present in the outputColumnAggregatorFactories that means we already have the aggregator information for this column. + * else treat this column as a dimension. + * + * @param dimensions list is poulated if the output col is deemed to be a dimension + * @param aggregators list is populated with the aggregator if the output col is deemed to be a aggregation column. 
+ * @param outputColumnAggregatorFactories output col -> AggregatorFactory map + * @param outputColumn column name + * @param type columnType + */ + private static void populateDimensionsAndAggregators( + List dimensions, + List aggregators, + Map outputColumnAggregatorFactories, + String outputColumn, + ColumnType type, + QueryContext context + ) + { + if (outputColumnAggregatorFactories.containsKey(outputColumn)) { + aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); + } else { + dimensions.add( + DimensionSchemaUtils.createDimensionSchema( + outputColumn, + type, + MultiStageQueryContext.useAutoColumnSchemas(context), + MultiStageQueryContext.getArrayIngestMode(context) + ) + ); + } + } + + /** + * Whether a native query represents an ingestion with rollup. + *

+ * Checks for three things: + *

+ * - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and + * aggregations. + * - The query must not finalize aggregations, because rollup requires inserting the intermediate type of + * complex aggregations, not the finalized type. (So further rollup is possible.) + * - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because + * groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time + * (rollup expects multi-value dimensions to be treated as arrays). + */ + private static boolean isRollupQuery(Query query) + { + return query instanceof GroupByQuery + && !MultiStageQueryContext.isFinalizeAggregations(query.context()) + && !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java index 39378ea9a056..90e4795a2cab 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java @@ -50,7 +50,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; -import org.junit.Assert; import javax.annotation.Nullable; import java.util.Collections; From 75be018529380c7d96f40aa12f8b5e5c9b900330 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 16 Jul 2024 19:22:36 +0530 Subject: [PATCH 09/13] Refactor segment generation --- .../apache/druid/msq/exec/ControllerImpl.java | 300 +----------------- .../apache/druid/msq/guice/MSQSqlModule.java | 
16 +- .../guice/MSQTerminalStageSpecFactory.java} | 24 +- .../destination/DataSourceMSQDestination.java | 37 +-- .../SegmentGenerationStageSpec.java | 145 +++++++++ .../destination/SegmentGenerationUtils.java | 7 +- .../destination/TerminalStageSpec.java | 31 ++ .../druid/msq/sql/MSQTaskQueryMaker.java | 9 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 11 +- .../druid/msq/exec/MSQSegmentMorphTest.java | 56 ---- .../apache/druid/msq/test/MSQTestBase.java | 4 +- .../test/TestTerminalStageSpecFactory.java} | 35 +- 12 files changed, 251 insertions(+), 424 deletions(-) rename extensions-core/multi-stage-query/src/{test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java => main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java} (61%) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java delete mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java rename extensions-core/multi-stage-query/src/{main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java => test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java} (56%) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index f147a5a23ebe..b0f984d79f8f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -30,7 +30,6 @@ import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import 
it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntArraySet; @@ -39,7 +38,6 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.discovery.BrokerClient; import org.apache.druid.error.DruidException; @@ -100,7 +98,8 @@ import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.ExportMSQDestination; -import org.apache.druid.msq.indexing.destination.SegmentGenerationUtils; +import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec; +import org.apache.druid.msq.indexing.destination.TerminalStageSpec; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.FaultsExceededChecker; @@ -129,7 +128,6 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.InputSpecSlicerFactory; -import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.input.MapInputSpecSlicer; import org.apache.druid.msq.input.external.ExternalInputSpec; import org.apache.druid.msq.input.external.ExternalInputSpecSlicer; @@ -138,12 +136,9 @@ import org.apache.druid.msq.input.lookup.LookupInputSpec; import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer; import org.apache.druid.msq.input.stage.InputChannels; -import org.apache.druid.msq.input.stage.ReadablePartition; -import org.apache.druid.msq.input.stage.StageInputSlice; import 
org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.input.stage.StageInputSpecSlicer; import org.apache.druid.msq.input.table.TableInputSpec; -import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.StageDefinition; @@ -166,25 +161,19 @@ import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory; import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; -import org.apache.druid.msq.util.ArrayIngestMode; -import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.msq.util.IntervalUtils; import org.apache.druid.msq.util.MSQFutureUtils; import org.apache.druid.msq.util.MSQTaskQueryMakerUtils; import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.msq.util.PassthroughAggregatorFactory; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; -import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; @@ -202,7 +191,6 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import 
org.apache.druid.utils.CloseableUtils; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -215,7 +203,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1190,9 +1177,12 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( if (MSQControllerTask.isIngestion(querySpec) && stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) { final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); - if (!destination.doesSegmentMorphing()) { - // noinspection unchecked,rawtypes - return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate); + TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec(); + if (destination.getTerminalStageSpec() instanceof SegmentGenerationStageSpec) { + return (Int2ObjectMap) ((SegmentGenerationStageSpec) terminalStageSpec).makeSegmentGeneratorWorkerFactoryInfos( + workerInputs, + segmentsToGenerate + ); } } return null; @@ -1211,35 +1201,6 @@ private QueryKit makeQueryControllerToolKit() return new MultiQueryKit(kitMap); } - private Int2ObjectMap> makeSegmentGeneratorWorkerFactoryInfos( - final WorkerInputs workerInputs, - final List segmentsToGenerate - ) - { - final Int2ObjectMap> retVal = new Int2ObjectAVLTreeMap<>(); - - // Empty segments validation already happens when the stages are started -- so we cannot have both - // isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here. - if (segmentsToGenerate.isEmpty()) { - return retVal; - } - - for (final int workerNumber : workerInputs.workers()) { - // SegmentGenerator stage has a single input from another stage. 
- final StageInputSlice stageInputSlice = - (StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber)); - - final List workerSegments = new ArrayList<>(); - retVal.put(workerNumber, workerSegments); - - for (final ReadablePartition partition : stageInputSlice.getPartitions()) { - workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber())); - } - } - - return retVal; - } - /** * A blocking function used to contact multiple workers. Checks if all the workers are running before contacting them. * @@ -1781,87 +1742,14 @@ private static QueryDefinition makeQueryDefinition( } if (MSQControllerTask.isIngestion(querySpec)) { - final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature(); - final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); - - // Find the stage that provides shuffled input to the final segment-generation stage. - StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); - - while (!finalShuffleStageDef.doesShuffle() - && InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) { - finalShuffleStageDef = queryDef.getStageDefinition( - Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs())) - ); - } - - if (!finalShuffleStageDef.doesShuffle()) { - finalShuffleStageDef = null; - } - - // Add all query stages. - // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); - - for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { - if (stageDef.equals(finalShuffleStageDef)) { - builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true)); - } else { - builder.add(StageDefinition.builder(stageDef)); - } - } - - // Possibly add a segment morpher stage. 
- if (((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) { - final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); - final FrameProcessorFactory segmentMorphFactory = destination.getSegmentMorphFactory(); - - if (!destination.isReplaceTimeChunks()) { - throw new MSQException(UnknownFault.forMessage("segmentMorphFactory requires replaceTimeChunks")); - } - - builder.add( - StageDefinition.builder(queryDef.getNextStageNumber()) - .inputs( - new TableInputSpec( - destination.getDataSource(), - destination.getReplaceTimeChunks(), - null, - null - ), - new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()) - ) - .broadcastInputs(IntSet.of(1)) - .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .processorFactory(segmentMorphFactory) - ); - - // If there was a segment morpher, return immediately; don't add a segment-generation stage. - return builder.build(); + DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec(); + if (terminalStageSpec instanceof SegmentGenerationStageSpec) { + return ((SegmentGenerationStageSpec) terminalStageSpec).constructFinalStage(queryId, queryDef, querySpec, jsonMapper); + } else { + throw DruidException.defensive("Unknown segment generation strategy [%s]", terminalStageSpec); } - // Then, add a segment-generation stage. 
- final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion( - querySpec, - querySignature, - queryClusterBy, - columnMappings, - jsonMapper - ); - - builder.add( - StageDefinition.builder(queryDef.getNextStageNumber()) - .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) - .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .processorFactory( - new SegmentGeneratorFrameProcessorFactory( - dataSchema, - columnMappings, - tuningConfig - ) - ) - ); - - return builder.build(); } else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) { return queryDef; } else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) { @@ -2020,164 +1908,6 @@ private static StringTuple makeStringTuple( return new StringTuple(array); } - private static Pair, List> makeDimensionsAndAggregatorsForIngestion( - final RowSignature querySignature, - final ClusterBy queryClusterBy, - final List segmentSortOrder, - final ColumnMappings columnMappings, - final boolean isRollupQuery, - final Query query - ) - { - // Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to - // deprecation and removal in future - if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) { - log.warn( - "%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as " - + "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be " - + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " - + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " - + "out multi-value string dimensions using ARRAY_TO_MV. 
" - + "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.", - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE - ); - } - - final List dimensions = new ArrayList<>(); - final List aggregators = new ArrayList<>(); - - // During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want - // this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in - // that order. - - // Start with segmentSortOrder. - final Set outputColumnsInOrder = new LinkedHashSet<>(segmentSortOrder); - - // Then the query-level CLUSTERED BY. - // Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected. - // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows - // within an individual segment. - for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) { - final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()); - for (final int outputColumn : outputColumns) { - outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn)); - } - } - - // Then all other columns. - outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames()); - - Map outputColumnAggregatorFactories = new HashMap<>(); - - if (isRollupQuery) { - // Populate aggregators from the native query when doing an ingest in rollup mode. 
- for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { - for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) { - final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); - if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { - throw new ISE("There can only be one aggregation for column [%s].", outputColumn); - } else { - outputColumnAggregatorFactories.put( - outputColumnName, - aggregatorFactory.withName(outputColumnName).getCombiningFactory() - ); - } - } - } - } - - // Each column can be of either time, dimension, aggregator. For this method. we can ignore the time column. - // For non-complex columns, If the aggregator factory of the column is not available, we treat the column as - // a dimension. For complex columns, certains hacks are in place. - for (final String outputColumnName : outputColumnsInOrder) { - // CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require - // that output names be unique. 
- final int outputColumn = CollectionUtils.getOnlyElement( - columnMappings.getOutputColumnsByName(outputColumnName), - xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs) - ); - final String queryColumn = columnMappings.getQueryColumnName(outputColumn); - final ColumnType type = - querySignature.getColumnType(queryColumn) - .orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName)); - - if (!outputColumnName.equals(ColumnHolder.TIME_COLUMN_NAME)) { - - if (!type.is(ValueType.COMPLEX)) { - // non complex columns - populateDimensionsAndAggregators( - dimensions, - aggregators, - outputColumnAggregatorFactories, - outputColumnName, - type, - query.context() - ); - } else { - // complex columns only - if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { - dimensions.add( - DimensionSchemaUtils.createDimensionSchema( - outputColumnName, - type, - MultiStageQueryContext.useAutoColumnSchemas(query.context()), - MultiStageQueryContext.getArrayIngestMode(query.context()) - ) - ); - } else if (!isRollupQuery) { - aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); - } else { - populateDimensionsAndAggregators( - dimensions, - aggregators, - outputColumnAggregatorFactories, - outputColumnName, - type, - query.context() - ); - } - } - } - } - - return Pair.of(dimensions, aggregators); - } - - - /** - * If the output column is present in the outputColumnAggregatorFactories that means we already have the aggregator information for this column. - * else treat this column as a dimension. - * - * @param dimensions list is poulated if the output col is deemed to be a dimension - * @param aggregators list is populated with the aggregator if the output col is deemed to be a aggregation column. 
- * @param outputColumnAggregatorFactories output col -> AggregatorFactory map - * @param outputColumn column name - * @param type columnType - */ - private static void populateDimensionsAndAggregators( - List dimensions, - List aggregators, - Map outputColumnAggregatorFactories, - String outputColumn, - ColumnType type, - QueryContext context - ) - { - if (outputColumnAggregatorFactories.containsKey(outputColumn)) { - aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); - } else { - dimensions.add( - DimensionSchemaUtils.createDimensionSchema( - outputColumn, - type, - MultiStageQueryContext.useAutoColumnSchemas(context), - MultiStageQueryContext.getArrayIngestMode(context) - ) - ); - } - } - private static DateTime getBucketDateTime( final ClusterByPartition partitionBoundary, final Granularity segmentGranularity, @@ -2656,7 +2386,7 @@ private void startStages() throws IOException, InterruptedException // Allocate segments, if this is the final stage of an ingestion. if (MSQControllerTask.isIngestion(querySpec) && stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber() - && !((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) { + && (((DataSourceMSQDestination) querySpec.getDestination()).getTerminalStageSpec() instanceof SegmentGenerationStageSpec)) { populateSegmentsToGenerate(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java index 516573adb5aa..92a44067d0e8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java @@ -27,17 +27,12 @@ import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.initialization.DruidModule; import org.apache.druid.metadata.input.InputSourceModule; 
-import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.sql.MSQTaskSqlEngine; -import org.apache.druid.msq.sql.SegmentMorphFactoryCreator; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; -import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.guice.SqlBindings; -import javax.annotation.Nullable; import java.util.List; /** @@ -61,16 +56,7 @@ public void configure(Binder binder) // Currently, there are no supported segment morph factories, so bind an implementation which always // returns an empty morph factory. - binder.bind(SegmentMorphFactoryCreator.class).toInstance( - new SegmentMorphFactoryCreator() - { - @Nullable - @Override - public FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext) - { - return null; - } - }); + binder.bind(MSQTerminalStageSpecFactory.class).toInstance(new MSQTerminalStageSpecFactory()); binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java similarity index 61% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java index 38ed3c7723eb..d347322a7fb4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestSegmentMorphFactoryCreator.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java @@ -17,25 +17,21 @@ * under the License. 
*/ -package org.apache.druid.msq.test; +package org.apache.druid.msq.guice; -import org.apache.druid.msq.kernel.FrameProcessorFactory; -import org.apache.druid.msq.sql.SegmentMorphFactoryCreator; +import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec; +import org.apache.druid.msq.indexing.destination.TerminalStageSpec; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; -public class TestSegmentMorphFactoryCreator implements SegmentMorphFactoryCreator +public class MSQTerminalStageSpecFactory { - private FrameProcessorFactory frameProcessorFactory; - - public void setFrameProcessorFactory(FrameProcessorFactory frameProcessorFactory) - { - this.frameProcessorFactory = frameProcessorFactory; - } - - @Override - public FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext) + /** + * Creates a {@link TerminalStageSpec} which determines the final of a query. Currently, always returns a segment + * generation spec, but this can be used to configure a wide range of behaviours. 
+ */ + public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext) { - return frameProcessorFactory; + return SegmentGenerationStageSpec.instance(); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java index 78df189cc658..b0d70b11b68d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java @@ -20,14 +20,12 @@ package org.apache.druid.msq.indexing.destination; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.msq.kernel.FrameProcessorFactory; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.server.security.Resource; @@ -51,9 +49,7 @@ public class DataSourceMSQDestination implements MSQDestination @Nullable private final List replaceTimeChunks; - @Nullable - @SuppressWarnings("rawtypes") - private final FrameProcessorFactory segmentMorphFactory; + private final TerminalStageSpec terminalStageSpec; @JsonCreator public DataSourceMSQDestination( @@ -61,14 +57,14 @@ public DataSourceMSQDestination( @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("segmentSortOrder") @Nullable List segmentSortOrder, 
@JsonProperty("replaceTimeChunks") @Nullable List replaceTimeChunks, - @JsonProperty("segmentMorphFactory") @Nullable FrameProcessorFactory segmentMorphFactory + @JsonProperty("terminalStageSpec") @Nullable TerminalStageSpec terminalStageSpec ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity"); this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList(); this.replaceTimeChunks = replaceTimeChunks; - this.segmentMorphFactory = segmentMorphFactory; + this.terminalStageSpec = terminalStageSpec != null ? terminalStageSpec : SegmentGenerationStageSpec.instance(); if (replaceTimeChunks != null) { // Verify that if replaceTimeChunks is provided, it is nonempty. @@ -107,27 +103,14 @@ public String getDataSource() } /** - * Returns the segment morph factory, if one is present, else null. + * Returns the terminal stage spec. *

- * The segment morph factory if present, is a way to tell the MSQ task to funnel the results at the final stage to - * the {@link FrameProcessorFactory} instead of a segment generation stage. + * The terminal stage spec is a way to tell the MSQ task how to convert the results into segments at the final stage. */ - @Nullable @JsonProperty - @JsonInclude(JsonInclude.Include.NON_NULL) - public FrameProcessorFactory getSegmentMorphFactory() - { - return segmentMorphFactory; - } - - /** - * Checks if the destination uses a segmentMorphFactory. If one is present, that means that the query would modify - * existing segments instead of generating new ones. - */ - @JsonIgnore - public boolean doesSegmentMorphing() + public TerminalStageSpec getTerminalStageSpec() { - return segmentMorphFactory != null; + return terminalStageSpec; } @JsonProperty @@ -191,13 +174,13 @@ public boolean equals(Object o) && Objects.equals(segmentGranularity, that.segmentGranularity) && Objects.equals(segmentSortOrder, that.segmentSortOrder) && Objects.equals(replaceTimeChunks, that.replaceTimeChunks) - && Objects.equals(segmentMorphFactory, that.segmentMorphFactory); + && Objects.equals(terminalStageSpec, that.terminalStageSpec); } @Override public int hashCode() { - return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, segmentMorphFactory); + return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, terminalStageSpec); } @Override @@ -208,7 +191,7 @@ public String toString() ", segmentGranularity=" + segmentGranularity + ", segmentSortOrder=" + segmentSortOrder + ", replaceTimeChunks=" + replaceTimeChunks + - (segmentMorphFactory != null ? 
", segmentMorphFactory=" + segmentMorphFactory : "") + + ", terminalStageSpec=" + terminalStageSpec + '}'; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java new file mode 100644 index 000000000000..95fcc1003227 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory; +import org.apache.druid.msq.input.InputSpecs; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.controller.WorkerInputs; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.sql.calcite.planner.ColumnMappings; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class SegmentGenerationStageSpec implements TerminalStageSpec +{ + public static final String TYPE = "segmentGeneration"; + + private static final SegmentGenerationStageSpec INSTANCE = new SegmentGenerationStageSpec(); + + private SegmentGenerationStageSpec() + { + } + + @JsonCreator + public static SegmentGenerationStageSpec instance() + { + return INSTANCE; + } + + public QueryDefinition constructFinalStage(String queryId, QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper) + { + final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); + final ColumnMappings columnMappings = querySpec.getColumnMappings(); + 
final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature(); + final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); + + // Find the stage that provides shuffled input to the final segment-generation stage. + StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); + + while (!finalShuffleStageDef.doesShuffle() + && InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) { + finalShuffleStageDef = queryDef.getStageDefinition( + Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs())) + ); + } + + if (!finalShuffleStageDef.doesShuffle()) { + finalShuffleStageDef = null; + } + + // Add all query stages. + // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + + for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { + if (stageDef.equals(finalShuffleStageDef)) { + builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true)); + } else { + builder.add(StageDefinition.builder(stageDef)); + } + } + + // Then, add a segment-generation stage. 
+ final DataSchema dataSchema = + SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); + + builder.add( + StageDefinition.builder(queryDef.getNextStageNumber()) + .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) + .maxWorkerCount(tuningConfig.getMaxNumWorkers()) + .processorFactory( + new SegmentGeneratorFrameProcessorFactory( + dataSchema, + columnMappings, + tuningConfig + ) + ) + ); + + return builder.build(); + } + + public Int2ObjectMap> makeSegmentGeneratorWorkerFactoryInfos( + final WorkerInputs workerInputs, + @Nullable final List segmentsToGenerate + ) + { + final Int2ObjectMap> retVal = new Int2ObjectAVLTreeMap<>(); + + // Empty segments validation already happens when the stages are started -- so we cannot have both + // isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here. + if (segmentsToGenerate == null || segmentsToGenerate.isEmpty()) { + return retVal; + } + + for (final int workerNumber : workerInputs.workers()) { + // SegmentGenerator stage has a single input from another stage. 
+ final StageInputSlice stageInputSlice = + (StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber)); + + final List workerSegments = new ArrayList<>(); + retVal.put(workerNumber, workerSegments); + + for (final ReadablePartition partition : stageInputSlice.getPartitions()) { + workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber())); + } + } + + return retVal; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java index d17919e45e35..cd3342088011 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java @@ -64,7 +64,7 @@ import java.util.Map; import java.util.Set; -public class SegmentGenerationUtils +public final class SegmentGenerationUtils { private static final Logger log = new Logger(SegmentGenerationUtils.class); @@ -128,7 +128,6 @@ private static GranularitySpec makeGranularitySpecForIngestion( } } - /** * Checks that a {@link GroupByQuery} is grouping on the primary time column. *

@@ -327,4 +326,8 @@ private static boolean isRollupQuery(Query query) && !MultiStageQueryContext.isFinalizeAggregations(query.context()) && !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true); } + + private SegmentGenerationUtils() + { + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java new file mode 100644 index 000000000000..ce210de8b58b --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = SegmentGenerationStageSpec.TYPE, value = SegmentGenerationStageSpec.class) +}) +public interface TerminalStageSpec +{ +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index bc51bf28d862..098a7384c384 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.msq.exec.MSQTasks; +import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; @@ -92,7 +93,7 @@ public class MSQTaskQueryMaker implements QueryMaker private final PlannerContext plannerContext; private final ObjectMapper jsonMapper; private final List> fieldMapping; - private final SegmentMorphFactoryCreator segmentMorphFactoryCreator; + private final MSQTerminalStageSpecFactory terminalStageSpecFactory; MSQTaskQueryMaker( @@ -101,7 +102,7 @@ public class MSQTaskQueryMaker implements QueryMaker final PlannerContext plannerContext, final ObjectMapper jsonMapper, final List> fieldMapping, - final SegmentMorphFactoryCreator segmentMorphFactoryCreator + final MSQTerminalStageSpecFactory terminalStageSpecFactory ) { this.targetDataSource = targetDataSource; @@ -109,7 +110,7 @@ public class 
MSQTaskQueryMaker implements QueryMaker this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext"); this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping"); - this.segmentMorphFactoryCreator = segmentMorphFactoryCreator; + this.terminalStageSpecFactory = terminalStageSpecFactory; } @Override @@ -250,7 +251,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) segmentGranularityObject, segmentSortOrder, replaceTimeChunks, - segmentMorphFactoryCreator.createSegmentMorphFactory( + terminalStageSpecFactory.createTerminalStageSpec( druidQuery, plannerContext ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 2b886ca1e0c8..2a772171d06a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.util.ArrayIngestMode; import org.apache.druid.msq.util.DimensionSchemaUtils; @@ -82,7 +83,7 @@ public class MSQTaskSqlEngine implements SqlEngine private final OverlordClient overlordClient; private final ObjectMapper jsonMapper; - private final SegmentMorphFactoryCreator segmentMorphFactoryCreator; + private final MSQTerminalStageSpecFactory terminalStageSpecFactory; public MSQTaskSqlEngine( final OverlordClient overlordClient, @@ -96,12 +97,12 @@ public MSQTaskSqlEngine( public MSQTaskSqlEngine( final 
OverlordClient overlordClient, final ObjectMapper jsonMapper, - final SegmentMorphFactoryCreator segmentMorphFactoryCreator + final MSQTerminalStageSpecFactory terminalStageSpecFactory ) { this.overlordClient = overlordClient; this.jsonMapper = jsonMapper; - this.segmentMorphFactoryCreator = segmentMorphFactoryCreator; + this.terminalStageSpecFactory = terminalStageSpecFactory; } @Override @@ -171,7 +172,7 @@ public QueryMaker buildQueryMakerForSelect( plannerContext, jsonMapper, relRoot.fields, - segmentMorphFactoryCreator + terminalStageSpecFactory ); } @@ -206,7 +207,7 @@ public QueryMaker buildQueryMakerForInsert( plannerContext, jsonMapper, relRoot.fields, - segmentMorphFactoryCreator + terminalStageSpecFactory ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java deleted file mode 100644 index 22979e36c371..000000000000 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSegmentMorphTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.msq.exec; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.msq.test.MSQTestBase; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.junit.jupiter.api.Test; - -public class MSQSegmentMorphTest extends MSQTestBase -{ - @Test - public void testSegmentMorphFactory() - { - testSegmentMorphFactoryCreator.setFrameProcessorFactory(new TestSegmentMorpherFrameProcessorFactory()); - - RowSignature rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("m1", ColumnType.FLOAT) - .build(); - - testIngestQuery().setSql(" REPLACE INTO foo " - + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-04'" - + "SELECT __time, m1 " - + "FROM foo " - + "PARTITIONED BY DAY ") - .setExpectedDataSource("foo") - .setExpectedRowSignature(rowSignature) - .setQueryContext(DEFAULT_MSQ_CONTEXT) - .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("1970-01-01T00:00:00.000Z/2001-01-03T00:00:00.001Z"))) - .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) - .setExpectedResultRows(ImmutableList.of()) - .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of("2000-01-01T/2000-01-04T"))) - .verifyResults(); - } -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 6a555d48a4dd..c7809509cae1 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -92,6 +92,7 @@ import org.apache.druid.msq.guice.MSQExternalDataSourceModule; import org.apache.druid.msq.guice.MSQIndexingModule; import 
org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.MSQControllerTask; @@ -328,7 +329,6 @@ public class MSQTestBase extends BaseCalciteQueryTest protected List loadedSegmentsMetadata = new ArrayList<>(); // Mocks the return of data from data servers protected DataServerQueryHandler dataServerQueryHandler = mock(DataServerQueryHandler.class); - protected TestSegmentMorphFactoryCreator testSegmentMorphFactoryCreator = new TestSegmentMorphFactoryCreator(); private MSQTestSegmentManager segmentManager; private SegmentCacheManager segmentCacheManager; @@ -559,7 +559,7 @@ public String getFormatString() final SqlEngine engine = new MSQTaskSqlEngine( indexingServiceClient, qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()), - testSegmentMorphFactoryCreator + new MSQTerminalStageSpecFactory() ); PlannerFactory plannerFactory = new PlannerFactory( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java similarity index 56% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java index 516e9deb0699..6db92fada1e7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SegmentMorphFactoryCreator.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java @@ -17,23 +17,30 @@ * under the License. 
*/ -package org.apache.druid.msq.sql; +package org.apache.druid.msq.test; -import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; +import org.apache.druid.msq.indexing.destination.TerminalStageSpec; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; -import javax.annotation.Nullable; - -public interface SegmentMorphFactoryCreator +public class TestTerminalStageSpecFactory extends MSQTerminalStageSpecFactory { - /** - * Creates a {@link FrameProcessorFactory} which acts as a final stage, if the {@link DruidQuery} requires one. This - * is generally the case with ingest queries which do not want to create new segments, but rather modify existing - * segments in some way. - * If no segment morphing stage is required, returns null. - */ - @Nullable - @SuppressWarnings("rawtypes") - FrameProcessorFactory createSegmentMorphFactory(DruidQuery druidQuery, PlannerContext plannerContext); + private TerminalStageSpec terminalStageSpec; + + public TestTerminalStageSpecFactory(TerminalStageSpec terminalStageSpec) + { + this.terminalStageSpec = terminalStageSpec; + } + + public void setTerminalStageSpec(TerminalStageSpec terminalStageSpec) + { + this.terminalStageSpec = terminalStageSpec; + } + + @Override + public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext) + { + return terminalStageSpec; + } } From f6d185cf23b9d9bd509e7bd49b461673728bd660 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 16 Jul 2024 20:26:36 +0530 Subject: [PATCH 10/13] Fix javadoc --- .../org/apache/druid/msq/util/PassthroughAggregatorFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java 
index 8de47936eb48..f821c169505e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java @@ -39,7 +39,7 @@ * Hack that allows "passing through" arbitrary complex types into * {@link org.apache.druid.segment.incremental.IncrementalIndex}. * - * Used by {@link org.apache.druid.msq.exec.ControllerImpl#makeDimensionsAndAggregatorsForIngestion}. + * Used by {@link org.apache.druid.msq.indexing.destination.SegmentGenerationUtils#makeDimensionsAndAggregatorsForIngestion}. * * To move away from this, it would need to be possible to create complex columns in segments only knowing the complex * type; in particular, without knowing the type of an aggregator factory or dimension schema that corresponds to From 5fb16d5e3fe98e109db4686bbbcaf7c10cf1fc27 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 20 Jul 2024 16:31:06 +0530 Subject: [PATCH 11/13] Refactor --- .../apache/druid/msq/guice/MSQSqlModule.java | 2 - .../destination/TerminalStageSpec.java | 3 + ...stSegmentMorpherFrameProcessorFactory.java | 184 ------------------ .../DataSourceMSQDestinationTest.java | 9 + .../apache/druid/msq/test/MSQTestBase.java | 2 - .../test/TestTerminalStageSpecFactory.java | 46 ----- 6 files changed, 12 insertions(+), 234 deletions(-) delete mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java delete mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java index 92a44067d0e8..a523fd6c012e 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java @@ -54,8 +54,6 @@ public void configure(Binder binder) // We want this module to bring InputSourceModule along for the ride. binder.install(new InputSourceModule()); - // Currently, there are no supported segment morph factories, so bind an implementation which always - // returns an empty morph factory. binder.bind(MSQTerminalStageSpecFactory.class).toInstance(new MSQTerminalStageSpecFactory()); binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java index ce210de8b58b..798fd825b066 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +/** + * Determines the final stage of a {@link DataSourceMSQDestination}. 
+ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = SegmentGenerationStageSpec.TYPE, value = SegmentGenerationStageSpec.class) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java deleted file mode 100644 index 90e4795a2cab..000000000000 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestSegmentMorpherFrameProcessorFactory.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.msq.exec; - -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableMap; -import it.unimi.dsi.fastutil.ints.IntSet; -import org.apache.druid.frame.channel.ReadableFrameChannel; -import org.apache.druid.frame.channel.WritableFrameChannel; -import org.apache.druid.frame.processor.FrameProcessor; -import org.apache.druid.frame.processor.OutputChannelFactory; -import org.apache.druid.frame.processor.OutputChannels; -import org.apache.druid.frame.processor.ReturnOrAwait; -import org.apache.druid.frame.processor.manager.ProcessorManager; -import org.apache.druid.frame.processor.manager.ProcessorManagers; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.msq.counters.CounterTracker; -import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor; -import org.apache.druid.msq.input.InputSlice; -import org.apache.druid.msq.input.InputSliceReader; -import org.apache.druid.msq.input.NilInputSlice; -import org.apache.druid.msq.input.ReadableInputs; -import org.apache.druid.msq.input.table.RichSegmentDescriptor; -import org.apache.druid.msq.input.table.SegmentsInputSlice; -import org.apache.druid.msq.kernel.ExtraInfoHolder; -import org.apache.druid.msq.kernel.FrameContext; -import org.apache.druid.msq.kernel.FrameProcessorFactory; -import org.apache.druid.msq.kernel.ProcessorsAndChannels; -import org.apache.druid.msq.kernel.StageDefinition; -import org.apache.druid.msq.test.MSQTestTaskActionClient; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.partition.NumberedShardSpec; - -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.function.Consumer; - -/** - * A 
segment morpher which does nothing but updates the version of the segment by touching it. - */ -@JsonTypeName("testSegmentMorpherFrameProcessorFactory") -public class TestSegmentMorpherFrameProcessorFactory implements FrameProcessorFactory, Object> -{ - @Override - public ProcessorsAndChannels> makeProcessors( - StageDefinition stageDefinition, - int workerNumber, - List inputSlices, - InputSliceReader inputSliceReader, - @Nullable Object extra, - OutputChannelFactory outputChannelFactory, - FrameContext frameContext, - int maxOutstandingProcessors, - CounterTracker counters, - Consumer warningPublisher, - boolean removeNullBytes - ) - { - if (inputSlices.get(0) instanceof NilInputSlice) { - return new ProcessorsAndChannels<>( - ProcessorManagers.of(Sequences.empty()) - .withAccumulation(new HashSet<>(), (acc, segment) -> acc), - OutputChannels.none() - ); - } - - final SegmentsInputSlice segmentsSlice = (SegmentsInputSlice) inputSlices.get(0); - final ReadableInputs segments = - inputSliceReader.attach(0, segmentsSlice, counters, warningPublisher); - - Sequence sequence = Sequences.simple(segments) - .map(segment -> { - final RichSegmentDescriptor descriptor = segment.getSegment() - .getDescriptor(); - final SegmentId sourceSegmentId = SegmentId.of( - segmentsSlice.getDataSource(), - descriptor.getFullInterval(), - descriptor.getVersion(), - descriptor.getPartitionNumber() - ); - - final DataSegment newDataSegment = new DataSegment( - sourceSegmentId.getDataSource(), - sourceSegmentId.getInterval(), - MSQTestTaskActionClient.VERSION, - ImmutableMap.of(), - Collections.emptyList(), - Collections.emptyList(), - new NumberedShardSpec(0, 1), - null, - 0 - ); - return new MorphFrameProcessor(newDataSegment); - }); - - final ProcessorManager> processorManager = - ProcessorManagers.of(sequence) - .withAccumulation( - new HashSet<>(), - (set, segment) -> { - set.add(segment); - return set; - } - ); - return new ProcessorsAndChannels<>(processorManager, 
OutputChannels.none()); - } - - @Nullable - @Override - public TypeReference> getResultTypeReference() - { - return new TypeReference>() {}; - } - - @Override - public Set mergeAccumulatedResult(Set accumulated, Set otherAccumulated) - { - return Collections.emptySet(); - } - - @Override - public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra) - { - return new ExtraInfoHolder(extra) - { - }; - } - - private static class MorphFrameProcessor implements FrameProcessor - { - private final DataSegment segment; - - public MorphFrameProcessor(DataSegment segment) - { - this.segment = segment; - } - - @Override - public List inputChannels() - { - return Collections.emptyList(); - } - - @Override - public List outputChannels() - { - return Collections.emptyList(); - } - - @Override - public ReturnOrAwait runIncrementally(IntSet readableInputs) - { - return ReturnOrAwait.returnObject(segment); - } - - @Override - public void cleanup() - { - } - } -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java index 6d3a5ebfd9b5..e7521b3fd47d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java @@ -21,6 +21,8 @@ import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.junit.Assert; import org.junit.Test; public class DataSourceMSQDestinationTest @@ -34,4 +36,11 @@ public void testEquals() .usingGetClass() .verify(); } + + @Test + public void testBackwardCompatibility() + { + DataSourceMSQDestination destination = new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null); 
+ Assert.assertEquals(SegmentGenerationStageSpec.instance(), destination.getTerminalStageSpec()); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index c7809509cae1..4bc11f363ec0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -86,7 +86,6 @@ import org.apache.druid.msq.exec.DataServerQueryHandler; import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.exec.ResultsContext; -import org.apache.druid.msq.exec.TestSegmentMorpherFrameProcessorFactory; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.guice.MSQDurableStorageModule; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; @@ -748,7 +747,6 @@ public static ObjectMapper setupObjectMapper(Injector injector) DruidSecondaryModule.setupJackson(injector, mapper); mapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); - mapper.registerSubtypes(new NamedType(TestSegmentMorpherFrameProcessorFactory.class, "testSegmentMorpher")); // This should be reusing guice instead of using static classes InsertLockPreemptedFaultTest.LockPreemptedHelper.preempt(false); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java deleted file mode 100644 index 6db92fada1e7..000000000000 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestTerminalStageSpecFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.test; - -import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; -import org.apache.druid.msq.indexing.destination.TerminalStageSpec; -import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.rel.DruidQuery; - -public class TestTerminalStageSpecFactory extends MSQTerminalStageSpecFactory -{ - private TerminalStageSpec terminalStageSpec; - - public TestTerminalStageSpecFactory(TerminalStageSpec terminalStageSpec) - { - this.terminalStageSpec = terminalStageSpec; - } - - public void setTerminalStageSpec(TerminalStageSpec terminalStageSpec) - { - this.terminalStageSpec = terminalStageSpec; - } - - @Override - public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext) - { - return terminalStageSpec; - } -} From b588fa614382302ec99b21fb7ba142691ffae906 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 22 Jul 2024 14:55:14 +0530 Subject: [PATCH 12/13] Refactor --- .../apache/druid/msq/exec/ControllerImpl.java | 42 +++++++++++++++---- .../SegmentGenerationStageSpec.java | 42 +++---------------- .../destination/TerminalStageSpec.java | 5 +++ .../DataSourceMSQDestinationTest.java | 10 ++++- 4 files changed, 55 insertions(+), 44 deletions(-) diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b0f984d79f8f..912038681bdd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -128,6 +128,7 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.InputSpecSlicerFactory; +import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.input.MapInputSpecSlicer; import org.apache.druid.msq.input.external.ExternalInputSpec; import org.apache.druid.msq.input.external.ExternalInputSpecSlicer; @@ -1179,7 +1180,7 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec(); if (destination.getTerminalStageSpec() instanceof SegmentGenerationStageSpec) { - return (Int2ObjectMap) ((SegmentGenerationStageSpec) terminalStageSpec).makeSegmentGeneratorWorkerFactoryInfos( + return (Int2ObjectMap) ((SegmentGenerationStageSpec) terminalStageSpec).getWorkerInfo( workerInputs, segmentsToGenerate ); @@ -1742,14 +1743,41 @@ private static QueryDefinition makeQueryDefinition( } if (MSQControllerTask.isIngestion(querySpec)) { - DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); - TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec(); - if (terminalStageSpec instanceof SegmentGenerationStageSpec) { - return ((SegmentGenerationStageSpec) terminalStageSpec).constructFinalStage(queryId, queryDef, querySpec, jsonMapper); - } else { - throw DruidException.defensive("Unknown segment generation strategy [%s]", terminalStageSpec); + 
// Find the stage that provides shuffled input to the final segment-generation stage. + StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); + + while (!finalShuffleStageDef.doesShuffle() + && InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) { + finalShuffleStageDef = queryDef.getStageDefinition( + Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs())) + ); + } + + if (!finalShuffleStageDef.doesShuffle()) { + finalShuffleStageDef = null; } + // Add all query stages. + // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + + for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { + if (stageDef.equals(finalShuffleStageDef)) { + builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true)); + } else { + builder.add(StageDefinition.builder(stageDef)); + } + } + + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + return builder.add( + destination.getTerminalStageSpec() + .constructFinalStage( + queryDef, + querySpec, + jsonMapper) + ) + .build(); } else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) { return queryDef; } else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java index 95fcc1003227..131d926288b5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java @@ 
-28,13 +28,12 @@ import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory; -import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.input.stage.ReadablePartition; import org.apache.druid.msq.input.stage.StageInputSlice; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; -import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.kernel.controller.WorkerInputs; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; @@ -61,45 +60,19 @@ public static SegmentGenerationStageSpec instance() return INSTANCE; } - public QueryDefinition constructFinalStage(String queryId, QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper) + @Override + public StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature(); final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); - // Find the stage that provides shuffled input to the final segment-generation stage. 
- StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); - - while (!finalShuffleStageDef.doesShuffle() - && InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs()).size() == 1) { - finalShuffleStageDef = queryDef.getStageDefinition( - Iterables.getOnlyElement(InputSpecs.getStageNumbers(finalShuffleStageDef.getInputSpecs())) - ); - } - - if (!finalShuffleStageDef.doesShuffle()) { - finalShuffleStageDef = null; - } - - // Add all query stages. - // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); - - for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { - if (stageDef.equals(finalShuffleStageDef)) { - builder.add(StageDefinition.builder(stageDef).shuffleCheckHasMultipleValues(true)); - } else { - builder.add(StageDefinition.builder(stageDef)); - } - } - - // Then, add a segment-generation stage. + // Add a segment-generation stage. 
final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); - builder.add( - StageDefinition.builder(queryDef.getNextStageNumber()) + return StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) .maxWorkerCount(tuningConfig.getMaxNumWorkers()) .processorFactory( @@ -108,13 +81,10 @@ public QueryDefinition constructFinalStage(String queryId, QueryDefinition query columnMappings, tuningConfig ) - ) ); - - return builder.build(); } - public Int2ObjectMap> makeSegmentGeneratorWorkerFactoryInfos( + public Int2ObjectMap> getWorkerInfo( final WorkerInputs workerInputs, @Nullable final List segmentsToGenerate ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java index 798fd825b066..6bae67954fdf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java @@ -21,6 +21,10 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; /** * Determines the final stage of a {@link DataSourceMSQDestination}. 
@@ -31,4 +35,5 @@ }) public interface TerminalStageSpec { + StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java index e7521b3fd47d..b0afcdb4012b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java @@ -20,7 +20,9 @@ package org.apache.druid.msq.indexing.destination; +import com.fasterxml.jackson.core.JsonProcessingException; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; import org.junit.Assert; import org.junit.Test; @@ -38,9 +40,15 @@ public void testEquals() } @Test - public void testBackwardCompatibility() + public void testBackwardCompatibility() throws JsonProcessingException { DataSourceMSQDestination destination = new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null); Assert.assertEquals(SegmentGenerationStageSpec.instance(), destination.getTerminalStageSpec()); + + DataSourceMSQDestination dataSourceMSQDestination = new DefaultObjectMapper().readValue( + "{\"type\":\"dataSource\",\"dataSource\":\"datasource1\",\"segmentGranularity\":\"DAY\",\"rowsInTaskReport\":0,\"destinationResource\":{\"empty\":false,\"present\":true}}", + DataSourceMSQDestination.class + ); + Assert.assertEquals(SegmentGenerationStageSpec.instance(), dataSourceMSQDestination.getTerminalStageSpec()); } } From 29406f4bfb9c7a508d216f92b3d2faf14849b159 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 13 Aug 2024 
11:29:42 +0530 Subject: [PATCH 13/13] Fix injection --- .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 2a772171d06a..e3db4e4fd17b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.sql; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; @@ -85,15 +86,13 @@ public class MSQTaskSqlEngine implements SqlEngine private final ObjectMapper jsonMapper; private final MSQTerminalStageSpecFactory terminalStageSpecFactory; - public MSQTaskSqlEngine( - final OverlordClient overlordClient, - final ObjectMapper jsonMapper - ) + @Inject + public MSQTaskSqlEngine(final OverlordClient overlordClient, final ObjectMapper jsonMapper) { - this(overlordClient, jsonMapper, null); + this(overlordClient, jsonMapper, new MSQTerminalStageSpecFactory()); } - @Inject + @VisibleForTesting public MSQTaskSqlEngine( final OverlordClient overlordClient, final ObjectMapper jsonMapper,