From 547fd00ff21159c694b9d70f6c03e5ed98150b70 Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 10 Aug 2021 19:46:20 +0800 Subject: [PATCH 01/10] save code --- .../java/org/apache/doris/common/Config.java | 5 + .../apache/doris/load/sync/SyncChannel.java | 18 +- .../doris/load/sync/SyncChannelCallback.java | 2 - .../doris/load/sync/SyncChannelHandle.java | 53 --- .../apache/doris/load/sync/SyncChecker.java | 1 + .../apache/doris/load/sync/SyncLifeCycle.java | 5 + .../load/sync/canal/CanalSyncChannel.java | 91 +++--- .../sync/canal/CanalSyncDataConsumer.java | 20 +- .../doris/load/sync/canal/CanalSyncJob.java | 13 +- .../load/sync/canal/SyncCanalClient.java | 54 +--- .../doris/qe/InsertStreamTxnExecutor.java | 3 +- .../org/apache/doris/task/StripedObject.java | 22 ++ .../apache/doris/task/StripedRunnable.java | 21 ++ .../doris/task/StripedTaskExecutor.java | 301 ++++++++++++++++++ .../{load/sync => task}/SyncPendingTask.java | 4 +- .../java/org/apache/doris/task/SyncTask.java | 57 ++++ .../org/apache/doris/task/SyncTaskPool.java | 35 ++ .../load/sync/canal/CanalSyncDataTest.java | 16 +- thirdparty/build-thirdparty.sh | 14 +- 19 files changed, 548 insertions(+), 187 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java rename fe/fe-core/src/main/java/org/apache/doris/{load/sync => task}/SyncPendingTask.java (95%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index a32b4dca83ed75..bdc424c3cc04d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -652,6 +652,11 @@ public class Config extends ConfigBase { */ @ConfField public static int sync_checker_interval_second = 5; + /** + * max num of thread to handle sync task in sync task thread-pool. + */ + @ConfField public static int max_sync_task_threads_num = 1024; + /** * Default number of waiting jobs for routine load and version 2 of load * This is a desired number. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java index 85644b13f830d9..9ba34ca5a90ef6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java @@ -32,7 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class SyncChannel extends SyncLifeCycle { +public class SyncChannel { private static final Logger LOG = LogManager.getLogger(SyncChannel.class); protected long id; @@ -57,22 +57,6 @@ public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List c this.srcTable = srcTable.toLowerCase(); } - @Override - public void start() { - super.start(); - LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void stop() { - super.stop(); - LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void process() { - } - public void beginTxn(long batchId) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java index 8b2f2392362005..2cdf7174674c95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java @@ -19,8 +19,6 @@ public interface SyncChannelCallback { - public boolean state(); - public void onFinished(long channelId); public void onFailed(String errMsg); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java index 4e3a397c9955f9..8f7721ed5b171d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java @@ -24,14 +24,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.concurrent.atomic.AtomicBoolean; - public class SyncChannelHandle implements SyncChannelCallback { private Logger LOG = LogManager.getLogger(SyncChannelHandle.class); // channel id -> dummy value(-1) private MarkedCountDownLatch latch; - private Sync sync = new Sync(); public void reset(int size) { this.latch = new MarkedCountDownLatch<>(size); @@ -41,19 +38,6 @@ public void mark(SyncChannel channel) { latch.addMark(channel.getId(), -1L); } - public void set(Boolean mutex) { - if (mutex) { - this.sync.innerSetTrue(); - } else { - this.sync.innerSetFalse(); - } - } - - @Override - public boolean state() { - return this.sync.innerState(); - } - @Override public void onFinished(long channelId) { this.latch.markedCountDown(channelId, -1L); @@ -71,41 +55,4 @@ public void join() throws InterruptedException { public Status getStatus() { return latch.getStatus(); } - - // This class describes the inner state. - private final class Sync { - private AtomicBoolean state; - - boolean innerState() { - return this.state.get(); - } - - public boolean getState() { - return state.get(); - } - - void innerSetTrue() { - boolean s; - do { - s = getState(); - if (s) { - return; - } - } while(!state.compareAndSet(s, true)); - } - - void innerSetFalse() { - boolean s; - do { - s = getState(); - if (!s) { - return; - } - } while(!state.compareAndSet(s, false)); - } - - private Sync() { - state = new AtomicBoolean(false); - } - } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index 830c449f41f636..b61c153bb32ab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -25,6 +25,7 @@ import com.google.common.collect.Maps; +import org.apache.doris.task.SyncPendingTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java index 3c98137fa7ac23..9109c92bed55e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java @@ -62,6 +62,11 @@ public void stop() { this.running = false; if (thread != null) { + // Deadlock prevention + if (thread == Thread.currentThread()) { + return; + } + try { thread.join(); } catch (InterruptedException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 8b5dc54c70643a..695eb536a2f553 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,8 @@ import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.task.SyncTask; +import org.apache.doris.task.SyncTaskPool; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMergeType; @@ -49,7 +51,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,8 +59,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CanalSyncChannel extends SyncChannel { @@ -69,44 +68,48 @@ public class CanalSyncChannel extends SyncChannel { private static final String DELETE_CONDITION = DELETE_COLUMN + "=1"; private static final String NULL_VALUE_FOR_LOAD = "\\N"; + private final Object stripe = new Object(); + private long timeoutSecond; private long lastBatchId; - private LinkedBlockingQueue> pendingQueue; + private Data batchBuffer; private InsertStreamTxnExecutor txnExecutor; public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { super(syncJob, db, table, columns, srcDataBase, srcTable); this.batchBuffer = new Data<>(); - this.pendingQueue = Queues.newLinkedBlockingQueue(128); this.lastBatchId = -1L; this.timeoutSecond = -1L; } - public void process() { - while (running) { - if (!isTxnInit()) { - continue; - } - // if txn has begun, send all data in queue - if (isTxnBegin()) { - while (!pendingQueue.isEmpty()) { - try { - Data rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS); - if (rows != null) { - sendData(rows); - } - } catch (Exception e) { - String errMsg = "encounter exception in channel, channel " + id + ", " + - "msg: " + e.getMessage() + ", table: " + targetTable; - LOG.error(errMsg); - callback.onFailed(errMsg); - } - } - } - if (callback.state()) { - callback.onFinished(id); - } + final class SendTask extends SyncTask { + private final InsertStreamTxnExecutor executor; + private Data rows; + private long serial; + + public SendTask(long signature, Object stripe, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor, long serial) { + super(signature, stripe, callback); + this.executor = executor; + this.rows = rows; + this.serial = serial; + } + + public void exec() throws Exception { + TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + txnEntry.setDataToSend(rows.getDatas()); + executor.sendData(); + } + } + + final class EOFTask extends SyncTask { + + public EOFTask(long signature, Object stripe, SyncChannelCallback callback) { + super(signature, stripe, callback); + } + + public void exec() throws Exception { + callback.onFinished(signature); } } @@ -189,10 +192,10 @@ public void abortTxn(String reason) throws TException, TimeoutException, Interru throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException { if (!isTxnBegin()) { @@ -213,10 +216,10 @@ public void commitTxn() throws TException, TimeoutException, InterruptedExceptio throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void initTxn(long timeoutSecond) { if (!isTxnInit()) { @@ -254,7 +257,12 @@ public void submit(long batchId, CanalEntry.EventType eventType, CanalEntry.RowC } } - private void execute(long batchId, CanalEntry.EventType eventType, List columns) { + public void submitEOF() { + EOFTask task = new EOFTask(id, stripe, callback); + SyncTaskPool.submit(task); + } + + public void execute(long batchId, CanalEntry.EventType eventType, List columns) { InternalService.PDataRow row = parseRow(eventType, columns); try { Preconditions.checkState(isTxnInit()); @@ -262,7 +270,8 @@ private void execute(long batchId, CanalEntry.EventType eventType, List(); } updateBatchId(batchId); @@ -294,19 +303,13 @@ private InternalService.PDataRow parseRow(CanalEntry.EventType eventType, List rows) throws TException, TimeoutException, - InterruptedException, ExecutionException { - Preconditions.checkState(isTxnBegin()); - TransactionEntry txnEntry = txnExecutor.getTxnEntry(); - txnEntry.setDataToSend(rows.getDatas()); - this.txnExecutor.sendData(); - } - public void flushData() throws TException, TimeoutException, InterruptedException, ExecutionException { - if (batchBuffer.isNotEmpty()) { - sendData(batchBuffer); - batchBuffer = new Data<>(); + if (this.batchBuffer.isNotEmpty()) { + TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + txnEntry.setDataToSend(batchBuffer.getDatas()); + this.txnExecutor.sendData(); + this.batchBuffer = new Data<>(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java index 10c393bb4ceb2f..d0bc38a2e6ede5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java @@ -102,7 +102,6 @@ public void stop(boolean needCleanUp) { @Override public void beginForTxn() { - handle.set(false); handle.reset(idToChannels.size()); for (CanalSyncChannel channel : idToChannels.values()) { channel.initTxn(Config.max_stream_load_timeout_second); @@ -161,15 +160,16 @@ public void commitForTxn() { } public Status waitForTxn() { + for (CanalSyncChannel channel : idToChannels.values()) { + channel.submitEOF(); + } + Status st = Status.CANCELLED; - handle.set(true); try { handle.join(); st = handle.getStatus(); } catch (InterruptedException e) { logger.warn("InterruptedException: ", e); - } finally { - handle.set(false); } return st; } @@ -190,7 +190,7 @@ public void process() { long totalMemSize = 0L; long startTime = System.currentTimeMillis(); beginForTxn(); - while (true) { + while (running) { Events dataEvents = null; try { dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS); @@ -227,7 +227,12 @@ public void process() { break; } } + Status st = waitForTxn(); + if (!running) { + abortForTxn("stopping client"); + continue; + } if (st.ok()) { commitForTxn(); } else { @@ -405,13 +410,16 @@ private void ack() { private void rollback() { holdGetLock(); try { - connector.rollback(); // Wait for the receiver to put the last message into the queue before clearing queue try { Thread.sleep(1000L); } catch (InterruptedException e) { // ignore } + + if (!ackBatches.isEmpty()) { + connector.rollback(); + } } finally { releaseGetLock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index dce10bab55d630..1960c016107132 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -183,7 +183,9 @@ public boolean isInit() { public void execute() throws UserException { LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug); // init - init(); + if (!isInit()) { + init(); + } // start client unprotectedStartClient(); } @@ -193,10 +195,12 @@ public void cancel(MsgType msgType, String errMsg) { LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg); failMsg = new SyncFailMsg(msgType, errMsg); switch (msgType) { - case USER_CANCEL: case SUBMIT_FAIL: case RUN_FAIL: + unprotectedStopClient(JobState.PAUSED); + break; case UNKNOWN: + case USER_CANCEL: unprotectedStopClient(JobState.CANCELLED); break; default: @@ -251,15 +255,12 @@ public void replayUpdateSyncJobState(SyncJobUpdateStateInfo info) { JobState jobState = info.getJobState(); switch (jobState) { case RUNNING: - client.startup(); - updateState(JobState.RUNNING, true); + updateState(JobState.PENDING, true); break; case PAUSED: - client.shutdown(false); updateState(JobState.PAUSED, true); break; case CANCELLED: - client.shutdown(true); updateState(JobState.CANCELLED, true); break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java index 33cb8cf8b72253..421c46d5e298ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java @@ -53,8 +53,6 @@ protected void unlock() { lock.unlock(); } - private ShutDownWorker shutDownWorker; - public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) { this(syncJob, destination, connector, batchSize, debug, ".*\\..*"); } @@ -71,13 +69,9 @@ public void startup() { Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered"); lock(); try { - // 1.start all threads in channel - for (CanalSyncChannel channel : idToChannels.values()) { - channel.start(); - } - // 2. start executor + // 1. start executor consumer.start(); - // 3. start receiver + // 2. start receiver receiver.start(); } finally { unlock(); @@ -85,43 +79,17 @@ public void startup() { logger.info("canal client has been started."); } - // Stop client asynchronously public void shutdown(boolean needCleanUp) { - this.shutDownWorker = new ShutDownWorker(needCleanUp); - shutDownWorker.shutdown(); - logger.info("canal client shutdown worker has been started."); - } - - public class ShutDownWorker implements Runnable { - public Thread thread; - public boolean needCleanUp; - - public ShutDownWorker(boolean needCleanUp) { - this.thread = new Thread(this, "ShutDownWorker"); - this.needCleanUp = needCleanUp; - } - - public void shutdown() { - thread.start(); - } - - @Override - public void run() { - lock(); - try { - // 1. stop receiver - receiver.stop(); - // 2. stop executor - consumer.stop(needCleanUp); - // 3. stop channels - for (CanalSyncChannel channel : idToChannels.values()) { - channel.stop(); - } - } finally { - unlock(); - } - logger.info("canal client has been stopped."); + lock(); + try { + // 1. stop receiver + receiver.stop(); + // 2. stop executor + consumer.stop(needCleanUp); + } finally { + unlock(); } + logger.info("canal client has been stopped."); } public void registerChannels(List channels) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 85ddd1f7c2258e..97153954df76ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -159,9 +159,10 @@ public void sendData() throws TException, TimeoutException, if (code != TStatusCode.OK) { throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList()); } - txnEntry.clearDataToSend(); } catch (RpcException e) { throw new TException(e); + } finally { + txnEntry.clearDataToSend(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java new file mode 100644 index 00000000000000..e2002b6a195ac4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +public interface StripedObject { + Object getStripe(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java new file mode 100644 index 00000000000000..1602a2deb87a3a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +public interface StripedRunnable extends Runnable, StripedObject { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java new file mode 100644 index 00000000000000..9a6f3f2122a639 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This thread pool ensures that all tasks belonging to + * the same stripe are executed in the order of submission. + */ +public class StripedTaskExecutor extends AbstractExecutorService { + + private final ExecutorService executor; + + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition terminating = lock.newCondition(); + + private final Map executors = new IdentityHashMap<>(); + + private final static ThreadLocal stripes = new ThreadLocal<>(); + + private State state = State.RUNNING; + + private static enum State { + RUNNING, SHUTDOWN + } + + private StripedTaskExecutor(ExecutorService executor) { + this.executor = executor; + } + + public StripedTaskExecutor() { + this(ThreadPoolManager.newDaemonCacheThreadPool(Config.max_sync_task_threads_num, + "sync-task-pool", true)); + } + + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + saveStripedObject(runnable); + return super.newTaskFor(runnable, value); + } + + protected RunnableFuture newTaskFor(Callable callable) { + saveStripedObject(callable); + return super.newTaskFor(callable); + } + + private void saveStripedObject(Object task) { + if (isStripedObject(task)) { + stripes.set(((StripedObject) task).getStripe()); + } + } + + private static boolean isStripedObject(Object o) { + return o instanceof StripedObject; + } + + public Future submit(Runnable task) { + return submit(task, null); + } + + public Future submit(Runnable task, T result) { + lock.lock(); + try { + checkPoolIsRunning(); + if (isStripedObject(task)) { + return super.submit(task, result); + } else { + return executor.submit(task, result); + } + } finally { + lock.unlock(); + } + } + + public Future submit(Callable task) { + lock.lock(); + try { + checkPoolIsRunning(); + if (isStripedObject(task)) { + return super.submit(task); + } else { + return executor.submit(task); + } + } finally { + lock.unlock(); + } + } + + private void checkPoolIsRunning() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + if (state != State.RUNNING) { + throw new RejectedExecutionException( + "executor not running"); + } + } + + public void execute(Runnable command) { + lock.lock(); + try { + checkPoolIsRunning(); + Object stripe = getStripe(command); + if (stripe != null) { + SerialExecutor serialEx = executors.get(stripe); + if (serialEx == null) { + serialEx = new SerialExecutor(stripe); + executors.put(stripe, serialEx); + } + serialEx.execute(command); + } else { + executor.execute(command); + } + } finally { + lock.unlock(); + } + } + + private Object getStripe(Runnable command) { + Object stripe; + if (command instanceof StripedObject) { + stripe = (((StripedObject) command).getStripe()); + } else { + stripe = stripes.get(); + } + stripes.remove(); + return stripe; + } + + public void shutdown() { + lock.lock(); + try { + state = State.SHUTDOWN; + if (executors.isEmpty()) { + executor.shutdown(); + } + } finally { + lock.unlock(); + } + } + + public List shutdownNow() { + lock.lock(); + try { + shutdown(); + List result = new ArrayList<>(); + for (SerialExecutor serialEx : executors.values()) { + serialEx.tasks.drainTo(result); + } + result.addAll(executor.shutdownNow()); + return result; + } finally { + lock.unlock(); + } + } + + public boolean isShutdown() { + lock.lock(); + try { + return state == State.SHUTDOWN; + } finally { + lock.unlock(); + } + } + + public boolean isTerminated() { + lock.lock(); + try { + if (state == State.RUNNING) return false; + for (SerialExecutor executor : executors.values()) { + if (!executor.isEmpty()) return false; + } + return executor.isTerminated(); + } finally { + lock.unlock(); + } + } + + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + lock.lock(); + try { + long waitUntil = System.nanoTime() + unit.toNanos(timeout); + long remainingTime; + while ((remainingTime = waitUntil - System.nanoTime()) > 0 + && !executors.isEmpty()) { + terminating.awaitNanos(remainingTime); + } + if (remainingTime <= 0) return false; + if (executors.isEmpty()) { + return executor.awaitTermination( + remainingTime, TimeUnit.NANOSECONDS); + } + return false; + } finally { + lock.unlock(); + } + } + + private void removeEmptySerialExecutor(Object stripe, SerialExecutor serialEx) { + Preconditions.checkState(serialEx == executors.get(stripe)); + Preconditions.checkState(lock.isHeldByCurrentThread()); + Preconditions.checkState(serialEx.isEmpty()); + + executors.remove(stripe); + terminating.signalAll(); + if (state == State.SHUTDOWN && executors.isEmpty()) { + executor.shutdown(); + } + } + + private class SerialExecutor implements Executor { + + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + + private Runnable active; + + private final Object stripe; + + private SerialExecutor(Object stripe) { + this.stripe = stripe; + } + + public void execute(final Runnable r) { + lock.lock(); + try { + tasks.add(new Runnable() { + public void run() { + try { + r.run(); + } finally { + scheduleNext(); + } + } + }); + if (active == null) { + scheduleNext(); + } + } finally { + lock.unlock(); + } + } + + private void scheduleNext() { + lock.lock(); + try { + if ((active = tasks.poll()) != null) { + executor.execute(active); + terminating.signalAll(); + } else { + removeEmptySerialExecutor(stripe, this); + } + } finally { + lock.unlock(); + } + } + + public boolean isEmpty() { + lock.lock(); + try { + return active == null && tasks.isEmpty(); + } finally { + lock.unlock(); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java rename to fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java index e9b7695233c5b9..c61d9e69c404e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.load.sync; +package org.apache.doris.task; import org.apache.doris.common.UserException; import org.apache.doris.load.sync.SyncFailMsg.MsgType; +import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.SyncJob.JobState; -import org.apache.doris.task.MasterTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java new file mode 100644 index 00000000000000..7ca1f931d50203 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class SyncTask implements StripedRunnable { + private static final Logger LOG = LogManager.getLogger(SyncTask.class); + + protected long signature; + protected Object stripe; + protected SyncChannelCallback callback; + + public SyncTask(long signature, Object stripe, SyncChannelCallback callback) { + this.signature = signature; + this.stripe = stripe; + this.callback = callback; + } + + @Override + public void run() { + try { + exec(); + } catch (Exception e) { + String errMsg = "channel " + signature + ", " + "msg: " + e.getMessage(); + LOG.error("sync task exec error: {}", errMsg); + callback.onFailed(errMsg); + } + } + + public Object getStripe() { + return this.stripe; + } + + /** + * implement in child + */ + protected abstract void exec() throws Exception; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java new file mode 100644 index 00000000000000..71abe647e2c930 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import java.util.concurrent.ExecutorService; + +public class SyncTaskPool { + + private static final ExecutorService executor = new StripedTaskExecutor(); + + public SyncTaskPool() { + } + + public static void submit(Runnable task) { + if (task == null) { + return; + } + executor.submit(task); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 8a893c1d6b8995..b987b5919cd1bf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -226,7 +226,7 @@ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); + // channel.start(); consumer.start(); receiver.start(); @@ -235,7 +235,7 @@ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws } finally { receiver.stop(); consumer.stop(); - channel.stop(); + // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -301,7 +301,7 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); + // channel.start(); consumer.start(); receiver.start(); @@ -310,7 +310,7 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - channel.stop(); + // channel.stop(); } LOG.info(consumer.getPositionInfo()); @@ -366,7 +366,7 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); + // channel.start(); consumer.start(); receiver.start(); @@ -375,7 +375,7 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - channel.stop(); + // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -450,7 +450,7 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); + // channel.start(); consumer.start(); receiver.start(); @@ -459,7 +459,7 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - channel.stop(); + // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index 2a1299d42ba050..c1b6dea6829205 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -58,7 +58,7 @@ fi eval set -- "$OPTS" -PARALLEL=$[$(nproc)/4+1] +PARALLEL=$[$(sysctl -n hw.logicalcpu)/4+1] if [[ $# -ne 1 ]] ; then while true; do case "$1" in @@ -226,7 +226,8 @@ build_openssl() { LDFLAGS="-L${TP_LIB_DIR}" \ CFLAGS="-fPIC" \ LIBDIR="lib" \ - ./Configure --prefix=$TP_INSTALL_DIR -zlib -shared ${OPENSSL_PLATFORM} + #./Configure --prefix=$TP_INSTALL_DIR -zlib -shared ${OPENSSL_PLATFORM} + ./Configure --prefix=$TP_INSTALL_DIR -shared ${OPENSSL_PLATFORM} make -j $PARALLEL && make install_sw # NOTE(zc): remove this dynamic library files to make libcurl static link. # If I don't remove this files, I don't known how to make libcurl link static library @@ -247,13 +248,16 @@ build_thrift() { ./bootstrap.sh fi + OPENSSL_DIR="/usr/local/Cellar/openssl@1.1/1.1.1k" + OPENSSL_SUPPORT="-I${OPENSSL_DIR}/include -L${OPENSSL_DIR}/lib" + echo ${TP_LIB_DIR} ./configure CPPFLAGS="-I${TP_INCLUDE_DIR}" LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" LIBS="-lcrypto -ldl -lssl" CFLAGS="-fPIC" \ --prefix=$TP_INSTALL_DIR --docdir=$TP_INSTALL_DIR/doc --enable-static --disable-shared --disable-tests \ --disable-tutorial --without-qt4 --without-qt5 --without-csharp --without-erlang --without-nodejs \ --without-lua --without-perl --without-php --without-php_extension --without-dart --without-ruby \ --without-haskell --without-go --without-haxe --without-d --without-python -without-java --with-cpp \ - --with-libevent=$TP_INSTALL_DIR --with-boost=$TP_INSTALL_DIR --with-openssl=$TP_INSTALL_DIR + --with-libevent=$TP_INSTALL_DIR --with-boost=$TP_INSTALL_DIR if [ -f compiler/cpp/thrifty.hh ];then mv compiler/cpp/thrifty.hh compiler/cpp/thrifty.h @@ -301,7 +305,7 @@ build_protobuf() { LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" \ ./configure --prefix=${TP_INSTALL_DIR} --disable-shared --enable-static --with-zlib=${TP_INSTALL_DIR}/include cd src - sed -i 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile + sed -i '' 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile cd - make -j $PARALLEL && make install } @@ -450,7 +454,7 @@ build_curl() { LDFLAGS="-L${TP_LIB_DIR}" LIBS="-lcrypto -lssl -lcrypto -ldl" \ CFLAGS="-fPIC" \ ./configure --prefix=$TP_INSTALL_DIR --disable-shared --enable-static \ - --without-librtmp --with-ssl=${TP_INSTALL_DIR} --without-libidn2 --disable-ldap --enable-ipv6 \ + --without-librtmp --with-ssl=/usr/local/Cellar/openssl@1.1/1.1.1k --without-libidn2 --disable-ldap --enable-ipv6 \ --without-libssh2 make -j $PARALLEL && make install } From 7fd54b79bec21bbc7a6ba3957e599bec02340af9 Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 10 Aug 2021 20:21:13 +0800 Subject: [PATCH 02/10] save code --- .../doris/load/sync/canal/CanalSyncChannel.java | 6 ++---- thirdparty/build-thirdparty.sh | 14 +++++--------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 695eb536a2f553..ed1f5a1d942c49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -86,13 +86,11 @@ public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List rows; - private long serial; - public SendTask(long signature, Object stripe, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor, long serial) { + public SendTask(long signature, Object stripe, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor) { super(signature, stripe, callback); this.executor = executor; this.rows = rows; - this.serial = serial; } public void exec() throws Exception { @@ -270,7 +268,7 @@ public void execute(long batchId, CanalEntry.EventType eventType, List(); } diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index c1b6dea6829205..2a1299d42ba050 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -58,7 +58,7 @@ fi eval set -- "$OPTS" -PARALLEL=$[$(sysctl -n hw.logicalcpu)/4+1] +PARALLEL=$[$(nproc)/4+1] if [[ $# -ne 1 ]] ; then while true; do case "$1" in @@ -226,8 +226,7 @@ build_openssl() { LDFLAGS="-L${TP_LIB_DIR}" \ CFLAGS="-fPIC" \ LIBDIR="lib" \ - #./Configure --prefix=$TP_INSTALL_DIR -zlib -shared ${OPENSSL_PLATFORM} - ./Configure --prefix=$TP_INSTALL_DIR -shared ${OPENSSL_PLATFORM} + ./Configure --prefix=$TP_INSTALL_DIR -zlib -shared ${OPENSSL_PLATFORM} make -j $PARALLEL && make install_sw # NOTE(zc): remove this dynamic library files to make libcurl static link. # If I don't remove this files, I don't known how to make libcurl link static library @@ -248,16 +247,13 @@ build_thrift() { ./bootstrap.sh fi - OPENSSL_DIR="/usr/local/Cellar/openssl@1.1/1.1.1k" - OPENSSL_SUPPORT="-I${OPENSSL_DIR}/include -L${OPENSSL_DIR}/lib" - echo ${TP_LIB_DIR} ./configure CPPFLAGS="-I${TP_INCLUDE_DIR}" LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" LIBS="-lcrypto -ldl -lssl" CFLAGS="-fPIC" \ --prefix=$TP_INSTALL_DIR --docdir=$TP_INSTALL_DIR/doc --enable-static --disable-shared --disable-tests \ --disable-tutorial --without-qt4 --without-qt5 --without-csharp --without-erlang --without-nodejs \ --without-lua --without-perl --without-php --without-php_extension --without-dart --without-ruby \ --without-haskell --without-go --without-haxe --without-d --without-python -without-java --with-cpp \ - --with-libevent=$TP_INSTALL_DIR --with-boost=$TP_INSTALL_DIR + --with-libevent=$TP_INSTALL_DIR --with-boost=$TP_INSTALL_DIR --with-openssl=$TP_INSTALL_DIR if [ -f compiler/cpp/thrifty.hh ];then mv compiler/cpp/thrifty.hh compiler/cpp/thrifty.h @@ -305,7 +301,7 @@ build_protobuf() { LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" \ ./configure --prefix=${TP_INSTALL_DIR} --disable-shared --enable-static --with-zlib=${TP_INSTALL_DIR}/include cd src - sed -i '' 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile + sed -i 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile cd - make -j $PARALLEL && make install } @@ -454,7 +450,7 @@ build_curl() { LDFLAGS="-L${TP_LIB_DIR}" LIBS="-lcrypto -lssl -lcrypto -ldl" \ CFLAGS="-fPIC" \ ./configure --prefix=$TP_INSTALL_DIR --disable-shared --enable-static \ - --without-librtmp --with-ssl=/usr/local/Cellar/openssl@1.1/1.1.1k --without-libidn2 --disable-ldap --enable-ipv6 \ + --without-librtmp --with-ssl=${TP_INSTALL_DIR} --without-libidn2 --disable-ldap --enable-ipv6 \ --without-libssh2 make -j $PARALLEL && make install } From 4fed584c8b3ae0bd81298a8e20dbfdc4c639fa43 Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 10 Aug 2021 20:40:51 +0800 Subject: [PATCH 03/10] save code --- .../apache/doris/load/sync/canal/CanalSyncDataTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index b987b5919cd1bf..4e01e419ef3323 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -226,7 +226,6 @@ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - // channel.start(); consumer.start(); receiver.start(); @@ -235,7 +234,6 @@ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws } finally { receiver.stop(); consumer.stop(); - // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -301,7 +299,6 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - // channel.start(); consumer.start(); receiver.start(); @@ -310,7 +307,6 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - // channel.stop(); } LOG.info(consumer.getPositionInfo()); @@ -366,7 +362,6 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - // channel.start(); consumer.start(); receiver.start(); @@ -375,7 +370,6 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -450,7 +444,6 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - // channel.start(); consumer.start(); receiver.start(); @@ -459,7 +452,6 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, } finally { receiver.stop(); consumer.stop(); - // channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); From 5a688e541c7061b435a595b229dd21d27b805d2c Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 10 Aug 2021 20:52:32 +0800 Subject: [PATCH 04/10] add key word --- fe/fe-core/src/main/cup/sql_parser.cup | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index f4e573d4253bc1..dcbb653880e194 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -5243,6 +5243,8 @@ keyword ::= {: RESULT = id; :} | KW_ISOLATION:id {: RESULT = id; :} + | KW_JOB:id + {: RESULT = id; :} | KW_ENCRYPTKEY:id {: RESULT = id; :} | KW_ENCRYPTKEYS:id From 4a17b52bbce8a528abb5864dd9e79a9ebc8ee805 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 13 Aug 2021 15:39:21 +0800 Subject: [PATCH 05/10] fix interval --- fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index bd8bf344da4304..32d33ccbd1b30b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1310,7 +1310,7 @@ private void startMasterOnlyDaemonThreads() { ExportChecker.init(Config.export_checker_interval_second * 1000L); ExportChecker.startAll(); // Sync checker - SyncChecker.init(Config.sync_checker_interval_second); + SyncChecker.init(Config.sync_checker_interval_second * 1000L); SyncChecker.startAll(); // Tablet checker and scheduler tabletChecker.start(); From d9e2d45b0ab9875014edb2dcae19fdc13d4f0f08 Mon Sep 17 00:00:00 2001 From: xy720 Date: Sun, 22 Aug 2021 04:01:45 +0800 Subject: [PATCH 06/10] fix replay log --- .../org/apache/doris/analysis/ChannelDescription.java | 10 ++++++++++ .../java/org/apache/doris/load/sync/SyncChannel.java | 5 ++--- .../apache/doris/load/sync/canal/CanalSyncChannel.java | 4 ++-- .../org/apache/doris/load/sync/canal/CanalSyncJob.java | 6 +++--- .../doris/load/sync/canal/CanalSyncDataTest.java | 9 +++++---- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java index 51eae33f54b60d..83aca6b40bd5ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java @@ -60,6 +60,8 @@ public class ChannelDescription implements Writable { // column names of source table @SerializedName(value = "colNames") private final List colNames; + @SerializedName(value = "id") + private long channelId; public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List colNames) { this.srcDatabase = srcDatabase; @@ -119,6 +121,14 @@ private void analyzeColumns() throws AnalysisException { } } + private void setChannelId(long channelId) { + this.channelId = channelId; + } + + public long getChannelId() { + return this.channelId; + } + public String getTargetTable() { return targetTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java index 9ba34ca5a90ef6..8a47385816897f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java @@ -18,7 +18,6 @@ package org.apache.doris.load.sync; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.UserException; @@ -46,8 +45,8 @@ public class SyncChannel { protected String srcTable; protected SyncChannelCallback callback; - public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - this.id = Catalog.getCurrentCatalog().getNextId(); + public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + this.id = id; this.jobId = syncJob.getId(); this.db = db; this.tbl = table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index ed1f5a1d942c49..75bf5c4f045d5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -76,8 +76,8 @@ public class CanalSyncChannel extends SyncChannel { private Data batchBuffer; private InsertStreamTxnExecutor txnExecutor; - public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - super(syncJob, db, table, columns, srcDataBase, srcTable); + public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + super(id, syncJob, db, table, columns, srcDataBase, srcTable); this.batchBuffer = new Data<>(); this.lastBatchId = -1L; this.timeoutSecond = -1L; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index 1960c016107132..8ca0428c6aac20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -109,8 +109,8 @@ public void initChannels() throws DdlException { colNames.add(column.getName()); } } - CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, olapTable, colNames, - channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); + CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db, + (OlapTable) table, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); if (channelDescription.getPartitionNames() != null) { syncChannel.setPartitions(channelDescription.getPartitionNames()); } @@ -301,4 +301,4 @@ public String toString() { + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs) + "]"; } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 4e01e419ef3323..343a0fff4fc351 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -73,6 +73,7 @@ public class CanalSyncDataTest { private long offset = 0; private long nextId = 1000L; private int batchSize = 8192; + private long channelId = 100001L; ReentrantLock getLock; @@ -220,7 +221,7 @@ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); @@ -293,7 +294,7 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); @@ -356,7 +357,7 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); @@ -438,7 +439,7 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); From ff56841e30f11908f5d11850843edd402690bc7f Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 25 Aug 2021 20:17:10 +0800 Subject: [PATCH 07/10] fix bug in channel --- .../doris/analysis/ChannelDescription.java | 2 +- .../java/org/apache/doris/load/sync/SyncJob.java | 4 ++++ .../load/sync/canal/CanalSyncDataConsumer.java | 16 ++++++++++------ .../doris/load/sync/canal/CanalSyncJob.java | 6 +----- .../doris/load/sync/position/PositionMeta.java | 2 +- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java index 83aca6b40bd5ad..b953edab232a41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java @@ -121,7 +121,7 @@ private void analyzeColumns() throws AnalysisException { } } - private void setChannelId(long channelId) { + public void setChannelId(long channelId) { this.channelId = channelId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java index 3ce5b48b0d0e58..9ceb04453e7a41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java @@ -307,6 +307,10 @@ public static SyncJob read(DataInput in) throws IOException { public void setChannelDescriptions(List channelDescriptions) { this.channelDescriptions = channelDescriptions; + // set channel id + for (ChannelDescription channelDescription : channelDescriptions) { + channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId()); + } } public long getId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java index d0bc38a2e6ede5..bdfc7327d5287f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java @@ -265,7 +265,7 @@ public void put(Message message, int size) { } int startIndex = 0; - // if last ack position is null, it is the first time to consume batch (startOffset = 0) + // if last ack position is null, it is the first time to consume batch EntryPosition lastAckPosition = positionMeta.getAckPosition(); if (lastAckPosition != null) { EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0)); @@ -308,14 +308,18 @@ private void executeOneBatch(Events dataEvents) EntryPosition startPosition = dataEvents.getPositionRange().getStart(); EntryPosition endPosition = dataEvents.getPositionRange().getEnd(); for (CanalSyncChannel channel : idToChannels.values()) { + String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable()); + // if last commit position is null, it is the first time to execute batch EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId()); - String key = channel.getSrcDataBase() + "." + channel.getSrcTable(); - if (commitPosition.compareTo(startPosition) < 0) { + if (commitPosition != null) { + if (commitPosition.compareTo(startPosition) < 0) { + preferChannels.put(key, channel); + } else if (commitPosition.compareTo(endPosition) < 0) { + secondaryChannels.put(key, channel); + } + } else { preferChannels.put(key, channel); } - else if (commitPosition.compareTo(endPosition) < 0) { - secondaryChannels.put(key, channel); - } } // distribute data to channels diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index 8ca0428c6aac20..f82afa2ccbe1ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -232,11 +232,7 @@ public void unprotectedStopClient(JobState jobState) { return; } if (client != null) { - if (jobState == JobState.CANCELLED) { - client.shutdown(true); - } else { - client.shutdown(false); - } + client.shutdown(true); } updateState(jobState, false); LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java index 4d68315c331661..fc884010713872 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java @@ -76,7 +76,7 @@ public long getAckTime() { } public T getLatestPosition() { - if (batches.isEmpty()) { + if (!batches.containsKey(maxBatchId)) { return null; } else { return batches.get(maxBatchId).getEnd(); From 9cfc6c5b9d14e3b996ab59c82fa592ff7788d72e Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 30 Aug 2021 19:52:04 +0800 Subject: [PATCH 08/10] Replace StripedTaskExecutor with SerialExecutorService instead --- .../java/org/apache/doris/common/Config.java | 2 +- .../apache/doris/load/sync/SyncChecker.java | 2 +- .../load/sync/canal/CanalSyncChannel.java | 23 +- .../load/sync/position/PositionMeta.java | 1 + .../org/apache/doris/task/SerialExecutor.java | 146 +++++++++ .../doris/task/SerialExecutorService.java | 80 +++++ .../org/apache/doris/task/StripedObject.java | 22 -- .../apache/doris/task/StripedRunnable.java | 21 -- .../doris/task/StripedTaskExecutor.java | 301 ------------------ .../java/org/apache/doris/task/SyncTask.java | 13 +- .../org/apache/doris/task/SyncTaskPool.java | 27 +- 11 files changed, 268 insertions(+), 370 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index bdc424c3cc04d7..ffaa5828fd953c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -655,7 +655,7 @@ public class Config extends ConfigBase { /** * max num of thread to handle sync task in sync task thread-pool. */ - @ConfField public static int max_sync_task_threads_num = 1024; + @ConfField public static int max_sync_task_threads_num = 10; /** * Default number of waiting jobs for routine load and version 2 of load diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index b61c153bb32ab5..78da903d1126d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -68,7 +68,7 @@ public static void startAll() { @Override protected void runAfterCatalogReady() { - LOG.debug("start check export jobs. job state: {}", jobState.name()); + LOG.debug("start check sync jobs. job state: {}", jobState.name()); switch (jobState) { case PENDING: runPendingJobs(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 75bf5c4f045d5f..266d52f5d112c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -68,7 +68,7 @@ public class CanalSyncChannel extends SyncChannel { private static final String DELETE_CONDITION = DELETE_COLUMN + "=1"; private static final String NULL_VALUE_FOR_LOAD = "\\N"; - private final Object stripe = new Object(); + private final int index; private long timeoutSecond; private long lastBatchId; @@ -78,32 +78,33 @@ public class CanalSyncChannel extends SyncChannel { public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { super(id, syncJob, db, table, columns, srcDataBase, srcTable); + this.index = SyncTaskPool.getNextIndex(); this.batchBuffer = new Data<>(); this.lastBatchId = -1L; this.timeoutSecond = -1L; } - final class SendTask extends SyncTask { + private final static class SendTask extends SyncTask { private final InsertStreamTxnExecutor executor; - private Data rows; + private final Data rows; - public SendTask(long signature, Object stripe, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor) { - super(signature, stripe, callback); + public SendTask(long signature, int index, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor) { + super(signature, index, callback); this.executor = executor; this.rows = rows; } public void exec() throws Exception { - TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + TransactionEntry txnEntry = executor.getTxnEntry(); txnEntry.setDataToSend(rows.getDatas()); executor.sendData(); } } - final class EOFTask extends SyncTask { + private final static class EOFTask extends SyncTask { - public EOFTask(long signature, Object stripe, SyncChannelCallback callback) { - super(signature, stripe, callback); + public EOFTask(long signature, int index, SyncChannelCallback callback) { + super(signature, index, callback); } public void exec() throws Exception { @@ -256,7 +257,7 @@ public void submit(long batchId, CanalEntry.EventType eventType, CanalEntry.RowC } public void submitEOF() { - EOFTask task = new EOFTask(id, stripe, callback); + EOFTask task = new EOFTask(id, index, callback); SyncTaskPool.submit(task); } @@ -268,7 +269,7 @@ public void execute(long batchId, CanalEntry.EventType eventType, List(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java index fc884010713872..d4d8f718da8a0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java @@ -38,6 +38,7 @@ public PositionMeta() { this.batches = Maps.newHashMap(); this.commitPositions = Maps.newHashMap(); } + public void addBatch(long batchId, PositionRange range) { updateMaxBatchId(batchId); batches.put(batchId, range); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java new file mode 100644 index 00000000000000..22d3a3d364f7ca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class SerialExecutor extends AbstractExecutorService { + + private final ExecutorService taskPool; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition terminating = lock.newCondition(); + + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + private Runnable active; + + private boolean shutdown; + + public SerialExecutor(final ExecutorService executor) { + Preconditions.checkNotNull(executor); + this.taskPool = executor; + } + + public void execute(final Runnable r) { + lock.lock(); + try { + checkPoolIsRunning(); + tasks.add(new Runnable() { + public void run() { + try { + r.run(); + } finally { + scheduleNext(); + } + } + }); + if (active == null) { + scheduleNext(); + } + } finally { + lock.unlock(); + } + } + + private void checkPoolIsRunning() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + if (shutdown) { + throw new RejectedExecutionException("SerialExecutor is already shutdown"); + } + } + + public void shutdown() { + lock.lock(); + try { + shutdown = true; + } finally { + lock.unlock(); + } + } + + public List shutdownNow() { + lock.lock(); + try { + shutdown = true; + List result = new ArrayList<>(); + tasks.drainTo(result); + return result; + } finally { + lock.unlock(); + } + } + + public boolean isShutdown() { + lock.lock(); + try { + return shutdown; + } finally { + lock.unlock(); + } + } + + public boolean isTerminated() { + lock.lock(); + try { + return shutdown && active == null; + } finally { + lock.unlock(); + } + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long waitUntil = System.nanoTime() + unit.toNanos(timeout); + long remainingTime; + while ((remainingTime = waitUntil - System.nanoTime()) > 0) { + if (shutdown && active == null) { + break; + } + terminating.awaitNanos(remainingTime); + } + return remainingTime > 0; + } finally { + lock.unlock(); + } + } + + private void scheduleNext() { + lock.lock(); + try { + if ((active = tasks.poll()) != null) { + taskPool.execute(active); + } else if (shutdown) { + terminating.signalAll(); + } + } finally { + lock.unlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java new file mode 100644 index 00000000000000..dcdf2690e89e05 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.ThreadPoolManager; + +import java.util.concurrent.ExecutorService; + +/** + * This executor service ensures that all tasks submitted to + * the same slot are executed in the order of submission. + */ +public class SerialExecutorService { + + public interface SerialRunnable extends Runnable { + int getIndex(); + } + + private final int numOfSlots; + private final ExecutorService taskPool; + private final SerialExecutor[] slots; + + private SerialExecutorService(int numOfSlots, ExecutorService taskPool) { + this.numOfSlots = numOfSlots; + this.slots = new SerialExecutor[numOfSlots]; + this.taskPool = taskPool; + for (int i = 0; i < numOfSlots; i++) { + slots[i] = new SerialExecutor(taskPool); + } + } + + public SerialExecutorService(int numOfSlots) { + this(numOfSlots, ThreadPoolManager.newDaemonFixedThreadPool( + numOfSlots, 256, "sync-task-pool", true)); + } + + public void submit(Runnable command) { + int index = getIndex(command); + if (isSlotIndex(index)) { + SerialExecutor serialEx = slots[index]; + serialEx.execute(command); + } else { + taskPool.execute(command); + } + } + + private int getIndex(Runnable command) { + int index = -1; + if (command instanceof SerialRunnable) { + index = (((SerialRunnable) command).getIndex()); + } + return index; + } + + private boolean isSlotIndex(int index) { + return index >= 0 && index < numOfSlots; + } + + public void close() { + for (int i = 0; i < numOfSlots; i++) { + final SerialExecutor serialEx = slots[i]; + serialEx.shutdown(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java deleted file mode 100644 index e2002b6a195ac4..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StripedObject.java +++ /dev/null @@ -1,22 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -public interface StripedObject { - Object getStripe(); -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java deleted file mode 100644 index 1602a2deb87a3a..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StripedRunnable.java +++ /dev/null @@ -1,21 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -public interface StripedRunnable extends Runnable, StripedObject { -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java deleted file mode 100644 index 9a6f3f2122a639..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StripedTaskExecutor.java +++ /dev/null @@ -1,301 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.common.Config; -import org.apache.doris.common.ThreadPoolManager; - -import com.google.common.base.Preconditions; - -import java.util.ArrayList; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * This thread pool ensures that all tasks belonging to - * the same stripe are executed in the order of submission. - */ -public class StripedTaskExecutor extends AbstractExecutorService { - - private final ExecutorService executor; - - private final ReentrantLock lock = new ReentrantLock(); - - private final Condition terminating = lock.newCondition(); - - private final Map executors = new IdentityHashMap<>(); - - private final static ThreadLocal stripes = new ThreadLocal<>(); - - private State state = State.RUNNING; - - private static enum State { - RUNNING, SHUTDOWN - } - - private StripedTaskExecutor(ExecutorService executor) { - this.executor = executor; - } - - public StripedTaskExecutor() { - this(ThreadPoolManager.newDaemonCacheThreadPool(Config.max_sync_task_threads_num, - "sync-task-pool", true)); - } - - protected RunnableFuture newTaskFor(Runnable runnable, T value) { - saveStripedObject(runnable); - return super.newTaskFor(runnable, value); - } - - protected RunnableFuture newTaskFor(Callable callable) { - saveStripedObject(callable); - return super.newTaskFor(callable); - } - - private void saveStripedObject(Object task) { - if (isStripedObject(task)) { - stripes.set(((StripedObject) task).getStripe()); - } - } - - private static boolean isStripedObject(Object o) { - return o instanceof StripedObject; - } - - public Future submit(Runnable task) { - return submit(task, null); - } - - public Future submit(Runnable task, T result) { - lock.lock(); - try { - checkPoolIsRunning(); - if (isStripedObject(task)) { - return super.submit(task, result); - } else { - return executor.submit(task, result); - } - } finally { - lock.unlock(); - } - } - - public Future submit(Callable task) { - lock.lock(); - try { - checkPoolIsRunning(); - if (isStripedObject(task)) { - return super.submit(task); - } else { - return executor.submit(task); - } - } finally { - lock.unlock(); - } - } - - private void checkPoolIsRunning() { - Preconditions.checkState(lock.isHeldByCurrentThread()); - if (state != State.RUNNING) { - throw new RejectedExecutionException( - "executor not running"); - } - } - - public void execute(Runnable command) { - lock.lock(); - try { - checkPoolIsRunning(); - Object stripe = getStripe(command); - if (stripe != null) { - SerialExecutor serialEx = executors.get(stripe); - if (serialEx == null) { - serialEx = new SerialExecutor(stripe); - executors.put(stripe, serialEx); - } - serialEx.execute(command); - } else { - executor.execute(command); - } - } finally { - lock.unlock(); - } - } - - private Object getStripe(Runnable command) { - Object stripe; - if (command instanceof StripedObject) { - stripe = (((StripedObject) command).getStripe()); - } else { - stripe = stripes.get(); - } - stripes.remove(); - return stripe; - } - - public void shutdown() { - lock.lock(); - try { - state = State.SHUTDOWN; - if (executors.isEmpty()) { - executor.shutdown(); - } - } finally { - lock.unlock(); - } - } - - public List shutdownNow() { - lock.lock(); - try { - shutdown(); - List result = new ArrayList<>(); - for (SerialExecutor serialEx : executors.values()) { - serialEx.tasks.drainTo(result); - } - result.addAll(executor.shutdownNow()); - return result; - } finally { - lock.unlock(); - } - } - - public boolean isShutdown() { - lock.lock(); - try { - return state == State.SHUTDOWN; - } finally { - lock.unlock(); - } - } - - public boolean isTerminated() { - lock.lock(); - try { - if (state == State.RUNNING) return false; - for (SerialExecutor executor : executors.values()) { - if (!executor.isEmpty()) return false; - } - return executor.isTerminated(); - } finally { - lock.unlock(); - } - } - - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - lock.lock(); - try { - long waitUntil = System.nanoTime() + unit.toNanos(timeout); - long remainingTime; - while ((remainingTime = waitUntil - System.nanoTime()) > 0 - && !executors.isEmpty()) { - terminating.awaitNanos(remainingTime); - } - if (remainingTime <= 0) return false; - if (executors.isEmpty()) { - return executor.awaitTermination( - remainingTime, TimeUnit.NANOSECONDS); - } - return false; - } finally { - lock.unlock(); - } - } - - private void removeEmptySerialExecutor(Object stripe, SerialExecutor serialEx) { - Preconditions.checkState(serialEx == executors.get(stripe)); - Preconditions.checkState(lock.isHeldByCurrentThread()); - Preconditions.checkState(serialEx.isEmpty()); - - executors.remove(stripe); - terminating.signalAll(); - if (state == State.SHUTDOWN && executors.isEmpty()) { - executor.shutdown(); - } - } - - private class SerialExecutor implements Executor { - - private final BlockingQueue tasks = new LinkedBlockingQueue<>(); - - private Runnable active; - - private final Object stripe; - - private SerialExecutor(Object stripe) { - this.stripe = stripe; - } - - public void execute(final Runnable r) { - lock.lock(); - try { - tasks.add(new Runnable() { - public void run() { - try { - r.run(); - } finally { - scheduleNext(); - } - } - }); - if (active == null) { - scheduleNext(); - } - } finally { - lock.unlock(); - } - } - - private void scheduleNext() { - lock.lock(); - try { - if ((active = tasks.poll()) != null) { - executor.execute(active); - terminating.signalAll(); - } else { - removeEmptySerialExecutor(stripe, this); - } - } finally { - lock.unlock(); - } - } - - public boolean isEmpty() { - lock.lock(); - try { - return active == null && tasks.isEmpty(); - } finally { - lock.unlock(); - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java index 7ca1f931d50203..4d1422d648a6b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java @@ -18,20 +18,21 @@ package org.apache.doris.task; import org.apache.doris.load.sync.SyncChannelCallback; +import org.apache.doris.task.SerialExecutorService.SerialRunnable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public abstract class SyncTask implements StripedRunnable { +public abstract class SyncTask implements SerialRunnable { private static final Logger LOG = LogManager.getLogger(SyncTask.class); protected long signature; - protected Object stripe; + protected int index; protected SyncChannelCallback callback; - public SyncTask(long signature, Object stripe, SyncChannelCallback callback) { + public SyncTask(long signature, int index, SyncChannelCallback callback) { this.signature = signature; - this.stripe = stripe; + this.index = index; this.callback = callback; } @@ -46,8 +47,8 @@ public void run() { } } - public Object getStripe() { - return this.stripe; + public int getIndex() { + return this.index; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java index 71abe647e2c930..ac95e98490017a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java @@ -17,19 +17,32 @@ package org.apache.doris.task; -import java.util.concurrent.ExecutorService; +import org.apache.doris.common.Config; -public class SyncTaskPool { - - private static final ExecutorService executor = new StripedTaskExecutor(); +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntUnaryOperator; - public SyncTaskPool() { - } +public class SyncTaskPool { + private static final int NUM_OF_SLOTS = Config.max_sync_task_threads_num; + private static final SerialExecutorService EXECUTOR = new SerialExecutorService(NUM_OF_SLOTS); + private static final AtomicInteger nextIndex = new AtomicInteger(); public static void submit(Runnable task) { if (task == null) { return; } - executor.submit(task); + EXECUTOR.submit(task); + } + + public static int getNextIndex() { + return nextIndex.updateAndGet(new IntUnaryOperator() { + @Override + public int applyAsInt(int operand) { + if (++operand >= NUM_OF_SLOTS) { + operand = 0; + } + return operand; + } + }); } } From faa662568712d739ed8aec2a20e6e6b64d7f08c2 Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 31 Aug 2021 13:26:16 +0800 Subject: [PATCH 09/10] add ut --- .../doris/analysis/ChannelDescription.java | 2 +- .../java/org/apache/doris/task/SyncTask.java | 13 ++ .../org/apache/doris/task/SyncTaskPool.java | 3 + .../doris/task/SerialExecutorServiceTest.java | 133 ++++++++++++++++++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java index b953edab232a41..c926fae2b1bc98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java @@ -60,7 +60,7 @@ public class ChannelDescription implements Writable { // column names of source table @SerializedName(value = "colNames") private final List colNames; - @SerializedName(value = "id") + @SerializedName(value = "channelId") private long channelId; public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List colNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java index 4d1422d648a6b3..cdae68b0ea6f62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java @@ -23,10 +23,23 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * SyncTask is a runnable to submit to SerialExecutorService. Each + * SyncTask will have an index to submit to the corresponding slot + * in the SerialExecutorService. And SerialExecutorService ensures + * that all SyncTasks submitted with the same index are always + * executed in the order of submission. + */ public abstract class SyncTask implements SerialRunnable { private static final Logger LOG = LogManager.getLogger(SyncTask.class); protected long signature; + /** + * Each index corresponds to a slot in the SerialExecutorService. + * It should only be assigned by the getNextIndex() method in the + * SyncTaskPool. SyncTasks with the same index are always executed + * in the order of submission. + */ protected int index; protected SyncChannelCallback callback; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java index ac95e98490017a..fb2309d49bdde2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java @@ -34,6 +34,9 @@ public static void submit(Runnable task) { EXECUTOR.submit(task); } + /** + * Gets the next index loop from 0 to @NUM_OF_SLOTS - 1 + */ public static int getNextIndex() { return nextIndex.updateAndGet(new IntUnaryOperator() { @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java new file mode 100644 index 00000000000000..cc9f500cec76f6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SerialExecutorServiceTest { + private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class); + private static final int NUM_OF_SLOTS = 10; + private static final int THREAD_NUM = 10; + + private static SerialExecutorService taskPool; + // thread signature -> tasks submit serial + private static Map> submitSerial; + // thread signature -> tasks execute serial + private static Map> execSerial; + + @Before + public void setUp() { + taskPool = new SerialExecutorService(NUM_OF_SLOTS); + submitSerial = new ConcurrentHashMap<>(); + execSerial = new ConcurrentHashMap<>(); + } + + @After + public void tearDown() { + if (taskPool != null) { + taskPool.close(); + } + } + + @Test + public void testSubmit() { + for (long i = 0; i < THREAD_NUM; i++) { + if (!submitSerial.containsKey(i)) { + submitSerial.put(i, new ArrayList<>()); + } + SubmitThread thread = new SubmitThread("Thread-" + i, i, submitSerial.get(i)); + thread.start(); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + // The submission order of the same signature should be equal to the execution order + Assert.assertEquals(submitSerial.size(), THREAD_NUM); + Assert.assertEquals(submitSerial.size(), execSerial.size()); + for (long i = 0; i < THREAD_NUM; i++) { + Assert.assertTrue(submitSerial.containsKey(i)); + Assert.assertTrue(execSerial.containsKey(i)); + List submitSerialList = submitSerial.get(i); + List execSerialList = execSerial.get(i); + Assert.assertEquals(submitSerialList.size(), execSerialList.size()); + for (int j = 0; j < submitSerialList.size(); j++) { + Assert.assertEquals(submitSerialList.get(j), execSerialList.get(j)); + } + } + } + + private static class TestSyncTask extends SyncTask { + public int serial; + + public TestSyncTask(long signature, int index, int serial, SyncChannelCallback callback) { + super(signature, index, callback); + this.serial = serial; + } + + @Override + protected void exec() { + LOG.info("run exec. signature: {}, index: {}, serial: {}", signature, index, serial); + if (!execSerial.containsKey(signature)) { + execSerial.put(signature, new ArrayList<>()); + } + execSerial.get(signature).add(serial); + } + } + + private static class SubmitThread extends Thread { + private int index = SyncTaskPool.getNextIndex(); + private long signature; + private List submitSerialList; + + public SubmitThread(String name, long signature, List submitSerialList) { + super(name); + this.signature = signature; + this.submitSerialList = submitSerialList; + } + + public void run() { + for (int i = 0; i < 100; i++) { + TestSyncTask task = new TestSyncTask(signature, index, i, new SyncChannelCallback() { + @Override + public void onFinished(long channelId) { + } + @Override + public void onFailed(String errMsg) { + } + }); + submitSerialList.add(i); + taskPool.submit(task); + } + } + } +} From dce52dd49e260e001abcd36f96247e719de091c6 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 10 Sep 2021 11:01:44 +0800 Subject: [PATCH 10/10] save code --- .../java/org/apache/doris/load/sync/canal/CanalSyncJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index f82afa2ccbe1ac..c22b9f8e4625fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -110,7 +110,7 @@ public void initChannels() throws DdlException { } } CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db, - (OlapTable) table, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); + olapTable, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); if (channelDescription.getPartitionNames() != null) { syncChannel.setPartitions(channelDescription.getPartitionNames()); }