Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public LoadColumnsInfo(List<String> columnNames, List<Expr> columnMappingList) {
this.columnMappingList = columnMappingList;
}

public Map<String, Expr> getParsedExprMap() {
return parsedExprMap;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
checkColumnNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAd

// TODO(ml): I will change it after ut.
@VisibleForTesting
public KafkaRoutineLoadJob(String id, String name, long dbId, long tableId,
public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, int maxErrorNum,
String serverAddress, String topic, KafkaProgress kafkaProgress) {
Expand All @@ -90,6 +90,14 @@ public KafkaRoutineLoadJob(String id, String name, long dbId, long tableId,
setConsumer();
}

public String getTopic() {
return topic;
}

public String getServerAddress() {
return serverAddress;
}

private void setCustomKafkaPartitions(List<Integer> kafkaPartitions) throws LoadException {
writeLock();
try {
Expand Down Expand Up @@ -123,7 +131,7 @@ public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskN
// divide kafkaPartitions into tasks
for (int i = 0; i < currentConcurrentTaskNum; i++) {
try {
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID().toString(), id);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
routineLoadTaskInfoList.add(kafkaTaskInfo);
needScheduleTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,26 @@

package org.apache.doris.load.routineload;

import com.google.common.base.Joiner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.task.KafkaRoutineLoadTask;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;

import com.google.common.collect.Maps;
Expand All @@ -38,15 +52,15 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {

private List<Integer> partitions;

public KafkaTaskInfo(String id, String jobId) throws LabelAlreadyUsedException,
public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
super(id, jobId);
this.partitions = new ArrayList<>();
}

public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
super(UUID.randomUUID().toString(), kafkaTaskInfo.getJobId());
super(UUID.randomUUID(), kafkaTaskInfo.getJobId());
this.partitions = kafkaTaskInfo.getPartitions();
}

Expand All @@ -58,9 +72,10 @@ public List<Integer> getPartitions() {
return partitions;
}

// todo: reuse plan fragment of stream load
@Override
public RoutineLoadTask createStreamLoadTask(long beId) throws LoadException {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException {
KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
for (Integer partitionId : partitions) {
KafkaProgress kafkaProgress = (KafkaProgress) routineLoadJob.getProgress();
Expand All @@ -69,13 +84,41 @@ public RoutineLoadTask createStreamLoadTask(long beId) throws LoadException {
}
partitionIdToOffset.put(partitionId, kafkaProgress.getPartitionIdToOffset().get(partitionId));
}
RoutineLoadTask routineLoadTask = new KafkaRoutineLoadTask(routineLoadJob.getResourceInfo(),
beId, routineLoadJob.getDbId(),
routineLoadJob.getTableId(),
id, txnId, partitionIdToOffset);
if (routineLoadJob.getRoutineLoadDesc() != null) {
routineLoadTask.setRoutineLoadDesc(routineLoadJob.getRoutineLoadDesc());
}
return routineLoadTask;

// init tRoutineLoadTask and create plan fragment
TRoutineLoadTask tRoutineLoadTask = new TRoutineLoadTask();
TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
tRoutineLoadTask.setId(queryId);
tRoutineLoadTask.setJob_id(jobId);
tRoutineLoadTask.setTxn_id(txnId);
Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
tRoutineLoadTask.setDb(database.getFullName());
tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName());
StringBuilder stringBuilder = new StringBuilder();
// label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode()
String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_")
.append(routineLoadJob.getTopic()).append("_")
.append(Joiner.on("_").withKeyValueSeparator(":")
.join(partitionIdToOffset)).toString().hashCode());
tRoutineLoadTask.setLabel(label);
tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode());
TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
tKafkaLoadInfo.setTopic((routineLoadJob).getTopic());
tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress());
tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset);
tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo);
tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob));
return tRoutineLoadTask;
}


