@@ -78,8 +78,9 @@ public KafkaRoutineLoadJob() {
         super(-1, LoadDataSourceType.KAFKA);
     }
 
-    public KafkaRoutineLoadJob(Long id, String name, long dbId, long tableId, String brokerList, String topic) {
-        super(id, name, dbId, tableId, LoadDataSourceType.KAFKA);
+    public KafkaRoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId, String brokerList,
+                               String topic) {
+        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
@@ -119,7 +120,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
         if (state == JobState.NEED_SCHEDULE) {
             // divide kafkaPartitions into tasks
             for (int i = 0; i < currentConcurrentTaskNum; i++) {
-                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
+                KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName);
                 routineLoadTaskInfoList.add(kafkaTaskInfo);
                 result.add(kafkaTaskInfo);
             }
@@ -145,7 +146,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) {
     @Override
     public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
-        int aliveBeNum = systemInfoService.getClusterBackendIds(getClusterName(), true).size();
+        int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size();
         int partitionNum = currentKafkaPartitions.size();
         if (desireTaskConcurrentNum == 0) {
             desireTaskConcurrentNum = partitionNum;
@@ -290,8 +291,8 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
 
         // init kafka routine load job
         long id = Catalog.getInstance().getNextId();
-        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(),
-                tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
+        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
+                db.getClusterName(), db.getId(), tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
         kafkaRoutineLoadJob.setOptional(stmt);
 
         return kafkaRoutineLoadJob;
@@ -45,14 +45,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
 
     private List<Integer> partitions;
 
-    public KafkaTaskInfo(UUID id, long jobId) {
-        super(id, jobId);
+    public KafkaTaskInfo(UUID id, long jobId, String clusterName) {
+        super(id, jobId, clusterName);
         this.partitions = new ArrayList<>();
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
             BeginTransactionException, AnalysisException {
-        super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getBeId());
+        super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(), kafkaTaskInfo.getBeId());
         this.partitions = kafkaTaskInfo.getPartitions();
     }
 
@@ -123,6 +123,7 @@ public boolean isFinalState() {
 
     protected long id;
     protected String name;
+    protected String clusterName;
     protected long dbId;
     protected long tableId;
     // this code is used to verify be task request
@@ -177,12 +178,13 @@ public RoutineLoadJob(long id, LoadDataSourceType type) {
         this.dataSourceType = type;
     }
 
-    public RoutineLoadJob(Long id, String name, long dbId, long tableId, LoadDataSourceType dataSourceType) {
-        this.id = id;
+    public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId,
+                          LoadDataSourceType dataSourceType) {
+        this(id, dataSourceType);
         this.name = name;
+        this.clusterName = clusterName;
         this.dbId = dbId;
         this.tableId = tableId;
-        this.dataSourceType = dataSourceType;
         this.endTimestamp = -1;
         this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
                 .append(ConnectContext.get().getRemoteIP())
@@ -323,19 +325,6 @@ public String getPartitions() {
         }
     }
 
-    public String getClusterName() throws MetaNotFoundException {
-        Database database = Catalog.getCurrentCatalog().getDb(dbId);
-        if (database == null) {
-            throw new MetaNotFoundException("Database " + dbId + "has been deleted");
-        }
-        database.readLock();
-        try {
-            return database.getClusterName();
-        } finally {
-            database.readUnlock();
-        }
-    }
-
     public int getDesiredConcurrentNumber() {
         return desireTaskConcurrentNum;
     }
@@ -912,6 +901,7 @@ public void write(DataOutput out) throws IOException {
 
         out.writeLong(id);
         Text.writeString(out, name);
+        Text.writeString(out, clusterName);
         out.writeLong(dbId);
         out.writeLong(tableId);
         out.writeInt(desireTaskConcurrentNum);
@@ -934,6 +924,7 @@ public void readFields(DataInput in) throws IOException {
 
         id = in.readLong();
         name = Text.readString(in);
+        clusterName = Text.readString(in);
        dbId = in.readLong();
         tableId = in.readLong();
         desireTaskConcurrentNum = in.readInt();
@@ -88,41 +88,16 @@ private void writeUnlock() {
     public RoutineLoadManager() {
     }
 
-    private void updateBeIdToMaxConcurrentTasks() {
+    public void updateBeIdToMaxConcurrentTasks() {
         beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
-                .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
+                .stream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
     }
 
     // this is not real-time number
     public int getTotalMaxConcurrentTaskNum() {
         return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
     }
 
-    public void updateBeIdTaskMaps() {
-        writeLock();
-        try {
-            // step1: update backend number in all of cluster
-            updateBeIdToMaxConcurrentTasks();
-            List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
-
-            // diff beIds and beIdToMaxConcurrentTasks.keys()
-            List<Long> newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null)
-                    .collect(Collectors.toList());
-            List<Long> unavailableBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream()
-                    .filter(entity -> !beIds.contains(entity))
-                    .collect(Collectors.toList());
-            newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM));
-            for (long beId : unavailableBeIds) {
-                beIdToMaxConcurrentTasks.remove(beId);
-            }
-            LOG.info("There are {} backends which participate in routine load scheduler. "
-                    + "There are {} new backends and {} unavailable backends for routine load",
-                    beIdToMaxConcurrentTasks.size(), newBeIds.size(), unavailableBeIds.size());
-        } finally {
-            writeUnlock();
-        }
-    }
-
     private Map<Long, Integer> getBeIdConcurrentTaskMaps() {
         Map<Long, Integer> beIdToConcurrentTasks = Maps.newHashMap();
         for (RoutineLoadJob routineLoadJob : getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING)) {
@@ -43,23 +43,24 @@ public abstract class RoutineLoadTaskInfo {
     protected UUID id;
     protected long txnId;
     protected long jobId;
+    protected String clusterName;
 
     private long createTimeMs;
     private long loadStartTimeMs;
     // the be id of previous task
     protected long previousBeId = -1L;
     // the be id of this task
     protected long beId = -1L;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName) {
         this.id = id;
         this.jobId = jobId;
+        this.clusterName = clusterName;
         this.createTimeMs = System.currentTimeMillis();
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long previousBeId) {
-        this.id = id;
-        this.jobId = jobId;
-        this.createTimeMs = System.currentTimeMillis();
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long previousBeId) {
+        this(id, jobId, clusterName);
         this.previousBeId = previousBeId;
     }
 
@@ -71,6 +72,10 @@ public long getJobId() {
         return jobId;
     }
 
+    public String getClusterName() {
+        return clusterName;
+    }
+
     public void setLoadStartTimeMs(long loadStartTimeMs) {
         this.loadStartTimeMs = loadStartTimeMs;
     }
@@ -87,6 +92,10 @@ public long getBeId() {
         return beId;
     }
 
+    public long getCreateTimeMs() {
+        return createTimeMs;
+    }
+
     public long getLoadStartTimeMs() {
         return loadStartTimeMs;
     }
@@ -56,9 +56,13 @@ public class RoutineLoadTaskScheduler extends Daemon {
 
     private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class);
 
+    private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000; // 10s
+
     private RoutineLoadManager routineLoadManager;
     private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue;
 
+    private long lastBackendSlotUpdateTime = -1;
+
     @VisibleForTesting
     public RoutineLoadTaskScheduler() {
         super("routine load task", 0);
@@ -83,23 +87,17 @@ protected void runOneCycle() {
     }
 
     private void process() throws LoadException, UserException, InterruptedException {
-        // update current beIdMaps for tasks
-        routineLoadManager.updateBeIdTaskMaps();
+        updateBackendSlotIfNecessary();
 
-        LOG.info("There are {} need schedule task in queue when {}",
-                needScheduleTasksQueue.size(), System.currentTimeMillis());
-        Map<Long, List<TRoutineLoadTask>> beIdTobatchTask = Maps.newHashMap();
         int sizeOfTasksQueue = needScheduleTasksQueue.size();
         int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum();
         int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum;
+
+        LOG.info("There are {} tasks need to be scheduled in queue", needScheduleTasksQueue.size());
+
         int scheduledTaskNum = 0;
-        // get idle be task num
-        // allocate task to be
-        // if (needScheduleTaskNum == 0) {
-        //     Thread.sleep(1000);
-        //     return;
-        // }
-        while (needScheduleTaskNum > 0) {
+        Map<Long, List<TRoutineLoadTask>> beIdTobatchTask = Maps.newHashMap();
+        while (needScheduleTaskNum-- > 0) {
             // allocate be to task and begin transaction for task
             RoutineLoadTaskInfo routineLoadTaskInfo = null;
             try {
@@ -109,21 +107,17 @@ private void process() throws LoadException, UserException, InterruptedException
                         e.getMessage(),e);
                 return;
             }
-            RoutineLoadJob routineLoadJob = null;
             try {
-                routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId());
-                allocateTaskToBe(routineLoadTaskInfo, routineLoadJob);
+                allocateTaskToBe(routineLoadTaskInfo);
                 routineLoadTaskInfo.beginTxn();
             } catch (MetaNotFoundException e) {
                 // task has been abandoned while renew task has been added in queue
                 // or database has been deleted
-                needScheduleTaskNum--;
                 LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId())
                         .add("error_msg", "task has been abandoned with error " + e.getMessage()).build(), e);
                 continue;
             } catch (LoadException e) {
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
-                needScheduleTaskNum--;
                 LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId())
                         .add("error_msg", "put task to the rear of queue with error " + e.getMessage())
                         .build(), e);
@@ -144,12 +138,21 @@ private void process() throws LoadException, UserException, InterruptedException
             }
             // count
             scheduledTaskNum++;
-            needScheduleTaskNum--;
         }
         submitBatchTask(beIdTobatchTask);
         LOG.info("{} tasks have been allocated to be.", scheduledTaskNum);
     }
 
+    private void updateBackendSlotIfNecessary() {
+        long currentTime = System.currentTimeMillis();
+        if (lastBackendSlotUpdateTime != -1
+                && currentTime - lastBackendSlotUpdateTime > BACKEND_SLOT_UPDATE_INTERVAL_MS) {
+            routineLoadManager.updateBeIdToMaxConcurrentTasks();
+            lastBackendSlotUpdateTime = currentTime;
+            LOG.debug("update backend max slot for routine load task scheduling");
+        }
+    }
+
     public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) {
         needScheduleTasksQueue.add(routineLoadTaskInfo);
     }
