Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
k_streaming_load_current_processing.increment(-1);
return -1;
}
return 0;
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/file_helper_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ TEST_F(FileHandlerTest, TestWrite) {
ASSERT_EQ(22, length);


char* large_bytes2[(1 << 12)];
char* large_bytes2[(1 << 10)];
memset(large_bytes2, 0, sizeof(char)*((1 << 12)));
int i = 1;
while (i < 1 << 20) {
while (i < 1 << 17) {
file_handler.write(large_bytes2, ((1 << 12)));
++i;
}
Expand Down
33 changes: 28 additions & 5 deletions fe/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -178,6 +179,25 @@ public synchronized void updateVersionInfo(long newVersion, long newVersionHash,
lastSuccessVersion, lastSuccessVersionHash, dataSize, rowCount);
}

/* last failed version: LFV
* last success version: LSV
* version: V
*
* Case 1:
* If LFV > LSV, set LSV back to V, which indicates that the versions between LSV and LFV are invalid.
* A clone task will clone the versions between LSV and LFV.
*
* Case 2:
* LFV changed, set LSV back to V. This is the same as Case 1, because LFV must be larger than LSV.
*
* Case 3:
* LFV remains unchanged, just update LSV, and then check if it falls into Case 1.
*
* Case 4:
* V is larger than or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may
* happen when a clone task finished and reported version V, but the LSV is already larger than V.
* We know that the versions between V and LSV are valid, so move V forward to LSV.
*/
private void updateReplicaInfo(long newVersion, long newVersionHash,
long lastFailedVersion, long lastFailedVersionHash,
long lastSuccessVersion, long lastSuccessVersionHash,
Expand All @@ -196,11 +216,14 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
lastSuccessVersion = this.version;
lastSuccessVersionHash = this.versionHash;
}

// case 1:
if (this.lastSuccessVersion <= this.lastFailedVersion) {
this.lastSuccessVersion = this.version;
this.lastSuccessVersionHash = this.versionHash;
}

// TODO: this case is unknown, add log to observe
if (this.version > lastFailedVersion && lastFailedVersion > 0) {
LOG.info("current version {} is larger than last failed version {} , "
+ "last failed version hash {}, maybe a fatal error or be report version, print a stack here ",
Expand All @@ -209,15 +232,17 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,

if (lastFailedVersion != this.lastFailedVersion
|| this.lastFailedVersionHash != lastFailedVersionHash) {
// if last failed version changed, then set last success version to invalid version
// Case 2:
if (lastFailedVersion > this.lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
this.lastFailedVersionHash = lastFailedVersionHash;
this.lastFailedTimestamp = System.currentTimeMillis();
}

this.lastSuccessVersion = this.version;
this.lastSuccessVersionHash = this.versionHash;
} else {
// Case 3:
if (lastSuccessVersion >= this.lastSuccessVersion) {
this.lastSuccessVersion = lastSuccessVersion;
this.lastSuccessVersionHash = lastSuccessVersionHash;
Expand All @@ -228,9 +253,7 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
}
}

// if last failed version <= version, then last failed version is invalid
// version xxxx | last failed version xxxx | last success version xxx
// if current version == last failed version and version hash != last failed version hash, it means the version report from be is not valid
// Case 4:
if (this.version > this.lastFailedVersion
|| this.version == this.lastFailedVersion && this.versionHash == this.lastFailedVersionHash
|| this.version == this.lastFailedVersion && this.lastFailedVersionHash == 0 && this.versionHash != 0) {
Expand All @@ -242,7 +265,7 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
this.versionHash = this.lastSuccessVersionHash;
}
}
// TODO yiguolei use info log here, there maybe a lot of logs, change it to debug when concurrent load is stable

LOG.debug("update {}", this.toString());
}

Expand Down
3 changes: 3 additions & 0 deletions fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ public class Config extends ConfigBase {
* if (current_time - t1) > 300s, then palo will treat C as a failure node
* will call transaction manager to commit the transaction and tell transaction manager
* that C is failed
*
* This is also used when waiting for publish tasks
*
* TODO this parameter is the default value for all jobs, and the DBA could specify it for a separate job
*/
@ConfField public static int load_straggler_wait_second = 300;
Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import org.apache.doris.task.CloneTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.CreateRollupTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.task.SchemaChangeTask;
import org.apache.doris.task.SnapshotTask;
Expand Down Expand Up @@ -555,7 +555,7 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
if (request.isSetError_tablet_ids()) {
errorTabletIds = request.getError_tablet_ids();
}
PublishVersionTask publishVersionTask = (PublishVersionTask)task;
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
publishVersionTask.addErrorTablets(errorTabletIds);
publishVersionTask.setIsFinished(true);
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
Expand Down
15 changes: 12 additions & 3 deletions fe/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.doris.metric;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;

import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJob.JobType;
import org.apache.doris.catalog.Catalog;
Expand All @@ -33,6 +30,10 @@
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -57,6 +58,8 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_IMAGE_WRITE;
public static LongCounterMetric COUNTER_IMAGE_PUSH;
public static LongCounterMetric COUNTER_TXN_FAILED;
public static LongCounterMetric COUNTER_TXN_SUCCESS;
public static Histogram HISTO_QUERY_LATENCY;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;

Expand Down Expand Up @@ -161,6 +164,12 @@ public Long getValue() {
COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
"counter of image succeeded in pushing to other frontends");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success",
"counter of success transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed",
"counter of failed transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);

// 3. histogram
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws
return result;
}

// return true if commit success and publish success, return false if publish timout
// return true if commit success and publish success, return false if publish timeout
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException {
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
Expand All @@ -655,6 +655,7 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce
}
throw new UserException("unknown database, database=" + dbName);
}

return Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
db, request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()),
Expand Down
10 changes: 5 additions & 5 deletions fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.doris.task;

import java.util.ArrayList;
import java.util.List;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TPublishVersionRequest;
import org.apache.doris.thrift.TTaskType;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TPublishVersionRequest;
import org.apache.doris.thrift.TTaskType;
import java.util.ArrayList;
import java.util.List;

public class PublishVersionTask extends AgentTask {
private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@

package org.apache.doris.transaction;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.doris.alter.RollupJob;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
Expand All @@ -49,6 +43,13 @@
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -82,6 +83,7 @@ public class GlobalTransactionMgr {

// transactionId -> TransactionState
private Map<Long, TransactionState> idToTransactionState;
// db id -> (label -> txn id)
private com.google.common.collect.Table<Long, String, Long> dbIdToTxnLabels;
private Map<Long, Integer> runningTxnNums;
private TransactionIdGenerator idGenerator;
Expand All @@ -107,7 +109,7 @@ public long beginTransaction(long dbId, String label, String coordinator, LoadJo
throws AnalysisException, LabelAlreadyExistsException, BeginTransactionException {

if (Config.disable_load_job) {
throw new BeginTransactionException("disable_load_job is set to true, all load job is prevented");
throw new BeginTransactionException("disable_load_job is set to true, all load jobs are prevented");
}

writeLock();
Expand Down Expand Up @@ -185,12 +187,11 @@ public void deleteTransaction(long transactionId) {
*/
public void commitTransaction(long dbId, long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws MetaNotFoundException, TransactionCommitFailedException {

if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is set to true, all load job is prevented");
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
}

LOG.debug("try to commit transaction:[{}]", transactionId);
LOG.debug("try to commit transaction: {}", transactionId);
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
throw new TransactionCommitFailedException("all partitions have no load data");
}
Expand Down Expand Up @@ -260,7 +261,8 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
}
// the rolling up index should also be taken care
// if the rollup index failed during load, then set its last failed version
// if rollup task finished, it should compare version and last failed version, if version < last failed version, then the replica is failed
// if rollup task finished, it should compare version and last failed version,
// if version < last failed version, then the replica is failed
if (rollingUpIndex != null) {
allIndices.add(rollingUpIndex);
}
Expand All @@ -287,11 +289,12 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
// ignore it but not log it
// for example, a replica is in clone state
if (replica.getLastFailedVersion() < 0) {
++ successReplicaNum;
++successReplicaNum;
} else {
// if this error replica is a base replica and it is under rollup
// then remove the rollup task and rollup job will remove the rollup replica automatically
// should remove here, because the error replicas not contains this base replica, but it have errors in the past
// should remove here, because the error replicas do not contain this base replica,
// but it has errors in the past
if (index.getId() == baseIndex.getId() && rollupJob != null) {
LOG.info("the base replica [{}] has error, remove the related rollup replica from rollupjob [{}]",
replica, rollupJob);
Expand Down Expand Up @@ -340,12 +343,16 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
}
// 5. persistent transactionState
unprotectUpsertTransactionState(transactionState);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
} finally {
writeUnlock();
}

// 6. update nextVersion because of the failure of persistent transaction resulting in error version
updateCatalogAfterCommitted(transactionState, db);
LOG.info("transaction:[{}] successfully committed", transactionState);
Expand Down Expand Up @@ -385,7 +392,6 @@ public boolean commitAndPublishTransaction(Database db, long transactionId,
}

public void abortTransaction(long transactionId, String reason) throws UserException {

if (transactionId < 0) {
LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
return;
Expand Down Expand Up @@ -897,12 +903,14 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data
}
}
partition.setNextVersion(partition.getNextVersion() + 1);
// the partition's current version hash should be set from partition commit info
// for example, fe master's partition current version hash is 123123, fe followers partition current version hash is 3333
// they are different, fe master changed, the follower is master now, but its current version hash is 333, if clone happened,
// clone finished but its finished version hash != partition's current version hash, then clone is failed
// because clone depend on partition's current version to clone
partition.setNextVersionHash(Util.generateVersionHash(), partitionCommitInfo.getVersionHash());
// Although the committed version(hash) is not visible to users,
// it needs to be synchronized among Frontends,
// because we use the committed version(hash) to create clone tasks. If the first Master FE
// sends a clone task with committed version hash X, and then the Master changes, the new Master FE
// receives the clone task report with version hash X, which does not equal its own committed
// version hash, and then the clone task fails.
partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */,
partitionCommitInfo.getVersionHash() /* committed version hash*/);
}
}
}
Expand Down Expand Up @@ -1015,14 +1023,14 @@ private void updateDBRunningTxnNum(TransactionStatus preStatus, TransactionState
if (preStatus == null
&& (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
|| curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
++ dbRunningTxnNum;
++dbRunningTxnNum;
runningTxnNums.put(curTxnState.getDbId(), dbRunningTxnNum);
} else if (preStatus != null
&& (preStatus == TransactionStatus.PREPARE
|| preStatus == TransactionStatus.COMMITTED)
&& (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
|| curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
-- dbRunningTxnNum;
--dbRunningTxnNum;
if (dbRunningTxnNum < 1) {
runningTxnNums.remove(curTxnState.getDbId());
} else {
Expand Down
Loading