private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this);
Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
StreamLoadPlanner planner = new StreamLoadPlanner(database,
(OlapTable) database.getTable(routineLoadJob.getTableId()),
streamLoadTask);
return planner.plan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,12 @@
// "numOfTotalData": "", "taskId": "", "jobId": ""}
public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {

public enum LoadSourceType {
KAFKA(1);

private final int flag;

private LoadSourceType(int flag) {
this.flag = flag;
}

public int value() {
return flag;
}

public static LoadSourceType valueOf(int flag) {
switch (flag) {
case 1:
return KAFKA;
default:
return null;
}
}
}

private long jobId;
private TUniqueId taskId;
private long filteredRows;
private long loadedRows;
private RoutineLoadProgress progress;
private LoadSourceType loadSourceType;
private LoadDataSourceType loadDataSourceType;

public RLTaskTxnCommitAttachment() {
}
Expand All @@ -70,7 +47,7 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac

switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
case KAFKA:
this.loadSourceType = LoadSourceType.KAFKA;
this.loadDataSourceType = LoadDataSourceType.KAFKA;
this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.TxnStateChangeListener;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendServiceImpl;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnCommitRequest;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.transaction.AbortTransactionException;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -107,10 +113,12 @@ public boolean isFinalState() {

}

protected String id;
protected long id;
protected String name;
protected long dbId;
protected long tableId;
// this code is used to verify be task request
protected long authCode;
protected RoutineLoadDesc routineLoadDesc; // optional
protected int desireTaskConcurrentNum; // optional
protected JobState state;
Expand All @@ -134,25 +142,31 @@ public boolean isFinalState() {
protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList;
protected List<RoutineLoadTaskInfo> needScheduleTaskInfoList;

// plan fragment which will be initialized during job scheduler
protected TExecPlanFragmentParams tExecPlanFragmentParams;

protected ReentrantReadWriteLock lock;
// TODO(ml): error sample

public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType dataSourceType) {
this.id = UUID.randomUUID().toString();
this.id = Catalog.getInstance().getNextId();
this.name = name;
this.dbId = dbId;
this.tableId = tableId;
this.state = JobState.NEED_SCHEDULE;
this.dataSourceType = dataSourceType;
this.resourceInfo = ConnectContext.get().toResourceCtx();
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
this.routineLoadTaskInfoList = new ArrayList<>();
this.needScheduleTaskInfoList = new ArrayList<>();
lock = new ReentrantReadWriteLock(true);
}

// TODO(ml): I will change it after ut.
@VisibleForTesting
public RoutineLoadJob(String id, String name, long dbId, long tableId,
public RoutineLoadJob(long id, String name, long dbId, long tableId,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, LoadDataSourceType dataSourceType,
int maxErrorNum) {
Expand Down Expand Up @@ -187,7 +201,7 @@ public void writeUnlock() {
lock.writeLock().unlock();
}

public String getId() {
public long getId() {
return id;
}

Expand Down Expand Up @@ -227,6 +241,10 @@ public void setState(JobState state) {
this.state = state;
}

public long getAuthCode() {
return authCode;
}

protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
writeLock();
try {
Expand Down Expand Up @@ -324,7 +342,7 @@ public List<RoutineLoadTaskInfo> processTimeoutTasks() {
for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
> DEFAULT_TASK_TIMEOUT_SECONDS * 1000) {
String oldSignature = routineLoadTaskInfo.getId();
String oldSignature = routineLoadTaskInfo.getId().toString();
// abort txn if not committed
try {
Catalog.getCurrentGlobalTransactionMgr()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

Expand All @@ -55,7 +56,7 @@ public class RoutineLoadManager {
private Map<Long, Integer> beIdToConcurrentTasks;

// stream load job meta
private Map<String, RoutineLoadJob> idToRoutineLoadJob;
private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob;

private Queue<RoutineLoadTaskInfo> needScheduleTasksQueue;
Expand Down Expand Up @@ -328,7 +329,7 @@ public long getMinTaskBeId() throws LoadException {
}
}

public RoutineLoadJob getJob(String jobId) {
public RoutineLoadJob getJob(long jobId) {
return idToRoutineLoadJob.get(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;

import java.util.UUID;

/**
* Routine load task info is the task info include the only id (signature).
* For the kafka type of task info, it also include partitions which will be obtained data in this task.
Expand All @@ -36,29 +41,30 @@ public abstract class RoutineLoadTaskInfo {

private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();

protected String id;
protected UUID id;
protected long txnId;
protected String jobId;
protected long jobId;
private long createTimeMs;
private long loadStartTimeMs;
private TExecPlanFragmentParams tExecPlanFragmentParams;

public RoutineLoadTaskInfo(String id, String jobId) throws BeginTransactionException,
public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException,
LabelAlreadyUsedException, AnalysisException {
this.id = id;
this.jobId = jobId;
this.createTimeMs = System.currentTimeMillis();
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), id, -1, "streamLoad",
routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
}

public String getId() {
public UUID getId() {
return id;
}

public String getJobId() {
public long getJobId() {
return jobId;
}

Expand All @@ -74,13 +80,13 @@ public long getTxnId() {
return txnId;
}

abstract RoutineLoadTask createStreamLoadTask(long beId) throws LoadException;
abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException;

@Override
public boolean equals(Object obj) {
if (obj instanceof RoutineLoadTaskInfo) {
RoutineLoadTaskInfo routineLoadTaskInfo = (RoutineLoadTaskInfo) obj;
return this.id.equals(routineLoadTaskInfo.getId());
return this.id.toString().equals(routineLoadTaskInfo.getId().toString());
} else {
return false;
}
Expand Down
Loading