diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 63e9935c486649..56902e3823936d 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -504,6 +504,8 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::JDBC_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::ODBC_SCAN_NODE, nodes); } void ExecNode::init_runtime_profile(const std::string& name) { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 3cd552e4792517..ceaf70ac123237 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -100,6 +100,7 @@ #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/scan/vmeta_scan_node.h" diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 75ca717050b8a4..a719081496b05b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -38,8 +38,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; -import org.apache.doris.planner.external.jdbc.JdbcScanNode; -import org.apache.doris.planner.external.odbc.OdbcScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TPartitionType; @@ -281,12 +279,8 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment) * TODO: hbase scans are range-partitioned on the row key */ private PlanFragment createScanFragment(PlanNode node) throws UserException { - if (node instanceof MysqlScanNode || node instanceof OdbcScanNode || node instanceof JdbcScanNode) { + if (node instanceof MysqlScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED); - } else if (node instanceof SchemaScanNode) { - return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); - } else if (node instanceof DataGenScanNode) { - return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); } else if (node instanceof OlapScanNode) { // olap scan node OlapScanNode olapScanNode = (OlapScanNode) node; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java index 3bcbfb8b4dab7c..4854ce5a00b10c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.ExternalScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.statistics.query.StatsDelta; @@ -272,7 +273,8 @@ protected String debugString() { @Override public int getNumInstances() { - return 1; + return ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + ? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java index 2bac81218ff7b9..832922ef8197ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.ExternalScanNode; import org.apache.doris.planner.external.jdbc.JdbcScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.statistics.query.StatsDelta; @@ -213,4 +214,10 @@ public StatsDelta genStatsDelta() throws AnalysisException { Env.getCurrentEnv().getCurrentCatalog().getDbOrAnalysisException(tbl.getQualifiedDbName()).getId(), tbl.getId(), -1L); } + + @Override + public int getNumInstances() { + return ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + ? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1; + } }