diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java b/fe/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
index 07cc6385eb5a63..3851f261ece0eb 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
@@ -44,6 +44,10 @@ public LoadColumnsInfo(List columnNames, List columnMappingList) {
         this.columnMappingList = columnMappingList;
     }

+    public Map getParsedExprMap() {
+        return parsedExprMap;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException {
         checkColumnNames();
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 2714b63a7521f5..fa1415f4bf3738 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -75,7 +75,7 @@ public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAd

     // TODO(ml): I will change it after ut.
     @VisibleForTesting
-    public KafkaRoutineLoadJob(String id, String name, long dbId, long tableId,
+    public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId,
                                RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum, int maxErrorNum,
                                String serverAddress, String topic, KafkaProgress kafkaProgress) {
@@ -90,6 +90,14 @@ public KafkaRoutineLoadJob(String id, String name, long dbId, long tableId,
         setConsumer();
     }

+    public String getTopic() {
+        return topic;
+    }
+
+    public String getServerAddress() {
+        return serverAddress;
+    }
+
     private void setCustomKafkaPartitions(List kafkaPartitions) throws LoadException {
         writeLock();
         try {
@@ -123,7 +131,7 @@ public List divideRoutineLoadJob(int currentConcurrentTaskN
         // divide kafkaPartitions into tasks
         for (int i = 0; i < currentConcurrentTaskNum; i++) {
             try {
-                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID().toString(), id);
+                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
                 routineLoadTaskInfoList.add(kafkaTaskInfo);
                 needScheduleTaskInfoList.add(kafkaTaskInfo);
                 result.add(kafkaTaskInfo);
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 81aa67a4a69a30..5ffd7c3b3dd8e6 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -17,12 +17,26 @@

 package org.apache.doris.load.routineload;

+import com.google.common.base.Joiner;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.task.KafkaRoutineLoadTask;
 import org.apache.doris.task.RoutineLoadTask;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TKafkaLoadInfo;
+import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TRoutineLoadTask;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;

 import com.google.common.collect.Maps;
@@ -38,7 +52,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {

     private List partitions;

-    public KafkaTaskInfo(String id, String jobId) throws LabelAlreadyUsedException,
+    public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException,
             BeginTransactionException, AnalysisException {
         super(id, jobId);
         this.partitions = new ArrayList<>();
@@ -46,7 +60,7 @@ public KafkaTaskInfo(String id, String jobId) throws LabelAlreadyUsedException,

     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
             BeginTransactionException, AnalysisException {
-        super(UUID.randomUUID().toString(), kafkaTaskInfo.getJobId());
+        super(UUID.randomUUID(), kafkaTaskInfo.getJobId());
         this.partitions = kafkaTaskInfo.getPartitions();
     }

@@ -58,9 +72,10 @@ public List getPartitions() {
         return partitions;
     }

+    // todo: reuse plan fragment of stream load
     @Override
-    public RoutineLoadTask createStreamLoadTask(long beId) throws LoadException {
-        RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
+    public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException {
+        KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
         Map partitionIdToOffset = Maps.newHashMap();
         for (Integer partitionId : partitions) {
             KafkaProgress kafkaProgress = (KafkaProgress) routineLoadJob.getProgress();
@@ -69,13 +84,41 @@ public RoutineLoadTask createStreamLoadTask(long beId) throws LoadException {
             }
             partitionIdToOffset.put(partitionId, kafkaProgress.getPartitionIdToOffset().get(partitionId));
         }
-        RoutineLoadTask routineLoadTask = new KafkaRoutineLoadTask(routineLoadJob.getResourceInfo(),
-                beId, routineLoadJob.getDbId(),
-                routineLoadJob.getTableId(),
-                id, txnId, partitionIdToOffset);
-        if (routineLoadJob.getRoutineLoadDesc() != null) {
-            routineLoadTask.setRoutineLoadDesc(routineLoadJob.getRoutineLoadDesc());
-        }
-        return routineLoadTask;
+
+        // init tRoutineLoadTask and create plan fragment
+        TRoutineLoadTask tRoutineLoadTask = new TRoutineLoadTask();
+        TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        tRoutineLoadTask.setId(queryId);
+        tRoutineLoadTask.setJob_id(jobId);
+        tRoutineLoadTask.setTxn_id(txnId);
+        Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
+        tRoutineLoadTask.setDb(database.getFullName());
+        tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName());
+        StringBuilder stringBuilder = new StringBuilder();
+        // label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode()
+        String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_")
+                .append(routineLoadJob.getTopic()).append("_")
+                .append(Joiner.on("_").withKeyValueSeparator(":")
+                        .join(partitionIdToOffset)).toString().hashCode());
+        tRoutineLoadTask.setLabel(label);
+        tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode());
+        TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
+        tKafkaLoadInfo.setTopic((routineLoadJob).getTopic());
+        tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress());
+        tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset);
+        tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo);
+        tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
+        tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob));
+        return tRoutineLoadTask;
+    }
+
+    private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this);
+        Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
+        StreamLoadPlanner planner = new StreamLoadPlanner(database,
+                (OlapTable) database.getTable(routineLoadJob.getTableId()), streamLoadTask);
+        return planner.plan();
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 90f1b1a795ce34..a41ac13f999064 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -29,35 +29,12 @@
 //  "numOfTotalData": "", "taskId": "", "jobId": ""}
 public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {

-    public enum LoadSourceType {
-        KAFKA(1);
-
-        private final int flag;
-
-        private LoadSourceType(int flag) {
-            this.flag = flag;
-        }
-
-        public int value() {
-            return flag;
-        }
-
-        public static LoadSourceType valueOf(int flag) {
-            switch (flag) {
-                case 1:
-                    return KAFKA;
-                default:
-                    return null;
-            }
-        }
-    }
-
     private long jobId;
     private TUniqueId taskId;
     private long filteredRows;
     private long loadedRows;
     private RoutineLoadProgress progress;
-    private LoadSourceType loadSourceType;
+    private LoadDataSourceType loadDataSourceType;

     public RLTaskTxnCommitAttachment() {
     }
@@ -70,7 +47,7 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttac

         switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
             case KAFKA:
-                this.loadSourceType = LoadSourceType.KAFKA;
+                this.loadDataSourceType = LoadDataSourceType.KAFKA;
                 this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
             default:
                 break;
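Reviewer note on the label scheme in KafkaTaskInfo.createRoutineLoadTask above: the task label is the int hash of a string built from the broker list, the topic, and the partition:offset pairs joined by Guava's MapJoiner. A self-contained sketch of just that construction, with made-up broker/topic/offset values:

```java
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class KafkaTaskLabelSketch {
    public static void main(String[] args) {
        // Hypothetical inputs; the real values come from the routine load job.
        Map<Integer, Long> partitionIdToOffset = ImmutableMap.of(0, 100L, 1, 200L);
        // serviceAddress_topic_partition1:offset_partition2:offset
        String raw = "127.0.0.1:9092" + "_" + "example_topic" + "_"
                + Joiner.on("_").withKeyValueSeparator(":").join(partitionIdToOffset);
        // raw == "127.0.0.1:9092_example_topic_0:100_1:200"
        String label = String.valueOf(raw.hashCode());
        System.out.println(raw + " -> " + label);
    }
}
```

Since partitionIdToOffset is a plain HashMap in the patch, the join order (and therefore the label) is not guaranteed to be stable; the sketch uses an ordered map so its output is deterministic.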
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e0302138edb26f..886428d98a37b0 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -24,6 +24,7 @@
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -31,11 +32,16 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.TxnStateChangeListener;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendServiceImpl;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLoadTxnCommitRequest;
 import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.transaction.AbortTransactionException;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -107,10 +113,12 @@ public boolean isFinalState() {
         }
     }

-    protected String id;
+    protected long id;
     protected String name;
     protected long dbId;
     protected long tableId;
+    // this code is used to verify be task request
+    protected long authCode;
     protected RoutineLoadDesc routineLoadDesc; // optional
     protected int desireTaskConcurrentNum; // optional
     protected JobState state;
@@ -134,17 +142,23 @@ public boolean isFinalState() {
     protected List routineLoadTaskInfoList;
     protected List needScheduleTaskInfoList;

+    // plan fragment which will be initialized during job scheduler
+    protected TExecPlanFragmentParams tExecPlanFragmentParams;
+
     protected ReentrantReadWriteLock lock;

     // TODO(ml): error sample

     public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType dataSourceType) {
-        this.id = UUID.randomUUID().toString();
+        this.id = Catalog.getInstance().getNextId();
         this.name = name;
         this.dbId = dbId;
         this.tableId = tableId;
         this.state = JobState.NEED_SCHEDULE;
         this.dataSourceType = dataSourceType;
         this.resourceInfo = ConnectContext.get().toResourceCtx();
+        this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
+                .append(ConnectContext.get().getRemoteIP())
+                .append(id).append(System.currentTimeMillis()).toString().hashCode();
         this.routineLoadTaskInfoList = new ArrayList<>();
         this.needScheduleTaskInfoList = new ArrayList<>();
         lock = new ReentrantReadWriteLock(true);
@@ -152,7 +166,7 @@ public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType d

     // TODO(ml): I will change it after ut.
     @VisibleForTesting
-    public RoutineLoadJob(String id, String name, long dbId, long tableId,
+    public RoutineLoadJob(long id, String name, long dbId, long tableId,
                           RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum,
                           LoadDataSourceType dataSourceType, int maxErrorNum) {
@@ -187,7 +201,7 @@ public void writeUnlock() {
         lock.writeLock().unlock();
     }

-    public String getId() {
+    public long getId() {
         return id;
     }
@@ -227,6 +241,10 @@ public void setState(JobState state) {
         this.state = state;
     }

+    public long getAuthCode() {
+        return authCode;
+    }
+
     protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
         writeLock();
         try {
@@ -324,7 +342,7 @@ public List processTimeoutTasks() {
         for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
             if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
                     > DEFAULT_TASK_TIMEOUT_SECONDS * 1000) {
-                String oldSignature = routineLoadTaskInfo.getId();
+                String oldSignature = routineLoadTaskInfo.getId().toString();
                 // abort txn if not committed
                 try {
                     Catalog.getCurrentGlobalTransactionMgr()
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 4d0400fd2104c5..8f9b4f52a588df 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;

@@ -55,7 +56,7 @@ public class RoutineLoadManager {
     private Map beIdToConcurrentTasks;

     // stream load job meta
-    private Map<String, RoutineLoadJob> idToRoutineLoadJob;
+    private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
     private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob;
     private Queue needScheduleTasksQueue;
@@ -328,7 +329,7 @@ public long getMinTaskBeId() throws LoadException {
         }
     }

-    public RoutineLoadJob getJob(String jobId) {
+    public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index d7259382b891fb..aae26a7fb87fb0 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -22,10 +22,15 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.task.RoutineLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;

+import java.util.UUID;
+
 /**
  * Routine load task info is the task info include the only id (signature).
  * For the kafka type of task info, it also include partitions which will be obtained data in this task.
@@ -36,13 +41,14 @@ public abstract class RoutineLoadTaskInfo {

     private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();

-    protected String id;
+    protected UUID id;
     protected long txnId;
-    protected String jobId;
+    protected long jobId;

     private long createTimeMs;
     private long loadStartTimeMs;
+    private TExecPlanFragmentParams tExecPlanFragmentParams;

-    public RoutineLoadTaskInfo(String id, String jobId) throws BeginTransactionException,
+    public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException,
             LabelAlreadyUsedException, AnalysisException {
         this.id = id;
         this.jobId = jobId;
@@ -50,15 +56,15 @@ public RoutineLoadTaskInfo(String id, String jobId) throws BeginTransactionExcep
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                routineLoadJob.getDbId(), id, -1, "streamLoad",
+                routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
                 TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
     }

-    public String getId() {
+    public UUID getId() {
         return id;
     }

-    public String getJobId() {
+    public long getJobId() {
         return jobId;
     }
@@ -74,13 +80,13 @@ public long getTxnId() {
         return txnId;
     }

-    abstract RoutineLoadTask createStreamLoadTask(long beId) throws LoadException;
+    abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException;

     @Override
     public boolean equals(Object obj) {
         if (obj instanceof RoutineLoadTaskInfo) {
             RoutineLoadTaskInfo routineLoadTaskInfo = (RoutineLoadTaskInfo) obj;
-            return this.id.equals(routineLoadTaskInfo.getId());
+            return this.id.toString().equals(routineLoadTaskInfo.getId().toString());
         } else {
             return false;
         }
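On the authCode field added to RoutineLoadJob above: it is the String.hashCode() of user + client IP + job id + creation time, silently widened from int to long on assignment. A minimal sketch with hypothetical session values standing in for the ConnectContext calls:

```java
public class AuthCodeSketch {
    public static void main(String[] args) {
        // Hypothetical session data; in the FE these come from ConnectContext.
        String qualifiedUser = "default_cluster:root";
        String remoteIp = "192.168.1.10";
        long jobId = 10001L;

        // Same construction as the patch: String.hashCode() yields an int,
        // widened when stored into the long authCode field.
        long authCode = new StringBuilder().append(qualifiedUser)
                .append(remoteIp)
                .append(jobId)
                .append(System.currentTimeMillis())
                .toString().hashCode();
        System.out.println(authCode);
    }
}
```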
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 76da250fbb190a..c78e67e1891091 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -17,17 +17,23 @@

 package org.apache.doris.load.routineload;

+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Daemon;
-import org.apache.doris.task.AgentBatchTask;
-import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.RoutineLoadTask;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;

 /**
@@ -62,13 +68,13 @@ protected void runOneCycle() {
         }
     }

-    private void process() throws LoadException {
+    private void process() throws LoadException, UserException {
         // update current beIdMaps for tasks
         routineLoadManager.updateBeIdTaskMaps();

         LOG.info("There are {} need schedule task in queue when {}",
                 needScheduleTasksQueue.size(), System.currentTimeMillis());
-        AgentBatchTask batchTask = new AgentBatchTask();
+        Map<Long, List<TRoutineLoadTask>> beIdTobatchTask = Maps.newHashMap();
         int sizeOfTasksQueue = needScheduleTasksQueue.size();
         int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum();
         int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum;
@@ -88,27 +94,56 @@ private void process() throws LoadException {
             long beId = routineLoadManager.getMinTaskBeId();
             RoutineLoadJob routineLoadJob = null;
             try {
-                routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId());
+                routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString());
             } catch (MetaNotFoundException e) {
                 LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId());
                 return;
             }
-            RoutineLoadTask routineLoadTask = routineLoadTaskInfo.createStreamLoadTask(beId);
+            TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(beId);
             // remove task for needScheduleTasksList in job
             routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo);
             routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis());
-            AgentTaskQueue.addTask(routineLoadTask);
-            batchTask.addTask(routineLoadTask);
+            // add to batch task map
+            if (beIdTobatchTask.containsKey(beId)) {
+                beIdTobatchTask.get(beId).add(tRoutineLoadTask);
+            } else {
+                List<TRoutineLoadTask> tRoutineLoadTaskList = Lists.newArrayList();
+                tRoutineLoadTaskList.add(tRoutineLoadTask);
+                beIdTobatchTask.put(beId, tRoutineLoadTaskList);
+            }
+            // count
             clusterIdleSlotNum--;
             scheduledTaskNum++;
             routineLoadManager.addNumOfConcurrentTasksByBeId(beId);
-            needScheduleTaskNum--;
         }
+        submitBatchTask(beIdTobatchTask);
         LOG.info("{} tasks have been allocated to be.", scheduledTaskNum);
+    }
+
+    // todo: change to batch submit and reuse client
+    private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoadTask) {
+        for (Map.Entry<Long, List<TRoutineLoadTask>> entry : beIdToRoutineLoadTask.entrySet()) {
+            Backend backend = Catalog.getCurrentSystemInfo().getBackend(entry.getKey());
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+            BackendService.Client client = null;
+            boolean ok = false;
+            try {
+                client = ClientPool.backendPool.borrowObject(address);
+                for (TRoutineLoadTask tRoutineLoadTask : entry.getValue()) {
+                    client.submit_routine_load_task(tRoutineLoadTask);
+                }
+                ok = true;
+            } catch (Exception e) {
+                LOG.warn("task exec error. backend[{}]", backend.getId(), e);
+            } finally {
+                if (ok) {
+                    ClientPool.backendPool.returnObject(address, client);
+                } else {
+                    ClientPool.backendPool.invalidateObject(address, client);
+                }
+            }
+        }
+    }
-
-        if (batchTask.getTaskNum() > 0) {
-            AgentTaskExecutor.submit(batchTask);
-        }
-    }
 }
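A small aside on the per-backend batching loop in process() above: the containsKey/get/put sequence could be collapsed with Java 8's Map.computeIfAbsent. A sketch of the idiom, with String standing in for TRoutineLoadTask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchByBeSketch {
    public static void main(String[] args) {
        // beId keys a per-backend batch of tasks.
        Map<Long, List<String>> beIdToBatchTask = new HashMap<>();
        long beId = 100L;
        beIdToBatchTask.computeIfAbsent(beId, k -> new ArrayList<>()).add("task-1");
        beIdToBatchTask.computeIfAbsent(beId, k -> new ArrayList<>()).add("task-2");
        System.out.println(beIdToBatchTask); // {100=[task-1, task-2]}
    }
}
```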
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 78ec6abf4b53b4..0d4b00d536b1ca 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -27,6 +27,7 @@
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.LoadErrorHub;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
@@ -63,15 +64,15 @@ public class StreamLoadPlanner {
     // Data will load to this table
     private Database db;
     private OlapTable destTable;
-    private TStreamLoadPutRequest request;
+    private StreamLoadTask streamLoadTask;

     private Analyzer analyzer;
     private DescriptorTable descTable;

-    public StreamLoadPlanner(Database db, OlapTable destTable, TStreamLoadPutRequest request) {
+    public StreamLoadPlanner(Database db, OlapTable destTable, StreamLoadTask streamLoadTask) {
         this.db = db;
         this.destTable = destTable;
-        this.request = request;
+        this.streamLoadTask = streamLoadTask;

         analyzer = new Analyzer(Catalog.getInstance(), null);
         descTable = analyzer.getDescTbl();
@@ -92,14 +93,14 @@ public TExecPlanFragmentParams plan() throws UserException {
         }

         // create scan node
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(0), tupleDesc, destTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(0), tupleDesc, destTable, streamLoadTask);
         scanNode.init(analyzer);
         descTable.computeMemLayout();
         scanNode.finalize(analyzer);

         // create dest sink
-        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, request.getPartitions());
-        olapTableSink.init(request.getLoadId(), request.getTxnId(), db.getId());
+        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, streamLoadTask.getPartitions());
+        olapTableSink.init(streamLoadTask.getId(), streamLoadTask.getTxnId(), db.getId());
         olapTableSink.finalize();

         // for stream load, we only need one fragment, ScanNode -> DataSink.
@@ -150,7 +151,7 @@ public TExecPlanFragmentParams plan() throws UserException {
             }
         }

-        LOG.debug("stream load txn id: {}, plan: {}", request.txnId, params);
+        LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params);
         return params;
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 1d9f86e8e5ac87..e0abee1b2e8ce6 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -39,6 +39,7 @@
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TBrokerScanNode;
 import org.apache.doris.thrift.TBrokerScanRange;
@@ -70,7 +71,7 @@ public class StreamLoadScanNode extends ScanNode {
     // TODO(zc): now we use scanRange
     // input parameter
     private Table dstTable;
-    private TStreamLoadPutRequest request;
+    private StreamLoadTask streamLoadTask;

     // helper
     private Analyzer analyzer;
@@ -82,10 +83,10 @@ public class StreamLoadScanNode extends ScanNode {

     // used to construct for streaming loading
     public StreamLoadScanNode(
-            PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, TStreamLoadPutRequest request) {
+            PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, StreamLoadTask streamLoadTask) {
         super(id, tupleDesc, "StreamLoadScanNode");
         this.dstTable = dstTable;
-        this.request = request;
+        this.streamLoadTask = streamLoadTask;
     }

     @Override
@@ -97,19 +98,19 @@ public void init(Analyzer analyzer) throws UserException {

         brokerScanRange = new TBrokerScanRange();
         TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
-        rangeDesc.file_type = request.getFileType();
-        rangeDesc.format_type = request.getFormatType();
+        rangeDesc.file_type = streamLoadTask.getFileType();
+        rangeDesc.format_type = streamLoadTask.getFormatType();
         rangeDesc.splittable = false;
-        switch (request.getFileType()) {
+        switch (streamLoadTask.getFileType()) {
             case FILE_LOCAL:
-                rangeDesc.path = request.getPath();
+                rangeDesc.path = streamLoadTask.getPath();
                 break;
             case FILE_STREAM:
                 rangeDesc.path = "Invalid Path";
-                rangeDesc.load_id = request.getLoadId();
+                rangeDesc.load_id = streamLoadTask.getId();
                 break;
             default:
-                throw new UserException("unsupported file type, type=" + request.getFileType());
+                throw new UserException("unsupported file type, type=" + streamLoadTask.getFileType());
         }
         rangeDesc.start_offset = 0;
         rangeDesc.size = -1;
@@ -123,35 +124,14 @@ public void init(Analyzer analyzer) throws UserException {
         // columns: k1, k2, v1, v2=k1 + k2
         // this means that there are three columns(k1, k2, v1) in source file,
         // and v2 is derived from (k1 + k2)
-        if (request.isSetColumns()) {
-            String columnsSQL = new String("COLUMNS " + request.getColumns());
-            SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
-            ImportColumnsStmt columnsStmt;
-            try {
-                columnsStmt = (ImportColumnsStmt) parser.parse().value;
-            } catch (Error e) {
-                LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
-                throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character");
-            } catch (AnalysisException e) {
-                LOG.warn("analyze columns' statement failed, sql={}, error={}",
-                        columnsSQL, parser.getErrorMsg(columnsSQL), e);
-                String errorMessage = parser.getErrorMsg(columnsSQL);
-                if (errorMessage == null) {
-                    throw e;
-                } else {
-                    throw new AnalysisException(errorMessage, e);
-                }
-            } catch (Exception e) {
-                LOG.warn("failed to parse columns header, sql={}", columnsSQL, e);
-                throw new UserException("parse columns header failed", e);
-            }
-
-            for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
+        if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) {
+            for (Map.Entry entry : streamLoadTask.getColumnToColumnExpr().entrySet()) {
                 // make column name case match with real column name
-                String realColName = dstTable.getColumn(columnDesc.getColumn()) == null ? columnDesc.getColumn()
-                        : dstTable.getColumn(columnDesc.getColumn()).getName();
-                if (columnDesc.getExpr() != null) {
-                    exprsByName.put(realColName, columnDesc.getExpr());
+                String column = entry.getKey();
+                String realColName = dstTable.getColumn(column) == null ? column
+                        : dstTable.getColumn(column).getName();
+                if (entry.getValue() != null) {
+                    exprsByName.put(realColName, entry.getValue());
                 } else {
                     SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
                     slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
@@ -203,36 +183,14 @@ public void init(Analyzer analyzer) throws UserException {
         }

         // analyze where statement
-        if (request.isSetWhere()) {
+        if (streamLoadTask.getWhereExpr() != null) {
             Map dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
             for (SlotDescriptor slotDescriptor : desc.getSlots()) {
                 dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor);
             }
-            String whereSQL = new String("WHERE " + request.getWhere());
-            SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
-            ImportWhereStmt whereStmt;
-            try {
-                whereStmt = (ImportWhereStmt) parser.parse().value;
-            } catch (Error e) {
-                LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
-                throw new AnalysisException("failed to parsing where header, maybe contain unsupported character");
-            } catch (AnalysisException e) {
-                LOG.warn("analyze where statement failed, sql={}, error={}",
-                        whereSQL, parser.getErrorMsg(whereSQL), e);
-                String errorMessage = parser.getErrorMsg(whereSQL);
-                if (errorMessage == null) {
-                    throw e;
-                } else {
-                    throw new AnalysisException(errorMessage, e);
-                }
-            } catch (Exception e) {
-                LOG.warn("failed to parse where header, sql={}", whereSQL, e);
-                throw new UserException("parse columns header failed", e);
-            }
-
             // substitute SlotRef in filter expression
-            Expr whereExpr = whereStmt.getExpr();
+            Expr whereExpr = streamLoadTask.getWhereExpr();
             List slots = Lists.newArrayList();
             whereExpr.collect(SlotRef.class, slots);
@@ -258,8 +216,8 @@ public void init(Analyzer analyzer) throws UserException {
         computeStats(analyzer);
         createDefaultSmap(analyzer);

-        if (request.isSetColumnSeparator()) {
-            String sep = ColumnSeparator.convertSeparator(request.getColumnSeparator());
+        if (streamLoadTask.getColumnSeparator() != null) {
+            String sep = streamLoadTask.getColumnSeparator().getColumnSeparator();
             params.setColumn_separator(sep.getBytes(Charset.forName("UTF-8"))[0]);
         } else {
             params.setColumn_separator((byte) '\t');
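For context on the map that now replaces inline COLUMNS parsing in StreamLoadScanNode.init above: for COLUMNS "k1, k2, v1, v2=k1 + k2", plain source columns map to null and derived columns map to their parsed expression; null entries become source slots, non-null entries land in exprsByName. A sketch of the shape, with String standing in for Doris's Expr:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColumnExprMapSketch {
    public static void main(String[] args) {
        // Shape of getColumnToColumnExpr() for "k1, k2, v1, v2=k1 + k2".
        Map<String, String> columnToExpr = new LinkedHashMap<>();
        columnToExpr.put("k1", null);      // source column -> slot in src tuple
        columnToExpr.put("k2", null);      // source column -> slot in src tuple
        columnToExpr.put("v1", null);      // source column -> slot in src tuple
        columnToExpr.put("v2", "k1 + k2"); // derived column -> entry in exprsByName
        columnToExpr.forEach((col, expr) ->
                System.out.println(col + " -> " + (expr == null ? "source slot" : expr)));
    }
}
```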
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 546dff1fd8e9ce..570d718fdcbcaa 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -814,7 +814,7 @@ private void handleShowRoutineLoad() throws AnalysisException {
         // get routine load info
         List<List<String>> rows = Lists.newArrayList();
         List<String> row = Lists.newArrayList();
-        row.add(routineLoadJob.getId());
+        row.add(String.valueOf(routineLoadJob.getId()));
         row.add(routineLoadJob.getName());
         row.add(String.valueOf(routineLoadJob.getDbId()));
         row.add(String.valueOf(routineLoadJob.getTableId()));
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 068d0de45916c3..41c726469fc72a 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -47,6 +47,7 @@
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.FrontendServiceVersion;
 import org.apache.doris.thrift.TColumnDef;
@@ -728,7 +729,7 @@ private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request)
             if (!(table instanceof OlapTable)) {
                 throw new UserException("load table type is not OlapTable, type=" + table.getClass());
             }
-            StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, request);
+            StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, StreamLoadTask.fromTStreamLoadPutRequest(request));
             return planner.plan();
         } finally {
             db.readUnlock();
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
new file mode 100644
index 00000000000000..30d9a4896b628f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.task;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.ImportColumnsStmt;
+import org.apache.doris.analysis.ImportWhereStmt;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.RoutineLoadDesc;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.StringReader;
+import java.util.Map;
+
+public class StreamLoadTask {
+
+    private static final Logger LOG = LogManager.getLogger(StreamLoadTask.class);
+
+    private TUniqueId id;
+    private long txnId;
+    private TFileType fileType;
+    private TFileFormatType formatType;
+
+    // optional
+    private Map columnToColumnExpr;
+    private Expr whereExpr;
+    private ColumnSeparator columnSeparator;
+    private String partitions;
+    private String path;
+
+    public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
+        this.id = id;
+        this.txnId = txnId;
+        this.fileType = fileType;
+        this.formatType = formatType;
+    }
+
+    public TUniqueId getId() {
+        return id;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public TFileType getFileType() {
+        return fileType;
+    }
+
+    public TFileFormatType getFormatType() {
+        return formatType;
+    }
+
+    public Map getColumnToColumnExpr() {
+        return columnToColumnExpr;
+    }
+
+    public Expr getWhereExpr() {
+        return whereExpr;
+    }
+
+    public ColumnSeparator getColumnSeparator() {
+        return columnSeparator;
+    }
+
+    public String getPartitions() {
+        return partitions;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
+        StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
+                request.getFileType(), request.getFormatType());
+        streamLoadTask.setOptionalFromTSLPutRequest(request);
+        return streamLoadTask;
+    }
+
+    private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws UserException {
+        if (request.isSetColumns()) {
+            setColumnToColumnExpr(request.getColumns());
+        }
+        if (request.isSetWhere()) {
+            setWhereExpr(request.getWhere());
+        }
+        if (request.isSetColumnSeparator()) {
+            setColumnSeparator(request.getColumnSeparator());
+        }
+        if (request.isSetPartitions()) {
+            partitions = request.getPartitions();
+        }
+        switch (request.getFileType()) {
+            case FILE_LOCAL:
+                path = request.getPath();
+                break;
+        }
+    }
+
+    public static StreamLoadTask fromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo) {
+        TUniqueId queryId = new TUniqueId(routineLoadTaskInfo.getId().getMostSignificantBits(),
+                routineLoadTaskInfo.getId().getLeastSignificantBits());
+        StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, routineLoadTaskInfo.getTxnId(),
+                TFileType.FILE_STREAM, TFileFormatType.FORMAT_CSV_PLAIN);
+        RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
+        streamLoadTask.setOptionalFromRoutineLoadTaskInfo(routineLoadTaskInfo,
+                routineLoadManager.getJob(routineLoadTaskInfo.getJobId()));
+        return streamLoadTask;
+    }
+
+    private void setOptionalFromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo,
+                                                    RoutineLoadJob routineLoadJob) {
+        if (routineLoadJob.getRoutineLoadDesc() != null) {
+            RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
+            if (routineLoadDesc.getColumnsInfo() != null) {
+                columnToColumnExpr = routineLoadDesc.getColumnsInfo().getParsedExprMap();
+            }
+            if (routineLoadDesc.getWherePredicate() != null) {
+                whereExpr = routineLoadDesc.getWherePredicate();
+            }
+            if (routineLoadDesc.getColumnSeparator() != null) {
+                columnSeparator = routineLoadDesc.getColumnSeparator();
+            }
+            if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) {
+                partitions = Joiner.on(",").join(routineLoadDesc.getPartitionNames());
+            }
+        }
+    }
+
+    private void setColumnToColumnExpr(String columns) throws UserException {
+        String columnsSQL = new String("COLUMNS " + columns);
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
+        ImportColumnsStmt columnsStmt;
+        try {
+            columnsStmt = (ImportColumnsStmt) parser.parse().value;
+        } catch (Error e) {
+            LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
+            throw new AnalysisException("failed to parse columns' header, maybe contain unsupported character");
+        } catch (AnalysisException e) {
+            LOG.warn("analyze columns' statement failed, sql={}, error={}",
+                    columnsSQL, parser.getErrorMsg(columnsSQL), e);
+            String errorMessage = parser.getErrorMsg(columnsSQL);
+            if (errorMessage == null) {
+                throw e;
+            } else {
+                throw new AnalysisException(errorMessage, e);
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to parse columns header, sql={}", columnsSQL, e);
+            throw new UserException("parse columns header failed", e);
+        }
+
+        if (columnsStmt.getColumns() != null && columnsStmt.getColumns().size() != 0) {
+            columnToColumnExpr = Maps.newHashMap();
+            for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
+                columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
+            }
+        }
+    }
+
+    private void setWhereExpr(String whereString) throws UserException {
+        String whereSQL = new String("WHERE " + whereString);
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
+        ImportWhereStmt whereStmt;
+        try {
+            whereStmt = (ImportWhereStmt) parser.parse().value;
+        } catch (Error e) {
+            LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
+            throw new AnalysisException("failed to parse where header, maybe contain unsupported character");
+        } catch (AnalysisException e) {
+            LOG.warn("analyze where statement failed, sql={}, error={}",
+                    whereSQL, parser.getErrorMsg(whereSQL), e);
+            String errorMessage = parser.getErrorMsg(whereSQL);
+            if (errorMessage == null) {
+                throw e;
+            } else {
+                throw new AnalysisException(errorMessage, e);
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to parse where header, sql={}", whereSQL, e);
+            throw new UserException("parse where header failed", e);
+        }
+        whereExpr = whereStmt.getExpr();
+    }
+
+    private void setColumnSeparator(String oriSeparator) throws AnalysisException {
+        columnSeparator = new ColumnSeparator(oriSeparator);
+        columnSeparator.analyze();
+    }
+}
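The point of the new StreamLoadTask class above is that both load paths normalize to it before planning. A condensed, non-runnable sketch of the converging call path, using only names from the hunks above (db/table lookup, locking, and error handling omitted):

```java
// Stream load RPC path:
//   TStreamLoadPutRequest -> StreamLoadTask.fromTStreamLoadPutRequest(request)
// Routine load task path:
//   RoutineLoadTaskInfo   -> StreamLoadTask.fromRoutineLoadTaskInfo(taskInfo)
// Both then build the same single-fragment plan:
private TExecPlanFragmentParams planStreamLoad(Database db, OlapTable table, TStreamLoadPutRequest request)
        throws UserException {
    StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
    return new StreamLoadPlanner(db, table, streamLoadTask).plan();
}
```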
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index be923dd457f757..76a29589fd544f 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -62,6 +62,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;

 public class KafkaRoutineLoadJobTest {

@@ -125,7 +126,7 @@ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer,
         };

         KafkaRoutineLoadJob kafkaRoutineLoadJob =
-                new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
+                new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
                         1L, routineLoadDesc, 3, 0, "", "", new KafkaProgress());
         Deencapsulation.setField(kafkaRoutineLoadJob, "consumer", kafkaConsumer);
@@ -149,7 +150,7 @@ public void testDivideRoutineLoadJob(@Mocked KafkaConsumer kafkaConsumer,
         };

         KafkaRoutineLoadJob kafkaRoutineLoadJob =
-                new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
+                new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
                         1L, routineLoadDesc, 3, 0, "", "", null);

@@ -200,7 +201,7 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer,
         };

         RoutineLoadJob routineLoadJob =
-                new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
+                new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
                         1L, routineLoadDesc, 3, 0, "", "", null);

         new Expectations() {
@@ -214,7 +215,7 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer,
         };

         List routineLoadTaskInfoList = new ArrayList<>();
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo("1", "1");
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L);
         kafkaTaskInfo.addKafkaPartition(100);
         kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
@@ -231,7 +232,7 @@ public long getNextId() {

         new Expectations() {
             {
-                routineLoadManager.getJob("1");
+                routineLoadManager.getJob(1L);
                 result = routineLoadJob;
             }
         };
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index b0e54a032cb937..98578f0e729c5d 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -74,7 +74,7 @@ public void testNormalRunOneCycle(@Mocked KafkaConsumer consumer,
         };

         RoutineLoadJob routineLoadJob =
-                new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
+                new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
                         1L, routineLoadDesc, 3, 0, "", "", new KafkaProgress());
         routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULE);
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c9a4daadc39ecc..05d26e31b28426 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -39,6 +39,7 @@

 import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;

 import mockit.Deencapsulation;
 import mockit.Expectations;
@@ -64,7 +65,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
         long beId = 100L;

         Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo("1", "1");
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L);
         routineLoadTaskInfo1.addKafkaPartition(1);
         routineLoadTaskInfo1.addKafkaPartition(2);
         routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
@@ -116,7 +117,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
             result = beId;
             routineLoadManager.getJobByTaskId(anyString);
             result = kafkaRoutineLoadJob1;
-            routineLoadManager.getJob(anyString);
+            routineLoadManager.getJob(anyLong);
             result = kafkaRoutineLoadJob1;
         }
         };
@@ -127,7 +128,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1

 //        new Expectations() {
 //            {
-//                routineLoadTaskInfo1.createStreamLoadTask(anyLong);
+//                routineLoadTaskInfo1.createRoutineLoadTask(anyLong);
 //                result = kafkaRoutineLoadTask;
 //            }
 //        };
diff --git a/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
index aff31fc986e4ed..8c4eca059d93ce 100644
--- a/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.UserException;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TUniqueId;

@@ -70,7 +71,8 @@ public void testNormalPlan() throws UserException {
         TStreamLoadPutRequest request = new TStreamLoadPutRequest();
         request.setTxnId(1);
         request.setLoadId(new TUniqueId(2, 3));
-        StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, request);
+        StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));
         planner.plan();
     }
 }
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index bf26a8cdcd49b5..87093a6d4ef2b6 100644
--- a/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -35,6 +35,7 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TPlanNode;
@@ -140,7 +141,8 @@ public void testNormal() throws UserException {
         }

         TStreamLoadPutRequest request = getBaseRequest();
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         new Expectations() {{ dstTable.getBaseSchema(); result = columns; }};
@@ -174,7 +176,8 @@ public void testLostV2() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1, k2, v1");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -203,7 +206,8 @@ public void testBadColumns() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1 k2 v1");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -249,7 +253,8 @@ public void testColumnsNormal() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_LOCAL);
         request.setColumns("k1,k2,v1, v2=k2");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -297,7 +302,8 @@ public void testHllColumnsNormal() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_LOCAL);
         request.setColumns("k1,k2, v1=hll_hash(k2)");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -345,7 +351,8 @@ public void testHllColumnsNoHllHash() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_LOCAL);
         request.setColumns("k1,k2, v1=hll_hash1(k2)");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -375,7 +382,8 @@ public void testHllColumnsFail() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_LOCAL);
         request.setColumns("k1,k2, v1=k2");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -405,7 +413,8 @@ public void testUnsupportedFType() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_BROKER);
         request.setColumns("k1,k2,v1, v2=k2");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -434,7 +443,8 @@ public void testColumnsUnknownRef() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k3");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -480,7 +490,8 @@ public void testWhereNormal() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 = 1");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -526,7 +537,8 @@ public void testWhereBad() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k2");
         request.setWhere("k1 1");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -556,7 +568,8 @@ public void testWhereUnknownRef() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k5 = 1");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -586,7 +599,8 @@ public void testWhereNotBool() throws UserException, UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 + v2");
-        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
+        StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
+                StreamLoadTask.fromTStreamLoadPutRequest(request));

         scanNode.init(analyzer);
         scanNode.finalize(analyzer);