diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/BroadcastTablesTooLargeFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/BroadcastTablesTooLargeFault.java index 6912fefa6b65..31337cbcfe69 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/BroadcastTablesTooLargeFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/BroadcastTablesTooLargeFault.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java index cbb79c45702b..b47a450aa000 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java @@ -38,11 +38,11 @@ import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.query.DataSource; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.Query; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.SegmentReference; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import java.io.IOException; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 21848813e5d3..4acacdf53bec 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -46,6 +46,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContext; @@ -64,7 +65,6 @@ import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; @@ -212,10 +212,13 @@ public static DataSourcePlan forDataSource( broadcast ); } else if (dataSource instanceof JoinDataSource) { - final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext); + JoinDataSource joinDataSource = (JoinDataSource) dataSource; + final JoinAlgorithm preferredJoinAlgorithm = joinDataSource.getPreferredJoinAlgorithm() != null + ? joinDataSource.getPreferredJoinAlgorithm() + : PlannerContext.getJoinAlgorithm(queryContext); final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm( preferredJoinAlgorithm, - ((JoinDataSource) dataSource) + joinDataSource ); switch (deducedJoinAlgorithm) { @@ -223,7 +226,7 @@ public static DataSourcePlan forDataSource( return forBroadcastHashJoin( queryKitSpec, queryContext, - (JoinDataSource) dataSource, + joinDataSource, querySegmentSpec, filter, filterFields, @@ -234,7 +237,7 @@ public static DataSourcePlan forDataSource( case SORT_MERGE: return forSortMergeJoin( queryKitSpec, - (JoinDataSource) dataSource, + joinDataSource, querySegmentSpec, minStageNumber, broadcast diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 91a1983bfd68..b5c9675a411c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -50,6 +50,7 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.OrderBy; import org.apache.druid.query.Query; @@ -81,7 +82,6 @@ import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.util.CalciteTests; import org.hamcrest.CoreMatchers; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java index 55c6c48c1afe..3d26444a3d27 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java @@ -26,9 +26,9 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessorTest.java index 4270fe8bdccc..ee5db19daa2a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessorTest.java @@ -42,6 +42,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.query.DataSource; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; @@ -50,7 +51,6 @@ import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinType; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java index 30724dcd4ce0..42ee5bac3528 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java @@ -21,11 +21,11 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.BaseCalciteQueryTest; import org.apache.druid.sql.calcite.CalciteJoinQueryTest; import org.apache.druid.sql.calcite.QueryTestBuilder; import org.apache.druid.sql.calcite.SqlTestFrameworkConfig; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import java.util.Map; diff --git a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq new file mode 100644 index 000000000000..3d6a1b1cc12c --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msqJoinHint.iq @@ -0,0 +1,759 @@ +!use druidtest://?componentSupplier=DrillWindowQueryMSQComponentSupplier +!set outputformat mysql + +select w1.cityName, w2.countryName +from (select cityName, countryName from wikipedia where cityName='New York') w1 +JOIN wikipedia w2 ON w1.cityName = w2.cityName +where w1.cityName='New York'; + + + + + + + +!hints + +[ { + "stageNumber" : 0, + "definition" : { + "id" : "_0", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ], + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "filterFields" : [ "cityName" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "columns" : [ "cityName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "cityName", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 1, + "definition" : { + "id" : "_1", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "columns" : [ "cityName", "countryName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "cityName", + "type" : "STRING" + }, { + "name" : "countryName", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 2, + "definition" : { + "id" : "_2", + "input" : [ { + "type" : "stage", + "stage" : 0 + }, { + "type" : "stage", + "stage" : 1 + } ], + "broadcast" : [ 1 ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "join", + "left" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "right" : { + "type" : "inputNumber", + "inputNumber" : 1 + }, + "rightPrefix" : "j0.", + "condition" : "(\"cityName\" == \"j0.cityName\")", + "joinType" : "INNER", + "preferredJoinAlgorithm" : null + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "virtualColumns" : [ { + "type" : "expression", + "name" : "v0", + "expression" : "'New York'", + "outputType" : "STRING" + } ], + "resultFormat" : "compactedList", + "columns" : [ "j0.countryName", "v0" ], + "context" : { + "__user" : null, + "finalize" : true, + "maxParseExceptions" : 0, + "scanSignature" : "[{\"name\":\"j0.countryName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"STRING\"}]", + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "j0.countryName", + "type" : "STRING" + }, { + "name" : "v0", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +} ] +!msqPlan + +select /*+ broadcast(w1, w2) */ w1.cityName, w2.countryName +from (select cityName, countryName from wikipedia where cityName='New York') w1 +JOIN wikipedia w2 ON w1.cityName = w2.cityName +where w1.cityName='New York'; + +LogicalJoin:[[broadcast inheritPath:[0, 0] options:[w1, w2]]] + +!hints + +[ { + "stageNumber" : 0, + "definition" : { + "id" : "_0", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ], + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "filterFields" : [ "cityName" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "columns" : [ "cityName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "cityName", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 1, + "definition" : { + "id" : "_1", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "columns" : [ "cityName", "countryName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "cityName", + "type" : "STRING" + }, { + "name" : "countryName", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 2, + "definition" : { + "id" : "_2", + "input" : [ { + "type" : "stage", + "stage" : 0 + }, { + "type" : "stage", + "stage" : 1 + } ], + "broadcast" : [ 1 ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "join", + "left" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "right" : { + "type" : "inputNumber", + "inputNumber" : 1 + }, + "rightPrefix" : "j0.", + "condition" : "(\"cityName\" == \"j0.cityName\")", + "joinType" : "INNER", + "preferredJoinAlgorithm" : null + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "virtualColumns" : [ { + "type" : "expression", + "name" : "v0", + "expression" : "'New York'", + "outputType" : "STRING" + } ], + "resultFormat" : "compactedList", + "columns" : [ "j0.countryName", "v0" ], + "context" : { + "__user" : null, + "finalize" : true, + "maxParseExceptions" : 0, + "scanSignature" : "[{\"name\":\"j0.countryName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"STRING\"}]", + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "j0.countryName", + "type" : "STRING" + }, { + "name" : "v0", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +} ] +!msqPlan + +select /*+ sort_merge(w1, w2) */ w1.cityName, w2.countryName +from (select cityName, countryName from wikipedia where cityName='New York') w1 +JOIN wikipedia w2 ON w1.cityName = w2.cityName +where w1.cityName='New York'; + + +LogicalJoin:[[sort_merge inheritPath:[0, 0] options:[w1, w2]]] + +!hints + +[ { + "stageNumber" : 0, + "definition" : { + "id" : "_0", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ], + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "filterFields" : [ "cityName" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "filter" : { + "type" : "equals", + "column" : "cityName", + "matchValueType" : "STRING", + "matchValue" : "New York" + }, + "columns" : [ "cityName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "cityName", + "type" : "STRING" + }, { + "name" : "__boost", + "type" : "LONG" + } ], + "shuffleSpec" : { + "type" : "hash", + "clusterBy" : { + "columns" : [ { + "columnName" : "cityName", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "hashLocalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 1, + "definition" : { + "id" : "_1", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "columns" : [ "cityName", "countryName" ], + "context" : { + "scanSignature" : "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"}]", + "sqlInsertSegmentGranularity" : null, + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "cityName", + "type" : "STRING" + }, { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "countryName", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "hash", + "clusterBy" : { + "columns" : [ { + "columnName" : "cityName", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "hashLocalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +}, { + "stageNumber" : 2, + "definition" : { + "id" : "_2", + "input" : [ { + "type" : "stage", + "stage" : 0 + }, { + "type" : "stage", + "stage" : 1 + } ], + "processor" : { + "type" : "sortMergeJoin", + "rightPrefix" : "j0.", + "condition" : "(\"cityName\" == \"j0.cityName\")", + "joinType" : "INNER" + }, + "signature" : [ { + "name" : "cityName", + "type" : "STRING" + }, { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "j0.cityName", + "type" : "STRING" + }, { + "name" : "j0.__boost", + "type" : "LONG" + }, { + "name" : "j0.countryName", + "type" : "STRING" + } ], + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ +}, { + "stageNumber" : 3, + "definition" : { + "id" : "_3", + "input" : [ { + "type" : "stage", + "stage" : 2 + } ], + "processor" : { + "type" : "scan", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "virtualColumns" : [ { + "type" : "expression", + "name" : "v0", + "expression" : "'New York'", + "outputType" : "STRING" + } ], + "resultFormat" : "compactedList", + "columns" : [ "j0.countryName", "v0" ], + "context" : { + "__user" : null, + "finalize" : true, + "maxParseExceptions" : 0, + "scanSignature" : "[{\"name\":\"j0.countryName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"STRING\"}]", + "sqlQueryId" : __SQL_QUERY_ID__ + "sqlStringifyArrays" : false + }, + "columnTypes" : [ "STRING", "STRING" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "signature" : [ { + "name" : "__boost", + "type" : "LONG" + }, { + "name" : "j0.countryName", + "type" : "STRING" + }, { + "name" : "v0", + "type" : "STRING" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "__boost", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1 + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : __TIMESTAMP__ + "duration" : __DURATION__ + "sort" : true +} ] +!msqPlan diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinAlgorithm.java b/processing/src/main/java/org/apache/druid/query/JoinAlgorithm.java similarity index 97% rename from sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinAlgorithm.java rename to processing/src/main/java/org/apache/druid/query/JoinAlgorithm.java index 2e53795c4ffa..94cd209e5780 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/JoinAlgorithm.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.sql.calcite.planner; +package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index 220f18a94855..976ab78728b8 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -96,6 +96,8 @@ public class JoinDataSource implements DataSource private final DimFilter leftFilter; @Nullable private final JoinableFactoryWrapper joinableFactoryWrapper; + @Nullable + private final JoinAlgorithm preferredJoinAlgorithm; private static final Logger log = new Logger(JoinDataSource.class); private final DataSourceAnalysis analysis; @@ -106,7 +108,8 @@ private JoinDataSource( JoinConditionAnalysis conditionAnalysis, JoinType joinType, @Nullable DimFilter leftFilter, - @Nullable JoinableFactoryWrapper joinableFactoryWrapper + @Nullable JoinableFactoryWrapper joinableFactoryWrapper, + @Nullable JoinAlgorithm preferredJoinAlgorithm ) { this.left = Preconditions.checkNotNull(left, "left"); @@ -116,6 +119,7 @@ private JoinDataSource( this.joinType = Preconditions.checkNotNull(joinType, "joinType"); this.leftFilter = validateLeftFilter(left, leftFilter); this.joinableFactoryWrapper = joinableFactoryWrapper; + this.preferredJoinAlgorithm = preferredJoinAlgorithm; this.analysis = this.getAnalysisForDataSource(); } @@ -123,7 +127,7 @@ private JoinDataSource( /** * Create a join dataSource from a string condition. */ - @JsonCreator + public static JoinDataSource create( @JsonProperty("left") DataSource left, @JsonProperty("right") DataSource right, @@ -134,6 +138,22 @@ public static JoinDataSource create( @JacksonInject ExprMacroTable macroTable, @Nullable @JacksonInject JoinableFactoryWrapper joinableFactoryWrapper ) + { + return create(left, right, rightPrefix, condition, joinType, leftFilter, macroTable, joinableFactoryWrapper, null); + } + + @JsonCreator + public static JoinDataSource create( + @JsonProperty("left") DataSource left, + @JsonProperty("right") DataSource right, + @JsonProperty("rightPrefix") String rightPrefix, + @JsonProperty("condition") String condition, + @JsonProperty("joinType") JoinType joinType, + @Nullable @JsonProperty("leftFilter") DimFilter leftFilter, + @JacksonInject ExprMacroTable macroTable, + @Nullable @JacksonInject JoinableFactoryWrapper joinableFactoryWrapper, + @Nullable @JsonProperty("preferredJoinAlgorithm") JoinAlgorithm preferredJoinAlgorithm + ) { return new JoinDataSource( left, @@ -146,7 +166,8 @@ public static JoinDataSource create( ), joinType, leftFilter, - joinableFactoryWrapper + joinableFactoryWrapper, + preferredJoinAlgorithm ); } @@ -170,10 +191,33 @@ public static JoinDataSource create( conditionAnalysis, joinType, leftFilter, - joinableFactoryWrapper + joinableFactoryWrapper, + null ); } + public static JoinDataSource create( + final DataSource left, + final DataSource right, + final String rightPrefix, + final JoinConditionAnalysis conditionAnalysis, + final JoinType joinType, + final DimFilter leftFilter, + @Nullable final JoinableFactoryWrapper joinableFactoryWrapper, + @Nullable final JoinAlgorithm preferredJoinAlgorithm + ) + { + return new JoinDataSource( + left, + right, + rightPrefix, + conditionAnalysis, + joinType, + leftFilter, + joinableFactoryWrapper, + preferredJoinAlgorithm + ); + } @Override public Set getTableNames() @@ -253,7 +297,8 @@ public DataSource withChildren(List children) conditionAnalysis, joinType, leftFilter, - joinableFactoryWrapper + joinableFactoryWrapper, + preferredJoinAlgorithm ); } @@ -363,6 +408,13 @@ public DataSourceAnalysis getAnalysis() return analysis; } + @Nullable + @JsonProperty + public JoinAlgorithm getPreferredJoinAlgorithm() + { + return preferredJoinAlgorithm; + } + @Override public boolean equals(Object o) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 8eb9541961c6..a074abcc1a70 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -38,6 +38,11 @@ import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.HintPredicate; +import org.apache.calcite.rel.hint.HintPredicates; +import org.apache.calcite.rel.hint.HintStrategyTable; +import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeSystem; @@ -60,12 +65,14 @@ import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.ValidationException; import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; import javax.annotation.Nullable; import java.io.Reader; import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.stream.Collectors; /** * Calcite planner. Clone of Calcite's @@ -295,7 +302,8 @@ public RelRoot rel(SqlNode sql) rexBuilder ); final SqlToRelConverter.Config config = - sqlToRelConverterConfig.withTrimUnusedFields(false); + sqlToRelConverterConfig.withTrimUnusedFields(false) + .withHintStrategyTable(HintTools.HINT_STRATEGY_TABLE); final SqlToRelConverter sqlToRelConverter = new DruidSqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config @@ -502,4 +510,41 @@ void from(CalcitePlanner planner) + " to " + this); } } + + /** Define some tool members and methods for hints. */ + private static class HintTools + { + static final HintStrategyTable HINT_STRATEGY_TABLE = createHintStrategies(); + + /** + * Creates hint strategies. + * + * @return HintStrategyTable instance + */ + private static HintStrategyTable createHintStrategies() + { + return HintStrategyTable.builder() + .hintStrategy("broadcast", HintPredicates.JOIN) + .hintStrategy("sort_merge", HintPredicates.JOIN) + //.hintStrategy("sort_merge", HintPredicates.and(HintPredicates.JOIN, joinWithFixedTableName())) + .build(); + } + + /** Returns a {@link HintPredicate} for join with specified table references. */ + private static HintPredicate joinWithFixedTableName() + { + return (hint, rel) -> { + if (!(rel instanceof LogicalJoin)) { + return false; + } + LogicalJoin join = (LogicalJoin) rel; + final List tableNames = hint.listOptions; + final List inputTables = join.getInputs().stream() + .filter(input -> input instanceof TableScan) + .map(scan -> Util.last(scan.getTable().getQualifiedName())) + .collect(Collectors.toList()); + return tableNames.equals(inputTables); + }; + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java index d6dd1310e6c5..ea4efd16d91a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java @@ -50,6 +50,7 @@ import org.apache.calcite.tools.Programs; import org.apache.calcite.tools.RelBuilder; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.external.ExternalTableScanRule; import org.apache.druid.sql.calcite.rule.AggregatePullUpLookupRule; import org.apache.druid.sql.calcite.rule.CaseToCoalesceRule; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinHint.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinHint.java new file mode 100644 index 000000000000..b4201cd13675 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/JoinHint.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.JoinAlgorithm; + +import java.util.Arrays; + +public enum JoinHint +{ + SORT_MERGE("sort_merge") { + @Override + public JoinAlgorithm getJoinAlgorithm() + { + return JoinAlgorithm.SORT_MERGE; + } + }, + BROADCAST("broadcast") { + @Override + public JoinAlgorithm getJoinAlgorithm() + { + return JoinAlgorithm.BROADCAST; + } + }; + + private final String id; + + JoinHint(String id) + { + this.id = id; + } + + @JsonCreator + public static JoinHint fromString(final String id) + { + for (final JoinHint value : values()) { + if (value.id.equals(id)) { + return value; + } + } + + throw new IAE("No such join hint [%s]. Supported values are: %s", id, Arrays.toString(values())); + } + + @JsonValue + public String getId() + { + return id; + } + + /** + * Whether this join algorithm requires subqueries for all inputs. + */ + public abstract JoinAlgorithm getJoinAlgorithm(); + + @Override + public String toString() + { + return id; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index cd2e9401954e..c41971d3a671 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.filter.InDimFilter; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryUtils.java index e0a5cce94187..82f694cdebf1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryUtils.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryUtils.java @@ -19,6 +19,8 @@ package org.apache.druid.sql.calcite.planner; +import org.apache.calcite.rel.core.Join; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.rel.DruidQuery; import java.util.ArrayList; @@ -55,4 +57,13 @@ public static List buildColumnMappings( return columnMappings; } + + public static JoinAlgorithm getJoinAlgorithm(Join join, PlannerContext plannerContext) + { + if (join.getHints().isEmpty()) { + return plannerContext.getJoinAlgorithm(); + } + + return JoinHint.fromString(join.getHints().get(0).hintName).getJoinAlgorithm(); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryValidations.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryValidations.java index c516496af31e..27c5ca7e9e82 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryValidations.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryValidations.java @@ -25,6 +25,7 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.tools.ValidationException; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.run.EngineFeature; /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 5615da3344fe..312ba1885dd9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -55,6 +55,7 @@ import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.QueryUtils; import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc; import org.apache.druid.sql.calcite.table.RowSignatures; @@ -151,7 +152,7 @@ private SourceDesc buildLeftSourceDesc() final DruidQuery leftQuery = Preconditions.checkNotNull(leftDruidRel.toDruidQuery(false), "leftQuery"); final RowSignature leftSignature = leftQuery.getOutputRowSignature(); final DataSource leftDataSource; - if (computeLeftRequiresSubquery(getPlannerContext(), leftDruidRel)) { + if (computeLeftRequiresSubquery(getPlannerContext(), leftDruidRel, joinRel)) { leftDataSource = new QueryDataSource(leftQuery.getQuery()); if (leftFilter != null) { throw new ISE("Filter on left table is supposed to be null if left child is a query source"); @@ -225,7 +226,8 @@ public static SourceDesc buildJoinSourceDesc(final SourceDesc leftDesc, final So ), toDruidJoinType(joinRel.getJoinType()), getDimFilter(plannerContext, leftDesc.rowSignature, leftFilter), - plannerContext.getJoinableFactoryWrapper() + plannerContext.getJoinableFactoryWrapper(), + QueryUtils.getJoinAlgorithm(joinRel, plannerContext) ); SourceDesc sourceDesc = new SourceDesc(joinDataSource, signature, virtualColumnRegistry); @@ -360,7 +362,7 @@ public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadata joinCost *= CostEstimates.MULTIPLIER_OUTER_QUERY; } else { // Penalize subqueries if we don't have to do them. - if (computeLeftRequiresSubquery(getPlannerContext(), getSomeDruidChild(left))) { + if (computeLeftRequiresSubquery(getPlannerContext(), getSomeDruidChild(left), joinRel)) { joinCost += CostEstimates.COST_SUBQUERY; } else { if (joinRel.getJoinType() == JoinRelType.INNER && plannerConfig.isComputeInnerJoinCostAsFilter()) { @@ -400,9 +402,9 @@ public static JoinType toDruidJoinType(JoinRelType calciteJoinType) } } - public static boolean computeLeftRequiresSubquery(final PlannerContext plannerContext, final DruidRel left) + public static boolean computeLeftRequiresSubquery(final PlannerContext plannerContext, final DruidRel left, final Join joinRel) { - if (plannerContext.getJoinAlgorithm().requiresSubquery()) { + if (QueryUtils.getJoinAlgorithm(joinRel, plannerContext).requiresSubquery()) { return true; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java index 8d6230afcc8f..d76230da2a5e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java @@ -49,8 +49,10 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.LookupDataSource; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.QueryUtils; import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel; import org.apache.druid.sql.calcite.rel.DruidQueryRel; import org.apache.druid.sql.calcite.rel.DruidRel; @@ -130,7 +132,8 @@ public void onMatch(RelOptRuleCall call) plannerContext.setPlanningError(conditionAnalysis.errorStr); final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel); - if (!plannerContext.getJoinAlgorithm().requiresSubquery() + final JoinAlgorithm joinAlgorithm = QueryUtils.getJoinAlgorithm(join, plannerContext); + if (!joinAlgorithm.requiresSubquery() && left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT && (isLeftDirectAccessPossible || left.getPartialDruidQuery().getWhereFilter() == null)) { // Swap the left-side projection above the join, so the left side is a simple scan or mapping. This helps us @@ -153,7 +156,7 @@ public void onMatch(RelOptRuleCall call) leftFilter = null; } - if (!plannerContext.getJoinAlgorithm().requiresSubquery() + if (!joinAlgorithm.requiresSubquery() && right.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT && right.getPartialDruidQuery().getWhereFilter() == null && !right.getPartialDruidQuery().getSelectProject().isMapping() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java index 97e81c2ef5c3..9bf4fac8e1e7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.run; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.sql.calcite.external.ExternalDataSource; /** @@ -103,7 +104,7 @@ public enum EngineFeature UNNEST, /** - * Planner is permitted to use {@link org.apache.druid.sql.calcite.planner.JoinAlgorithm#BROADCAST} with RIGHT + * Planner is permitted to use {@link JoinAlgorithm#BROADCAST} with RIGHT * and FULL join. Not guaranteed to produce correct results in either the native or MSQ engines, but we allow * it in native for two reasons: legacy (the docs caution against it, but it's always been allowed), and the fact * that it actually *does* generate correct results in native when the join is processed on the Broker. It is much diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 4f3d86b1b420..16228a19e127 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -27,12 +27,12 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.destination.IngestDestination; diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java index a183ba9ecad1..5bf6d18050f5 100644 --- a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java +++ b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java @@ -26,7 +26,9 @@ import net.hydromatic.quidem.CommandHandler; import net.hydromatic.quidem.Quidem.SqlCommand; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelHomogeneousShuttle; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.runtime.Hook; import org.apache.calcite.sql.SqlExplainFormat; import org.apache.calcite.sql.SqlExplainLevel; @@ -59,6 +61,9 @@ public Command parseCommand(List lines, List content, String lin if (line.startsWith("druidPlan")) { return new DruidPlanCommand(lines, content); } + if (line.startsWith("hints")) { + return new HintPlanCommand(lines, content); + } if (line.startsWith("nativePlan")) { return new NativePlanCommand(lines, content); } @@ -202,10 +207,16 @@ protected final void executeExplain(Context context) throws IOException if (node instanceof DruidRel) { node = ((DruidRel) node).unwrapLogicalPlan(); } - String str = RelOptUtil.dumpPlan("", node, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + String str = convertRelToString(node); context.echo(ImmutableList.of(str)); } } + + protected String convertRelToString(RelNode node) + { + String str = RelOptUtil.dumpPlan("", node, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + return str; + } } /** @@ -245,6 +256,54 @@ static class DruidPlanCommand extends AbstractRelPlanCommand } } + static class HintPlanCommand extends AbstractRelPlanCommand + { + HintPlanCommand(List lines, List content) + { + super(lines, content, DruidHook.DRUID_PLAN); + } + + @Override + protected String convertRelToString(RelNode node) + { + final HintCollector collector = new HintCollector(); + node.accept(collector); + return collector.getCollectedHintsAsString(); + } + + private static class HintCollector extends RelHomogeneousShuttle + { + private final List hintsCollect; + + HintCollector() + { + this.hintsCollect = new ArrayList<>(); + } + + @Override + public RelNode visit(RelNode relNode) + { + if (relNode instanceof Hintable) { + Hintable hintableRelNode = (Hintable) relNode; + if (!hintableRelNode.getHints().isEmpty()) { + this.hintsCollect.add(relNode.getClass().getSimpleName() + ":" + hintableRelNode.getHints()); + } + } + return super.visit(relNode); + } + + public String getCollectedHintsAsString() + { + StringBuilder builder = new StringBuilder(); + for (String hintLine : hintsCollect) { + builder.append(hintLine).append("\n"); + } + + return builder.toString(); + } + } + } + static class ConvertedPlanCommand extends AbstractRelPlanCommand { ConvertedPlanCommand(List lines, List content) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java index fa76ba3a3c90..7b9ed21f89e9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java @@ -31,9 +31,9 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.QueryContext; import org.apache.druid.sql.calcite.planner.DruidTypeSystem; -import org.apache.druid.sql.calcite.planner.JoinAlgorithm; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Before;