diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java old mode 100644 new mode 100755 index 36be17c9266445..1ec958df508060 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -740,6 +740,9 @@ private DataSink createDataSink() throws AnalysisException { return dataSink; } if (targetTable instanceof OlapTable) { + if (targetPartitionIds.size() <= 0) { + throw new AnalysisException("no target partition found."); + } dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds); dataPartition = dataSink.getOutputPartition(); } else if (targetTable instanceof BrokerTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java old mode 100644 new mode 100755 index 803e4352b16315..9fb4ce8ab52909 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -127,6 +127,9 @@ public void plan(TUniqueId loadId, List> fileStatusesLis // 2. Olap table sink List partitionIds = getAllPartitionIds(); + if (partitionIds.size() <= 0) { + throw new UserException("no partition found in file groups."); + } OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionIds); olapTableSink.init(loadId, txnId, dbId, timeoutS); olapTableSink.complete(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java old mode 100644 new mode 100755 index 9e779b6d674640..7abe9686c676d2 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -135,6 +135,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { // create dest sink List partitionIds = getAllPartitionIds(); + if (partitionIds.size() <= 0) { + throw new UserException("no partition found in file groups."); + } OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout()); olapTableSink.complete(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java old mode 100644 new mode 100755 index 35b545c5af8196..7f5d9d8d34d6bf --- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java @@ -26,7 +26,11 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.RandomDistributionInfo; +import org.apache.doris.catalog.KeysType; import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.task.StreamLoadTask; @@ -41,52 +45,42 @@ import org.junit.Test; import java.io.StringReader; +import java.util.LinkedList; import java.util.List; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; +import org.apache.doris.thrift.TStorageType; public class StreamLoadPlannerTest { @Injectable Database db; - @Injectable - OlapTable destTable; - - @Mocked - StreamLoadScanNode scanNode; - - @Mocked - OlapTableSink sink; - @Test public void testNormalPlan() throws UserException { - List columns = Lists.newArrayList(); - Column c1 = new Column("c1", PrimitiveType.BIGINT, false); - columns.add(c1); - Column c2 = new Column("c2", PrimitiveType.BIGINT, true); - columns.add(c2); - new Expectations() { - { - destTable.getBaseSchema(); - minTimes = 0; - result = columns; - scanNode.init((Analyzer) any); - minTimes = 0; - scanNode.getChildren(); - minTimes = 0; - result = Lists.newArrayList(); - scanNode.getId(); - minTimes = 0; - result = new PlanNodeId(5); - } - }; + MaterializedIndex baseIndex = new MaterializedIndex(30001, MaterializedIndex.IndexState.NORMAL); + RandomDistributionInfo distributionInfo = new RandomDistributionInfo(10); + Partition partition = new Partition(20000L, "testTbl", baseIndex, distributionInfo); + List baseSchema = new LinkedList(); + Column column = new Column(); + baseSchema.add(column); + SinglePartitionInfo info = new SinglePartitionInfo(); + info.setReplicationNum(partition.getId(), (short) 3); + OlapTable destTable = new OlapTable(30000, "testTbl", baseSchema, + KeysType.AGG_KEYS, info, distributionInfo); + destTable.setIndexMeta(baseIndex.getId(), "testTbl", baseSchema, 0, 1, (short) 1, + TStorageType.COLUMN, KeysType.AGG_KEYS); + destTable.addPartition(partition); + destTable.setBaseIndexId(baseIndex.getId()); TStreamLoadPutRequest request = new TStreamLoadPutRequest(); request.setTxnId(1); + request.unsetIsTempPartition(); request.setLoadId(new TUniqueId(2, 3)); request.setFileType(TFileType.FILE_STREAM); request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN); + request.setPartitions("testTbl"); + StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db); StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask); planner.plan(streamLoadTask.getId());