Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -5243,6 +5243,8 @@ keyword ::=
{: RESULT = id; :}
| KW_ISOLATION:id
{: RESULT = id; :}
| KW_JOB:id
{: RESULT = id; :}
| KW_ENCRYPTKEY:id
{: RESULT = id; :}
| KW_ENCRYPTKEYS:id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class ChannelDescription implements Writable {
// column names of source table
@SerializedName(value = "colNames")
private final List<String> colNames;
@SerializedName(value = "channelId")
private long channelId;

public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List<String> colNames) {
this.srcDatabase = srcDatabase;
Expand Down Expand Up @@ -119,6 +121,14 @@ private void analyzeColumns() throws AnalysisException {
}
}

public void setChannelId(long channelId) {
this.channelId = channelId;
}

public long getChannelId() {
return this.channelId;
}

public String getTargetTable() {
return targetTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ private void startMasterOnlyDaemonThreads() {
ExportChecker.init(Config.export_checker_interval_second * 1000L);
ExportChecker.startAll();
// Sync checker
SyncChecker.init(Config.sync_checker_interval_second);
SyncChecker.init(Config.sync_checker_interval_second * 1000L);
SyncChecker.startAll();
// Tablet checker and scheduler
tabletChecker.start();
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,11 @@ public class Config extends ConfigBase {
*/
@ConfField public static int sync_checker_interval_second = 5;

/**
* max num of thread to handle sync task in sync task thread-pool.
*/
@ConfField public static int max_sync_task_threads_num = 10;

/**
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.load.sync;

import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.UserException;
Expand All @@ -32,7 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class SyncChannel extends SyncLifeCycle {
public class SyncChannel {
private static final Logger LOG = LogManager.getLogger(SyncChannel.class);

protected long id;
Expand All @@ -46,8 +45,8 @@ public class SyncChannel extends SyncLifeCycle {
protected String srcTable;
protected SyncChannelCallback callback;

public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
this.id = Catalog.getCurrentCatalog().getNextId();
public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
this.id = id;
this.jobId = syncJob.getId();
this.db = db;
this.tbl = table;
Expand All @@ -57,22 +56,6 @@ public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> c
this.srcTable = srcTable.toLowerCase();
}

@Override
public void start() {
super.start();
LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
}

@Override
public void stop() {
super.stop();
LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
}

@Override
public void process() {
}

public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
InterruptedException, ExecutionException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

public interface SyncChannelCallback {

public boolean state();

public void onFinished(long channelId);

public void onFailed(String errMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.atomic.AtomicBoolean;

public class SyncChannelHandle implements SyncChannelCallback {
private Logger LOG = LogManager.getLogger(SyncChannelHandle.class);

// channel id -> dummy value(-1)
private MarkedCountDownLatch<Long, Long> latch;
private Sync sync = new Sync();

public void reset(int size) {
this.latch = new MarkedCountDownLatch<>(size);
Expand All @@ -41,19 +38,6 @@ public void mark(SyncChannel channel) {
latch.addMark(channel.getId(), -1L);
}

public void set(Boolean mutex) {
if (mutex) {
this.sync.innerSetTrue();
} else {
this.sync.innerSetFalse();
}
}

@Override
public boolean state() {
return this.sync.innerState();
}

@Override
public void onFinished(long channelId) {
this.latch.markedCountDown(channelId, -1L);
Expand All @@ -71,41 +55,4 @@ public void join() throws InterruptedException {
public Status getStatus() {
return latch.getStatus();
}

// This class describes the inner state.
private final class Sync {
private AtomicBoolean state;

boolean innerState() {
return this.state.get();
}

public boolean getState() {
return state.get();
}

void innerSetTrue() {
boolean s;
do {
s = getState();
if (s) {
return;
}
} while(!state.compareAndSet(s, true));
}

void innerSetFalse() {
boolean s;
do {
s = getState();
if (!s) {
return;
}
} while(!state.compareAndSet(s, false));
}

private Sync() {
state = new AtomicBoolean(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.collect.Maps;

import org.apache.doris.task.SyncPendingTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -67,7 +68,7 @@ public static void startAll() {

@Override
protected void runAfterCatalogReady() {
LOG.debug("start check export jobs. job state: {}", jobState.name());
LOG.debug("start check sync jobs. job state: {}", jobState.name());
switch (jobState) {
case PENDING:
runPendingJobs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ public static SyncJob read(DataInput in) throws IOException {

public void setChannelDescriptions(List<ChannelDescription> channelDescriptions) {
this.channelDescriptions = channelDescriptions;
// set channel id
for (ChannelDescription channelDescription : channelDescriptions) {
channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId());
}
}

public long getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void stop() {
this.running = false;

if (thread != null) {
// Deadlock prevention
if (thread == Thread.currentThread()) {
return;
}

try {
thread.join();
} catch (InterruptedException e) {
Expand Down
Loading