@@ -184,22 +187,22 @@ private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoad
     // check if previous be has idle slot
     // true: allocate previous be to task
     // false: allocate the most idle be to task
-    private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo, RoutineLoadJob routineLoadJob)
+    private void allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo)
             throws MetaNotFoundException, LoadException {
         if (routineLoadTaskInfo.getPreviousBeId() != -1L) {
-            if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadJob.getClusterName())) {
+            if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName())) {
                 LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId())
-                        .add("job_id", routineLoadJob.getId())
+                        .add("job_id", routineLoadTaskInfo.getJobId())
                         .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
                         .add("msg", "task use the previous be id")
                         .build());
                 routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId());
                 return;
             }
         }
-        routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName()));
+        routineLoadTaskInfo.setBeId(routineLoadManager.getMinTaskBeId(routineLoadTaskInfo.getClusterName()));
         LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, routineLoadTaskInfo.getId())
-                .add("job_id", routineLoadJob.getId())
+                .add("job_id", routineLoadTaskInfo.getJobId())
                 .add("be_id", routineLoadTaskInfo.getBeId())
                 .add("msg", "task has been allocated to be")
                 .build());
@@ -219,7 +219,7 @@ public void testProcessTimeOutTasks(@Mocked KafkaConsumer kafkaConsumer,
         };
 
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
-        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L);
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster");
         kafkaTaskInfo.addKafkaPartition(100);
         kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000);
         routineLoadTaskInfoList.add(kafkaTaskInfo);