From 0c219ec856e3f087e35429754a58650d21b67933 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 29 May 2024 17:08:38 +0800 Subject: [PATCH 1/3] [enhance](mtmv) Mv refresh on commit (#34548) support refresh MTMV when base table data change / drop partition/replace partition CREATE MATERIALIZED VIEW mv1 REFRESH ON COMMIT AS SELECT xxx; --- .../org/apache/doris/nereids/DorisParser.g4 | 1 + .../java/org/apache/doris/catalog/Env.java | 20 + .../CloudGlobalTransactionMgr.java | 1361 +++++++++++++++++ .../doris/datasource/InternalCatalog.java | 15 +- .../apache/doris/event/DataChangeEvent.java | 24 + .../doris/event/DropPartitionEvent.java | 24 + .../java/org/apache/doris/event/Event.java | 60 + .../apache/doris/event/EventException.java | 34 + .../org/apache/doris/event/EventListener.java | 23 + .../apache/doris/event/EventProcessor.java | 57 + .../org/apache/doris/event/EventType.java | 24 + .../doris/event/ReplacePartitionEvent.java | 24 + .../org/apache/doris/event/TableEvent.java | 52 + .../doris/job/extensions/mtmv/MTMVJob.java | 29 +- .../doris/job/extensions/mtmv/MTMVTask.java | 1 + .../org/apache/doris/mtmv/BaseTableInfo.java | 20 +- .../org/apache/doris/mtmv/MTMVJobManager.java | 19 +- .../apache/doris/mtmv/MTMVRefreshEnum.java | 1 + .../org/apache/doris/mtmv/MTMVService.java | 31 +- .../nereids/parser/LogicalPlanBuilder.java | 3 + .../transaction/DatabaseTransactionMgr.java | 32 + .../data/mtmv_p0/test_commit_mtmv.out | 40 + .../suites/mtmv_p0/test_commit_mtmv.groovy | 130 ++ 23 files changed, 2006 insertions(+), 19 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/Event.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/EventException.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/EventType.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java create mode 100644 regression-test/data/mtmv_p0/test_commit_mtmv.out create mode 100644 regression-test/suites/mtmv_p0/test_commit_mtmv.groovy diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index c78f11b0007f3d..a7b4c4788cf4c8 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -215,6 +215,7 @@ buildMode refreshTrigger : ON MANUAL | ON SCHEDULE refreshSchedule + | ON COMMIT ; refreshSchedule diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 81a8b682c73bd7..93edb9dce5f5a8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -141,6 +141,8 @@ import org.apache.doris.deploy.impl.AmbariDeployManager; import org.apache.doris.deploy.impl.K8sDeployManager; import org.apache.doris.deploy.impl.LocalFileDeployManager; +import org.apache.doris.event.EventProcessor; +import org.apache.doris.event.ReplacePartitionEvent; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; @@ -529,6 +531,7 @@ public class Env { private TopicPublisherThread topicPublisherThread; private MTMVService mtmvService; + private EventProcessor eventProcessor; private InsertOverwriteManager insertOverwriteManager; @@ -772,6 +775,7 @@ public Env(boolean isCheckpointCatalog) { this.topicPublisherThread = new TopicPublisherThread( "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); this.mtmvService = new MTMVService(); + this.eventProcessor = new EventProcessor(mtmvService); this.insertOverwriteManager = new InsertOverwriteManager(); this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); @@ -839,6 +843,10 @@ public MTMVService getMtmvService() { return mtmvService; } + public EventProcessor getEventProcessor() { + return eventProcessor; + } + public InsertOverwriteManager getInsertOverwriteManager() { return insertOverwriteManager; } @@ -5547,6 +5555,18 @@ public void replaceTempPartition(Database db, OlapTable olapTable, ReplacePartit long version = olapTable.getNextVersion(); long versionTime = System.currentTimeMillis(); olapTable.updateVisibleVersionAndTime(version, versionTime); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of replace the partition + try { + Env.getCurrentEnv().getEventProcessor().processEvent( + new ReplacePartitionEvent(db.getCatalog().getId(), db.getId(), + olapTable.getId())); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } // write log ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java new file mode 100644 index 00000000000000..f680f9457d1c7d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -0,0 +1,1361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.transaction; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.cloud.catalog.CloudPartition; +import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; +import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; +import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest; +import org.apache.doris.cloud.proto.Cloud.BeginTxnResponse; +import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictRequest; +import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictResponse; +import org.apache.doris.cloud.proto.Cloud.CleanTxnLabelRequest; +import org.apache.doris.cloud.proto.Cloud.CleanTxnLabelResponse; +import org.apache.doris.cloud.proto.Cloud.CommitTxnRequest; +import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; +import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest; +import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse; +import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest; +import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse; +import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest; +import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse; +import org.apache.doris.cloud.proto.Cloud.GetTxnRequest; +import org.apache.doris.cloud.proto.Cloud.GetTxnResponse; +import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; +import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; +import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; +import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; +import org.apache.doris.cloud.proto.Cloud.TableStatsPB; +import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; +import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; +import org.apache.doris.cloud.proto.Cloud.UniqueIdPB; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.QuotaExceedException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.InternalDatabaseUtil; +import org.apache.doris.common.util.MetaLockUtils; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.event.DataChangeEvent; +import org.apache.doris.load.loadv2.LoadJobFinalOperation; +import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.BatchRemoveTransactionsOperation; +import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.task.CalcDeleteBitmapTask; +import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TWaitingTxnStatusRequest; +import org.apache.doris.thrift.TWaitingTxnStatusResult; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.GlobalTransactionMgrIface; +import org.apache.doris.transaction.SubTransactionState; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionCommitFailedException; +import org.apache.doris.transaction.TransactionIdGenerator; +import org.apache.doris.transaction.TransactionNotFoundException; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TxnCommitAttachment; +import org.apache.doris.transaction.TxnStateCallbackFactory; +import org.apache.doris.transaction.TxnStateChangeCallback; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { + private static final Logger LOG = LogManager.getLogger(CloudGlobalTransactionMgr.class); + private static final String NOT_SUPPORTED_MSG = "Not supported in cloud mode"; + private static final int DELETE_BITMAP_LOCK_EXPIRATION_SECONDS = 10; + private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15; + + private TxnStateCallbackFactory callbackFactory; + + public CloudGlobalTransactionMgr() { + this.callbackFactory = new TxnStateCallbackFactory(); + } + + public void setEditLog(EditLog editLog) { + //do nothing + } + + @Override + public TxnStateCallbackFactory getCallbackFactory() { + return callbackFactory; + } + + @Override + public void addDatabaseTransactionMgr(Long dbId) { + // do nothing in cloud mode + } + + @Override + public void removeDatabaseTransactionMgr(Long dbId) { + // do nothing in cloud mode + } + + @Override + public long beginTransaction(long dbId, List tableIdList, String label, TxnCoordinator coordinator, + LoadJobSourceType sourceType, long timeoutSecond) + throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException, + QuotaExceedException, MetaNotFoundException { + return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond); + } + + @Override + public long beginTransaction(long dbId, List tableIdList, String label, TUniqueId requestId, + TxnCoordinator coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond) + throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException, + QuotaExceedException, MetaNotFoundException { + + LOG.info("try to begin transaction, dbId: {}, label: {}", dbId, label); + if (Config.disable_load_job) { + throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented"); + } + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + if (!coordinator.isFromInternal) { + InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get()); + } + + switch (sourceType) { + case BACKEND_STREAMING: + checkValidTimeoutSecond(timeoutSecond, Config.max_stream_load_timeout_second, + Config.min_load_timeout_second); + break; + default: + checkValidTimeoutSecond(timeoutSecond, Config.max_load_timeout_second, Config.min_load_timeout_second); + } + + BeginTxnResponse beginTxnResponse = null; + int retryTime = 0; + + try { + Preconditions.checkNotNull(coordinator); + Preconditions.checkNotNull(label); + FeNameFormat.checkLabel(label); + + TxnInfoPB.Builder txnInfoBuilder = TxnInfoPB.newBuilder(); + txnInfoBuilder.setDbId(dbId); + txnInfoBuilder.addAllTableIds(tableIdList); + txnInfoBuilder.setLabel(label); + txnInfoBuilder.setListenerId(listenerId); + + if (requestId != null) { + UniqueIdPB.Builder uniqueIdBuilder = UniqueIdPB.newBuilder(); + uniqueIdBuilder.setHi(requestId.getHi()); + uniqueIdBuilder.setLo(requestId.getLo()); + txnInfoBuilder.setRequestId(uniqueIdBuilder); + } + + txnInfoBuilder.setCoordinator(TxnUtil.txnCoordinatorToPb(coordinator)); + txnInfoBuilder.setLoadJobSourceType(LoadJobSourceTypePB.forNumber(sourceType.value())); + txnInfoBuilder.setTimeoutMs(timeoutSecond * 1000); + txnInfoBuilder.setPrecommitTimeoutMs(Config.stream_load_default_precommit_timeout_second * 1000); + + final BeginTxnRequest beginTxnRequest = BeginTxnRequest.newBuilder() + .setTxnInfo(txnInfoBuilder.build()) + .setCloudUniqueId(Config.cloud_unique_id) + .build(); + + while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, beginTxnRequest:{}", retryTime, beginTxnRequest); + } + beginTxnResponse = MetaServiceProxy.getInstance().beginTxn(beginTxnRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, beginTxnResponse:{}", retryTime, beginTxnResponse); + } + + if (beginTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + LOG.info("beginTxn KV_TXN_CONFLICT, retryTime:{}", retryTime); + backoff(); + retryTime++; + } + + Preconditions.checkNotNull(beginTxnResponse); + Preconditions.checkNotNull(beginTxnResponse.getStatus()); + } catch (Exception e) { + LOG.warn("beginTxn failed, exception:", e); + throw new BeginTransactionException("beginTxn failed, errMsg:" + e.getMessage()); + } + + if (beginTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { + switch (beginTxnResponse.getStatus().getCode()) { + case TXN_DUPLICATED_REQ: + throw new DuplicatedRequestException(DebugUtil.printId(requestId), + beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg()); + case TXN_LABEL_ALREADY_USED: + throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false); + default: + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TXN_REJECT.increase(1L); + } + throw new BeginTransactionException(beginTxnResponse.getStatus().getMsg()); + } + } + + long txnId = beginTxnResponse.getTxnId(); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TXN_BEGIN.increase(1L); + } + return txnId; + } + + @Override + public void preCommitTransaction2PC(Database db, List tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) + throws UserException { + LOG.info("try to precommit transaction: {}", transactionId); + if (Config.disable_load_job) { + throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); + } + + PrecommitTxnRequest.Builder builder = PrecommitTxnRequest.newBuilder(); + builder.setDbId(db.getId()); + builder.setTxnId(transactionId); + + if (txnCommitAttachment != null) { + if (txnCommitAttachment instanceof LoadJobFinalOperation) { + LoadJobFinalOperation loadJobFinalOperation = (LoadJobFinalOperation) txnCommitAttachment; + builder.setCommitAttachment(TxnUtil + .loadJobFinalOperationToPb(loadJobFinalOperation)); + } else { + throw new UserException("Invalid txnCommitAttachment"); + } + } + + builder.setPrecommitTimeoutMs(timeoutMillis); + + final PrecommitTxnRequest precommitTxnRequest = builder.build(); + PrecommitTxnResponse precommitTxnResponse = null; + try { + LOG.info("precommitTxnRequest: {}", precommitTxnRequest); + precommitTxnResponse = MetaServiceProxy + .getInstance().precommitTxn(precommitTxnRequest); + LOG.info("precommitTxnResponse: {}", precommitTxnResponse); + } catch (RpcException e) { + throw new UserException(e.getMessage()); + } + + if (precommitTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { + throw new UserException(precommitTxnResponse.getStatus().getMsg()); + } + } + + @Override + public void commitTransaction(long dbId, List
tableList, + long transactionId, List tabletCommitInfos) + throws UserException { + commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); + } + + @Override + public void commitTransaction(long dbId, List
tableList, long transactionId, + List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) + throws UserException { + commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); + } + + public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) { + long dbId = commitTxnResponse.getTxnInfo().getDbId(); + long txnId = commitTxnResponse.getTxnInfo().getTxnId(); + // 1. update rowCountfor AnalysisManager + Map updatedRows = new HashMap<>(); + for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) { + LOG.info("Update RowCount for AnalysisManager. transactionId:{}, table_id:{}, updated_row_count:{}", + txnId, tableStats.getTableId(), tableStats.getUpdatedRowCount()); + updatedRows.put(tableStats.getTableId(), tableStats.getUpdatedRowCount()); + } + Env env = Env.getCurrentEnv(); + env.getAnalysisManager().updateUpdatedRows(updatedRows); + // 2. notify partition first load + int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size(); + // a map to record + Map> tablePartitionMap = Maps.newHashMap(); + for (int idx = 0; idx < totalPartitionNum; ++idx) { + long version = commitTxnResponse.getVersions(idx); + long tableId = commitTxnResponse.getTableIds(idx); + if (version == 2) { + // inform AnalysisManager first load partitions + tablePartitionMap.computeIfAbsent(tableId, k -> Lists.newArrayList()); + tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx)); + } + // 3. update CloudPartition + OlapTable olapTable = (OlapTable) env.getInternalCatalog().getDb(dbId) + .flatMap(db -> db.getTable(tableId)).filter(t -> t.isManagedTable()) + .orElse(null); + if (olapTable == null) { + continue; + } + CloudPartition partition = (CloudPartition) olapTable.getPartition( + commitTxnResponse.getPartitionIds(idx)); + if (partition == null) { + continue; + } + partition.setCachedVisibleVersion(version); + } + env.getAnalysisManager().setNewPartitionLoaded( + tablePartitionMap.keySet().stream().collect(Collectors.toList())); + // tablePartitionMap to string + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : tablePartitionMap.entrySet()) { + sb.append(entry.getKey()).append(":["); + for (Long partitionId : entry.getValue()) { + sb.append(partitionId).append(","); + } + sb.append("];"); + } + if (sb.length() > 0) { + LOG.info("notify partition first load. {}", sb); + } + } + + private Set getBaseTabletsFromTables(List
tableList, List tabletCommitInfos) + throws MetaNotFoundException { + Set baseTabletIds = Sets.newHashSet(); + if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { + return baseTabletIds; + } + for (Table table : tableList) { + OlapTable olapTable = (OlapTable) table; + try { + olapTable.readLock(); + olapTable.getPartitions().stream() + .map(Partition::getBaseIndex) + .map(MaterializedIndex::getTablets) + .flatMap(Collection::stream) + .map(Tablet::getId) + .forEach(baseTabletIds::add); + } finally { + olapTable.readUnlock(); + } + } + Set tabletIds = tabletCommitInfos.stream().map(TabletCommitInfo::getTabletId).collect(Collectors.toSet()); + baseTabletIds.retainAll(tabletIds); + LOG.debug("baseTabletIds: {}", baseTabletIds); + + return baseTabletIds; + } + + private void commitTransaction(long dbId, List
tableList, long transactionId, + List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC) + throws UserException { + + LOG.info("try to commit transaction, transactionId: {}", transactionId); + if (Config.disable_load_job) { + throw new TransactionCommitFailedException( + "disable_load_job is set to true, all load jobs are not allowed"); + } + + List mowTableList = getMowTableList(tableList); + if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty() && !mowTableList.isEmpty()) { + calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos); + } + + CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); + builder.setDbId(dbId) + .setTxnId(transactionId) + .setIs2Pc(is2PC) + .setCloudUniqueId(Config.cloud_unique_id) + .addAllBaseTabletIds(getBaseTabletsFromTables(tableList, tabletCommitInfos)); + + // if tablet commit info is empty, no need to pass mowTableList to meta service. + if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty()) { + for (OlapTable olapTable : mowTableList) { + builder.addMowTableIds(olapTable.getId()); + } + } + + if (txnCommitAttachment != null) { + if (txnCommitAttachment instanceof LoadJobFinalOperation) { + LoadJobFinalOperation loadJobFinalOperation = (LoadJobFinalOperation) txnCommitAttachment; + builder.setCommitAttachment(TxnUtil + .loadJobFinalOperationToPb(loadJobFinalOperation)); + } else if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + builder.setCommitAttachment(TxnUtil + .rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment)); + } else { + throw new UserException("invalid txnCommitAttachment"); + } + } + + final CommitTxnRequest commitTxnRequest = builder.build(); + CommitTxnResponse commitTxnResponse = null; + int retryTime = 0; + + try { + while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, commitTxnRequest:{}", retryTime, commitTxnRequest); + } + commitTxnResponse = MetaServiceProxy.getInstance().commitTxn(commitTxnRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, commitTxnResponse:{}", retryTime, commitTxnResponse); + } + if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + // sleep random [20, 200] ms, avoid txn conflict + LOG.info("commitTxn KV_TXN_CONFLICT, transactionId:{}, retryTime:{}", transactionId, retryTime); + backoff(); + retryTime++; + } + + Preconditions.checkNotNull(commitTxnResponse); + Preconditions.checkNotNull(commitTxnResponse.getStatus()); + } catch (Exception e) { + LOG.warn("commitTxn failed, transactionId:{}, exception:", transactionId, e); + throw new UserException("commitTxn() failed, errMsg:" + e.getMessage()); + } + + if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK + && commitTxnResponse.getStatus().getCode() != MetaServiceCode.TXN_ALREADY_VISIBLE) { + LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, commitTxnResponse:{}", + transactionId, retryTime, commitTxnResponse); + if (commitTxnResponse.getStatus().getCode() == MetaServiceCode.LOCK_EXPIRED) { + // DELETE_BITMAP_LOCK_ERR will be retried on be + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, + "delete bitmap update lock expired, transactionId:" + transactionId); + } + StringBuilder internalMsgBuilder = + new StringBuilder("commitTxn failed, transactionId:"); + internalMsgBuilder.append(transactionId); + internalMsgBuilder.append(" code:"); + internalMsgBuilder.append(commitTxnResponse.getStatus().getCode()); + throw new UserException("internal error, " + internalMsgBuilder.toString()); + } + if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) { + throw new UserException(commitTxnResponse.getStatus().getMsg()); + } + + TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); + TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); + if (cb != null) { + LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", + txnState.getTransactionId(), txnState.getCallbackId(), txnState); + cb.afterCommitted(txnState, true); + cb.afterVisible(txnState, true); + } + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); + MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime()); + } + afterCommitTxnResp(commitTxnResponse); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of transaction + try { + produceEvent(dbId, tableList); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } + } + + private void produceEvent(long dbId, List
tableList) { + for (Table table : tableList) { + Env.getCurrentEnv().getEventProcessor().processEvent( + new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, table.getId())); + } + } + + private List getMowTableList(List
tableList) { + List mowTableList = new ArrayList<>(); + for (Table table : tableList) { + if ((table instanceof OlapTable)) { + OlapTable olapTable = (OlapTable) table; + if (olapTable.getEnableUniqueKeyMergeOnWrite()) { + mowTableList.add(olapTable); + } + } + } + return mowTableList; + } + + private void calcDeleteBitmapForMow(long dbId, List tableList, long transactionId, + List tabletCommitInfos) + throws UserException { + Map>> backendToPartitionTablets = Maps.newHashMap(); + Map partitions = Maps.newHashMap(); + Map> tableToPartitions = Maps.newHashMap(); + getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets); + if (backendToPartitionTablets.isEmpty()) { + throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); + } + + getDeleteBitmapUpdateLock(tableToPartitions, transactionId); + Map partitionVersions = getPartitionVersions(partitions); + + Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( + backendToPartitionTablets, partitionVersions); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); + } + + private void getPartitionInfo(List tableList, + List tabletCommitInfos, + Map> tableToParttions, + Map partitions, + Map>> backendToPartitionTablets) { + Map tableMap = Maps.newHashMap(); + for (OlapTable olapTable : tableList) { + tableMap.put(olapTable.getId(), olapTable); + } + + List tabletIds = tabletCommitInfos.stream() + .map(TabletCommitInfo::getTabletId).collect(Collectors.toList()); + TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex(); + List tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds); + for (int i = 0; i < tabletMetaList.size(); i++) { + TabletMeta tabletMeta = tabletMetaList.get(i); + long tableId = tabletMeta.getTableId(); + if (!tableMap.containsKey(tableId)) { + continue; + } + + long partitionId = tabletMeta.getPartitionId(); + long backendId = tabletCommitInfos.get(i).getBackendId(); + + if (!tableToParttions.containsKey(tableId)) { + tableToParttions.put(tableId, Sets.newHashSet()); + } + tableToParttions.get(tableId).add(partitionId); + + if (!backendToPartitionTablets.containsKey(backendId)) { + backendToPartitionTablets.put(backendId, Maps.newHashMap()); + } + Map> partitionToTablets = backendToPartitionTablets.get(backendId); + if (!partitionToTablets.containsKey(partitionId)) { + partitionToTablets.put(partitionId, Lists.newArrayList()); + } + partitionToTablets.get(partitionId).add(tabletIds.get(i)); + partitions.putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); + } + } + + private Map getPartitionVersions(Map partitionMap) { + Map partitionToVersions = Maps.newHashMap(); + partitionMap.forEach((key, value) -> { + long visibleVersion = value.getVisibleVersion(); + long newVersion = visibleVersion <= 0 ? 2 : visibleVersion + 1; + partitionToVersions.put(key, newVersion); + }); + return partitionToVersions; + } + + private Map> getCalcDeleteBitmapInfo( + Map>> backendToPartitionTablets, Map partitionVersions) { + Map> backendToPartitionInfos = Maps.newHashMap(); + for (Map.Entry>> entry : backendToPartitionTablets.entrySet()) { + List partitionInfos = Lists.newArrayList(); + for (Map.Entry> partitionToTables : entry.getValue().entrySet()) { + Long partitionId = partitionToTables.getKey(); + TCalcDeleteBitmapPartitionInfo partitionInfo = new TCalcDeleteBitmapPartitionInfo(partitionId, + partitionVersions.get(partitionId), + partitionToTables.getValue()); + partitionInfos.add(partitionInfo); + } + backendToPartitionInfos.put(entry.getKey(), partitionInfos); + } + return backendToPartitionInfos; + } + + private void getDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) + throws UserException { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + for (Map.Entry> entry : tableToParttions.entrySet()) { + GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); + builder.setTableId(entry.getKey()) + .setLockId(transactionId) + .setInitiator(-1) + .setExpiration(DELETE_BITMAP_LOCK_EXPIRATION_SECONDS); + final GetDeleteBitmapUpdateLockRequest request = builder.build(); + GetDeleteBitmapUpdateLockResponse response = null; + + int retryTime = 0; + while (retryTime++ < Config.meta_service_rpc_retry_times) { + try { + response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request); + if (LOG.isDebugEnabled()) { + LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", + transactionId, request, response); + } + if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT + && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + } catch (Exception e) { + LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", + transactionId, retryTime, e); + } + // sleep random millis [20, 200] ms, avoid txn conflict + int randomMillis = 20 + (int) (Math.random() * (200 - 20)); + if (LOG.isDebugEnabled()) { + LOG.debug("randomMillis:{}", randomMillis); + } + try { + Thread.sleep(randomMillis); + } catch (InterruptedException e) { + LOG.info("InterruptedException: ", e); + } + } + Preconditions.checkNotNull(response); + Preconditions.checkNotNull(response.getStatus()); + if (response.getStatus().getCode() != MetaServiceCode.OK) { + LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", + transactionId, retryTime, response); + if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT + || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) { + // DELETE_BITMAP_LOCK_ERR will be retried on be + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, + "Failed to get delete bitmap lock due to confilct"); + } + throw new UserException("Failed to get delete bitmap lock, code: " + response.getStatus().getCode()); + } + } + stopWatch.stop(); + LOG.info("get delete bitmap lock successfully. txns: {}. time cost: {} ms.", + transactionId, stopWatch.getTime()); + } + + private void sendCalcDeleteBitmaptask(long dbId, long transactionId, + Map> backendToPartitionInfos) + throws UserException { + if (backendToPartitionInfos.isEmpty()) { + return; + } + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + int totalTaskNum = backendToPartitionInfos.size(); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch( + totalTaskNum); + AgentBatchTask batchTask = new AgentBatchTask(); + for (Map.Entry> entry : backendToPartitionInfos.entrySet()) { + CalcDeleteBitmapTask task = new CalcDeleteBitmapTask(entry.getKey(), + transactionId, + dbId, + entry.getValue(), + countDownLatch); + countDownLatch.addMark(entry.getKey(), transactionId); + // add to AgentTaskQueue for handling finish report. + // not check return value, because the add will success + AgentTaskQueue.addTask(task); + batchTask.addTask(task); + } + AgentTaskExecutor.submit(batchTask); + + boolean ok; + try { + ok = countDownLatch.await(CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("InterruptedException: ", e); + ok = false; + } + + if (!ok || !countDownLatch.getStatus().ok()) { + String errMsg = "Failed to calculate delete bitmap."; + // clear tasks + AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CALCULATE_DELETE_BITMAP); + + if (!countDownLatch.getStatus().ok()) { + errMsg += countDownLatch.getStatus().getErrorMsg(); + if (countDownLatch.getStatus().getErrorCode() != TStatusCode.DELETE_BITMAP_LOCK_ERROR) { + throw new UserException(errMsg); + } + } else { + errMsg += " Timeout."; + List> unfinishedMarks = countDownLatch.getLeftMarks(); + // only show at most 3 results + List> subList = unfinishedMarks.subList(0, + Math.min(unfinishedMarks.size(), 3)); + if (!subList.isEmpty()) { + errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList); + } + } + LOG.warn(errMsg); + // DELETE_BITMAP_LOCK_ERR will be retried on be + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, errMsg); + } else { + // Sometimes BE calc delete bitmap succeed, but FE wait timeout for some unknown reasons, + // FE will retry the calculation on BE, this debug point simulates such situation. + debugCalcDeleteBitmapRandomTimeout(); + } + stopWatch.stop(); + LOG.info("calc delete bitmap task successfully. txns: {}. time cost: {} ms.", + transactionId, stopWatch.getTime()); + } + + private void debugCalcDeleteBitmapRandomTimeout() throws UserException { + DebugPoint debugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.calc_delete_bitmap_random_timeout"); + if (debugPoint == null) { + return; + } + + double percent = debugPoint.param("percent", 0.5); + if (new SecureRandom().nextInt() % 100 < 100 * percent) { + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, + "DebugPoint: Failed to calculate delete bitmap: Timeout."); + } + } + + @Override + public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis) + throws UserException { + return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null); + } + + @Override + public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, + List subTransactionStates, long timeoutMillis) throws UserException { + throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud"); + } + + @Override + public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, + TxnCommitAttachment txnCommitAttachment) throws UserException { + if (!MetaLockUtils.tryCommitLockTables(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + // DELETE_BITMAP_LOCK_ERR will be retried on be + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, + "get table cloud commit lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } finally { + MetaLockUtils.commitUnlockTables(tableList); + } + return true; + } + + @Override + public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) + throws UserException { + commitTransaction(db.getId(), tableList, transactionId, null, null, true); + } + + @Override + public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException { + abortTransaction(dbId, transactionId, reason, null, null); + } + + @Override + public void abortTransaction(Long dbId, Long transactionId, String reason, + TxnCommitAttachment txnCommitAttachment, List
tableList) throws UserException { + LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); + + AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); + builder.setDbId(dbId); + builder.setTxnId(transactionId); + builder.setReason(reason); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final AbortTxnRequest abortTxnRequest = builder.build(); + AbortTxnResponse abortTxnResponse = null; + int retryTime = 0; + try { + while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, abortTxnRequest:{}", retryTime, abortTxnRequest); + } + abortTxnResponse = MetaServiceProxy + .getInstance().abortTxn(abortTxnRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, abortTxnResponse:{}", retryTime, abortTxnResponse); + } + if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + // sleep random [20, 200] ms, avoid txn conflict + LOG.info("abortTxn KV_TXN_CONFLICT, transactionId:{}, retryTime:{}", transactionId, retryTime); + backoff(); + retryTime++; + } + Preconditions.checkNotNull(abortTxnResponse); + Preconditions.checkNotNull(abortTxnResponse.getStatus()); + } catch (RpcException e) { + LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e); + throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); + } + + TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); + TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); + if (cb != null) { + LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), + txnState.getCallbackId()); + cb.afterAborted(txnState, true, txnState.getReason()); + } + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TXN_FAILED.increase(1L); + } + } + + @Override + public void abortTransaction(Long dbId, String label, String reason) throws UserException { + LOG.info("try to abort transaction, dbId:{}, label:{}", dbId, label); + + AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); + builder.setDbId(dbId); + builder.setLabel(label); + builder.setReason(reason); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final AbortTxnRequest abortTxnRequest = builder.build(); + AbortTxnResponse abortTxnResponse = null; + int retryTime = 0; + + try { + while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { + if (LOG.isDebugEnabled()) { + LOG.debug("retyTime:{}, abortTxnRequest:{}", retryTime, abortTxnRequest); + } + abortTxnResponse = MetaServiceProxy + .getInstance().abortTxn(abortTxnRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, abortTxnResponse:{}", retryTime, abortTxnResponse); + } + if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + + // sleep random [20, 200] ms, avoid txn conflict + LOG.info("abortTxn KV_TXN_CONFLICT, dbId:{}, label:{}, retryTime:{}", dbId, label, retryTime); + backoff(); + retryTime++; + } + Preconditions.checkNotNull(abortTxnResponse); + Preconditions.checkNotNull(abortTxnResponse.getStatus()); + } catch (Exception e) { + LOG.warn("abortTxn failed, label:{}, exception:", label, e); + throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); + } + + TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); + TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); + if (cb == null) { + LOG.info("no callback to run for this txn, txnId:{} callbackId:{}", txnState.getTransactionId(), + txnState.getCallbackId()); + return; + } + + LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), txnState.getCallbackId()); + cb.afterAborted(txnState, true, txnState.getReason()); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_TXN_FAILED.increase(1L); + } + } + + @Override + public void abortTransaction2PC(Long dbId, long transactionId, List
tableList) throws UserException { + LOG.info("try to abortTransaction2PC, dbId:{}, transactionId:{}", dbId, transactionId); + abortTransaction(dbId, transactionId, "User Abort", null, null); + LOG.info("abortTransaction2PC successfully, dbId:{}, transactionId:{}", dbId, transactionId); + } + + @Override + public List getReadyToPublishTransactions() { + //do nothing for CloudGlobalTransactionMgr + return new ArrayList(); + } + + @Override + public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { + //do nothing for CloudGlobalTransactionMgr + return false; + } + + @Override + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { + throw new UserException("Disallow to call finishTransaction()"); + } + + @Override + public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List tableIdList) + throws AnalysisException { + LOG.info("isPreviousTransactionsFinished(), endTransactionId:{}, dbId:{}, tableIdList:{}", + endTransactionId, dbId, tableIdList); + + if (endTransactionId <= 0) { + throw new AnalysisException("Invaid endTransactionId:" + endTransactionId); + } + CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder(); + builder.setDbId(dbId); + builder.setEndTxnId(endTransactionId); + builder.addAllTableIds(tableIdList); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final CheckTxnConflictRequest checkTxnConflictRequest = builder.build(); + CheckTxnConflictResponse checkTxnConflictResponse = null; + try { + LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest); + checkTxnConflictResponse = MetaServiceProxy + .getInstance().checkTxnConflict(checkTxnConflictRequest); + LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse); + } catch (RpcException e) { + throw new AnalysisException(e.getMessage()); + } + + if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) { + throw new AnalysisException(checkTxnConflictResponse.getStatus().getMsg()); + } + return checkTxnConflictResponse.getFinished(); + } + + public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, long tableId, + long partitionId) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + public boolean isPreviousNonTimeoutTxnFinished(long endTransactionId, long dbId, List tableIdList) + throws AnalysisException { + LOG.info("isPreviousNonTimeoutTxnFinished(), endTransactionId:{}, dbId:{}, tableIdList:{}", + endTransactionId, dbId, tableIdList); + + if (endTransactionId <= 0) { + throw new AnalysisException("Invaid endTransactionId:" + endTransactionId); + } + CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder(); + builder.setDbId(dbId); + builder.setEndTxnId(endTransactionId); + builder.addAllTableIds(tableIdList); + builder.setCloudUniqueId(Config.cloud_unique_id); + builder.setIgnoreTimeoutTxn(true); + + final CheckTxnConflictRequest checkTxnConflictRequest = builder.build(); + CheckTxnConflictResponse checkTxnConflictResponse = null; + try { + LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest); + checkTxnConflictResponse = MetaServiceProxy + .getInstance().checkTxnConflict(checkTxnConflictRequest); + LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse); + } catch (RpcException e) { + throw new AnalysisException(e.getMessage()); + } + + if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) { + throw new AnalysisException(checkTxnConflictResponse.getStatus().getMsg()); + } + return checkTxnConflictResponse.getFinished(); + } + + @Override + public void removeExpiredAndTimeoutTxns() { + // do nothing in cloud mode + } + + public void cleanLabel(Long dbId, String label, boolean isReplay) throws Exception { + LOG.info("try to cleanLabel dbId: {}, label:{}", dbId, label); + CleanTxnLabelRequest.Builder builder = CleanTxnLabelRequest.newBuilder(); + builder.setDbId(dbId).setCloudUniqueId(Config.cloud_unique_id); + + if (!Strings.isNullOrEmpty(label)) { + builder.addLabels(label); + } + + final CleanTxnLabelRequest cleanTxnLabelRequest = builder.build(); + CleanTxnLabelResponse cleanTxnLabelResponse = null; + int retryTime = 0; + + try { + // 5 times retry is enough for clean label + while (retryTime < 5) { + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, cleanTxnLabel:{}", retryTime, cleanTxnLabelRequest); + } + cleanTxnLabelResponse = MetaServiceProxy.getInstance().cleanTxnLabel(cleanTxnLabelRequest); + if (LOG.isDebugEnabled()) { + LOG.debug("retryTime:{}, cleanTxnLabel:{}", retryTime, cleanTxnLabelResponse); + } + if (cleanTxnLabelResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + // sleep random [20, 200] ms, avoid txn conflict + LOG.info("cleanTxnLabel KV_TXN_CONFLICT, dbId:{}, label:{}, retryTime:{}", dbId, label, retryTime); + backoff(); + retryTime++; + } + + Preconditions.checkNotNull(cleanTxnLabelResponse); + Preconditions.checkNotNull(cleanTxnLabelResponse.getStatus()); + } catch (Exception e) { + LOG.warn("cleanTxnLabel failed, dbId:{}, exception:", dbId, e); + throw new UserException("cleanTxnLabel failed, errMsg:" + e.getMessage()); + } + + if (cleanTxnLabelResponse.getStatus().getCode() != MetaServiceCode.OK) { + LOG.warn("cleanTxnLabel failed, dbId:{} label:{} retryTime:{} cleanTxnLabelResponse:{}", + dbId, label, retryTime, cleanTxnLabelResponse); + throw new UserException("cleanTxnLabel failed, errMsg:" + cleanTxnLabelResponse.getStatus().getMsg()); + } + return; + } + + @Override + public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List tableIds) + throws UserException { + //throw new UserException(NOT_SUPPORTED_MSG); + } + + @Override + public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) + throws AnalysisException, TimeoutException { + long dbId = request.getDbId(); + int commitTimeoutSec = Config.commit_timeout_second; + for (int i = 0; i < commitTimeoutSec; ++i) { + Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId); + TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult(); + statusResult.status = new TStatus(); + TransactionStatus txnStatus = null; + if (request.isSetTxnId()) { + long txnId = request.getTxnId(); + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, txnId); + if (txnState == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + txnStatus = txnState.getTransactionStatus(); + if (!txnState.getReason().trim().isEmpty()) { + statusResult.status.setErrorMsgsIsSet(true); + statusResult.status.addToErrorMsgs(txnState.getReason()); + } + } else { + txnStatus = getLabelState(dbId, request.getLabel()); + } + if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) { + statusResult.setTxnStatusId(txnStatus.value()); + return statusResult; + } + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + LOG.info("commit sleep exception.", e); + } + } + throw new TimeoutException("Operation is timeout"); + } + + @Override + public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) throws AnalysisException { + // do nothing in cloud mode + } + + @Override + public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { + // do nothing in cloud mode + } + + @Override + public TransactionStatus getLabelState(long dbId, String label) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public Long getTransactionId(Long dbId, String label) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public TransactionState getTransactionState(long dbId, long transactionId) { + LOG.info("try to get transaction state, dbId:{}, transactionId:{}", dbId, transactionId); + GetTxnRequest.Builder builder = GetTxnRequest.newBuilder(); + builder.setDbId(dbId); + builder.setTxnId(transactionId); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final GetTxnRequest getTxnRequest = builder.build(); + GetTxnResponse getTxnResponse = null; + try { + LOG.info("getTxnRequest:{}", getTxnRequest); + getTxnResponse = MetaServiceProxy + .getInstance().getTxn(getTxnRequest); + LOG.info("getTxnRequest: {}", getTxnResponse); + } catch (RpcException e) { + LOG.info("getTransactionState exception: {}", e.getMessage()); + return null; + } + + if (getTxnResponse.getStatus().getCode() != MetaServiceCode.OK || !getTxnResponse.hasTxnInfo()) { + LOG.info("getTransactionState exception: {}, {}", getTxnResponse.getStatus().getCode(), + getTxnResponse.getStatus().getMsg()); + return null; + } + return TxnUtil.transactionStateFromPb(getTxnResponse.getTxnInfo()); + } + + @Override + public Long getTransactionIdByLabel(Long dbId, String label, List statusList) + throws UserException { + LOG.info("try to get transaction id by label, dbId:{}, label:{}", dbId, label); + GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder(); + builder.setDbId(dbId); + builder.setLabel(label); + builder.setCloudUniqueId(Config.cloud_unique_id); + for (TransactionStatus status : statusList) { + if (status == TransactionStatus.PREPARE) { + builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED); + } else if (status == TransactionStatus.PRECOMMITTED) { + builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED); + } else if (status == TransactionStatus.COMMITTED) { + builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED); + } + } + + final GetTxnIdRequest getTxnIdRequest = builder.build(); + GetTxnIdResponse getTxnIdResponse = null; + try { + LOG.info("getTxnRequest:{}", getTxnIdRequest); + getTxnIdResponse = MetaServiceProxy + .getInstance().getTxnId(getTxnIdRequest); + LOG.info("getTxnIdReponse: {}", getTxnIdResponse); + } catch (RpcException e) { + LOG.info("getTransactionId exception: {}", e.getMessage()); + throw new TransactionNotFoundException("transaction not found, label=" + label); + } + + if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) { + LOG.info("getTransactionState exception: {}, {}", getTxnIdResponse.getStatus().getCode(), + getTxnIdResponse.getStatus().getMsg()); + throw new TransactionNotFoundException("transaction not found, label=" + label); + } + return getTxnIdResponse.getTxnId(); + } + + @Override + public List getPreCommittedTxnList(Long dbId) throws AnalysisException { + // todo + return new ArrayList(); + } + + @Override + public int getTransactionNum() { + return 0; + } + + @Override + public Long getNextTransactionId() throws UserException { + GetCurrentMaxTxnRequest.Builder builder = GetCurrentMaxTxnRequest.newBuilder(); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final GetCurrentMaxTxnRequest getCurrentMaxTxnRequest = builder.build(); + GetCurrentMaxTxnResponse getCurrentMaxTxnResponse = null; + try { + LOG.info("GetCurrentMaxTxnRequest:{}", getCurrentMaxTxnRequest); + getCurrentMaxTxnResponse = MetaServiceProxy + .getInstance().getCurrentMaxTxnId(getCurrentMaxTxnRequest); + LOG.info("GetCurrentMaxTxnResponse: {}", getCurrentMaxTxnResponse); + } catch (RpcException e) { + LOG.warn("getNextTransactionId() RpcException: {}", e.getMessage()); + throw new UserException("getNextTransactionId() RpcException: " + e.getMessage()); + } + + if (getCurrentMaxTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { + LOG.info("getNextTransactionId() failed, code: {}, msg: {}", + getCurrentMaxTxnResponse.getStatus().getCode(), getCurrentMaxTxnResponse.getStatus().getMsg()); + throw new UserException("getNextTransactionId() failed, msg:" + + getCurrentMaxTxnResponse.getStatus().getMsg()); + } + return getCurrentMaxTxnResponse.getCurrentMaxTxnId(); + } + + @Override + public int getRunningTxnNums(Long dbId) throws AnalysisException { + return 0; + } + + @Override + public long getTxnNumByStatus(TransactionStatus status) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getAllRunningTxnNum() { + return 0; + } + + @Override + public long getAllPublishTxnNum() { + return 0; + } + + /** + * backoff policy implement by sleep random ms in [20ms, 200ms] + */ + private void backoff() { + int randomMillis = 20 + (int) (Math.random() * (200 - 20)); + try { + Thread.sleep(randomMillis); + } catch (InterruptedException e) { + LOG.info("InterruptedException: ", e); + } + } + + @Override + public TransactionIdGenerator getTransactionIDGenerator() throws Exception { + throw new Exception(NOT_SUPPORTED_MSG); + } + + @Override + public List> getDbInfo() throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getDbTransStateInfo(Long dbId) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getDbTransInfo(Long dbId, boolean running, int limit) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public Map> getDbRunningTransInfo(long dbId) throws AnalysisException { + return Maps.newHashMap(); + } + + @Override + public List> getDbTransInfoByStatus(Long dbId, TransactionStatus status) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getDbTransInfoByLabelMatch(long dbId, String label) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getSingleTranInfo(long dbId, long txnId) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getTableTransInfo(long dbId, long txnId) throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public List> getPartitionTransInfo(long dbId, long tid, long tableId) + throws AnalysisException { + throw new AnalysisException(NOT_SUPPORTED_MSG); + } + + @Override + public void write(DataOutput out) throws IOException { + throw new IOException(NOT_SUPPORTED_MSG); + } + + @Override + public void readFields(DataInput in) throws IOException { + throw new IOException(NOT_SUPPORTED_MSG); + } + + @Override + public void replayUpsertTransactionState(TransactionState transactionState) throws Exception { + throw new Exception(NOT_SUPPORTED_MSG); + } + + @Deprecated + public void replayDeleteTransactionState(TransactionState transactionState) throws Exception { + throw new Exception(NOT_SUPPORTED_MSG); + } + + @Override + public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) throws Exception { + throw new Exception(NOT_SUPPORTED_MSG); + } + + @Override + public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) + throws Exception { + throw new Exception(NOT_SUPPORTED_MSG); + } + + @Override + public void addSubTransaction(long dbId, long transactionId, long subTransactionId) { + throw new UnsupportedOperationException("addSubTransaction is not supported in cloud"); + } + + @Override + public void removeSubTransaction(long dbId, long subTransactionId) { + throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 7a107321f4dc79..381ecacefd78ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -139,6 +139,8 @@ import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.event.DropPartitionEvent; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterDatabasePropertyInfo; @@ -1809,11 +1811,22 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause long version = olapTable.getNextVersion(); long versionTime = System.currentTimeMillis(); olapTable.updateVisibleVersionAndTime(version, versionTime); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of deleting the partition + try { + Env.getCurrentEnv().getEventProcessor().processEvent( + new DropPartitionEvent(db.getCatalog().getId(), db.getId(), + olapTable.getId())); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } // log DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition, clause.isForceDrop(), recycleTime, version, versionTime); Env.getCurrentEnv().getEditLog().logDropPartition(info); - LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}", partitionName, olapTable.getId(), olapTable.getName(), isTempPartition, clause.isForceDrop()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java new file mode 100644 index 00000000000000..d58e62bfddeb0e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public class DataChangeEvent extends TableEvent { + public DataChangeEvent(long ctlId, long dbId, long tableId) { + super(EventType.DATA_CHANGE, ctlId, dbId, tableId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java new file mode 100644 index 00000000000000..67339ebd05ab55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public class DropPartitionEvent extends TableEvent { + public DropPartitionEvent(long ctlId, long dbId, long tableId) { + super(EventType.DROP_PARTITION, ctlId, dbId, tableId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/Event.java b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java new file mode 100644 index 00000000000000..e049a1aeb8cf2a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +import org.apache.doris.catalog.Env; + +import java.util.Objects; + +public abstract class Event { + protected final long eventId; + + // eventTime of the event. Used instead of calling getter on event everytime + protected final long eventTime; + + // eventType from the NotificationEvent + protected final EventType eventType; + + protected Event(EventType eventType) { + Objects.requireNonNull(eventType, "require eventType"); + this.eventId = Env.getCurrentEnv().getNextId(); + this.eventTime = System.currentTimeMillis(); + this.eventType = eventType; + } + + public long getEventId() { + return eventId; + } + + public long getEventTime() { + return eventTime; + } + + public EventType getEventType() { + return eventType; + } + + @Override + public String toString() { + return "Event{" + + "eventId=" + eventId + + ", eventTime=" + eventTime + + ", eventType=" + eventType + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java new file mode 100644 index 00000000000000..425ca03d65f0f1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.event; + +public class EventException extends Exception { + + public EventException(String msg, Throwable cause) { + super(msg, cause); + } + + public EventException(String msg) { + super(msg); + } + + public EventException(Exception e) { + super(e); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java new file mode 100644 index 00000000000000..d5c142bf93472d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public interface EventListener { + + void processEvent(Event event) throws EventException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java new file mode 100644 index 00000000000000..4731a17a372b83 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; +import java.util.Set; + +public class EventProcessor { + + private static final Logger LOG = LogManager.getLogger(EventProcessor.class); + + private Set listeners = Sets.newHashSet(); + + public EventProcessor(EventListener... args) { + for (EventListener listener : args) { + this.listeners.add(listener); + } + } + + public boolean processEvent(Event event) { + Objects.requireNonNull(event); + if (LOG.isDebugEnabled()) { + LOG.debug("processEvent: {}", event); + } + boolean result = true; + for (EventListener listener : listeners) { + try { + listener.processEvent(event); + } catch (EventException e) { + // A listener processing failure does not affect other listeners + LOG.warn("[{}] process event failed, event: {}, errMsg: {}", listener.getClass().getName(), event, + e.getMessage()); + result = false; + } + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java new file mode 100644 index 00000000000000..be942141fd33c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public enum EventType { + DATA_CHANGE, + REPLACE_PARTITION, + DROP_PARTITION +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java new file mode 100644 index 00000000000000..371d5cd553c6e3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public class ReplacePartitionEvent extends TableEvent { + public ReplacePartitionEvent(long ctlId, long dbId, long tableId) { + super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java new file mode 100644 index 00000000000000..210ad2df40f403 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +public abstract class TableEvent extends Event { + protected final long ctlId; + protected final long dbId; + protected final long tableId; + + public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) { + super(eventType); + this.ctlId = ctlId; + this.dbId = dbId; + this.tableId = tableId; + } + + public long getCtlId() { + return ctlId; + } + + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + @Override + public String toString() { + return "TableEvent{" + + "ctlId=" + ctlId + + ", dbId=" + dbId + + ", tableId=" + tableId + + "} " + super.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index 4f44b2e14b9987..5d7cf4435b9c7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -140,27 +140,44 @@ public List createTasks(TaskType taskType, MTMVTaskContext taskContext /** * if user trigger, return true - * if system trigger, Check if there are any system triggered tasks, and if so, return false + * else, only can have 2 task. because every task can refresh all data. * * @param taskContext * @return */ @Override public boolean isReadyForScheduling(MTMVTaskContext taskContext) { - if (taskContext != null) { + if (isManual(taskContext)) { return true; } List runningTasks = getRunningTasks(); + int runningNum = 0; for (MTMVTask task : runningTasks) { - if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) { - LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}", - task); - return false; + if (!isManual(task.getTaskContext())) { + runningNum++; + // Prerequisite: Each refresh will calculate which partitions to refresh + // + // For example, there is currently a running task that is refreshing partition p1. + // If the data of p2 changes at this time and triggers a refresh task t2, + // according to the logic (>=1), t2 will be lost + // + // If the logic is >=2, t2 will wait lock of MTMVJob. + // If the p3 data changes again and triggers the refresh task t3, + // then t3 will be discarded. However, when t2 runs, both p2 and p3 data will be refreshed. + if (runningNum >= 2) { + LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}", + task); + return false; + } } } return true; } + private boolean isManual(MTMVTaskContext taskContext) { + return taskContext != null && taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL; + } + @Override public ShowResultSetMetaData getJobMetaData() { return JOB_META_DATA; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 240c7de6a71a23..517909f5e1f128 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -107,6 +107,7 @@ public class MTMVTask extends AbstractTask { public enum MTMVTaskTriggerMode { MANUAL, + COMMIT, SYSTEM } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 9b3b6be04f10b2..bc9a3fdd2050f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -32,18 +32,24 @@ public class BaseTableInfo { private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class); @SerializedName("ti") - private Long tableId; + private long tableId; @SerializedName("di") - private Long dbId; + private long dbId; @SerializedName("ci") - private Long ctlId; + private long ctlId; - public BaseTableInfo(Long tableId, Long dbId) { + public BaseTableInfo(long tableId, long dbId) { this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; } + public BaseTableInfo(long tableId, long dbId, long ctlId) { + this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); + this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); + this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null"); + } + public BaseTableInfo(TableIf table) { DatabaseIf database = table.getDatabase(); java.util.Objects.requireNonNull(database, "database is null"); @@ -54,15 +60,15 @@ public BaseTableInfo(TableIf table) { this.ctlId = catalog.getId(); } - public Long getTableId() { + public long getTableId() { return tableId; } - public Long getDbId() { + public long getDbId() { return dbId; } - public Long getCtlId() { + public long getCtlId() { return ctlId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index f53a7b60868ace..bed44e8d37d136 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -45,6 +45,7 @@ import org.apache.doris.persist.AlterMTMV; import org.apache.doris.qe.ConnectContext; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -79,11 +80,10 @@ public void createMTMV(MTMV mtmv) throws DdlException { private JobExecutionConfiguration getJobConfig(MTMV mtmv) { JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); - if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger() - .equals(RefreshTrigger.SCHEDULE)) { + RefreshTrigger refreshTrigger = mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger(); + if (refreshTrigger.equals(RefreshTrigger.SCHEDULE)) { setScheduleJobConfig(jobExecutionConfiguration, mtmv); - } else if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger() - .equals(RefreshTrigger.MANUAL)) { + } else if (refreshTrigger.equals(RefreshTrigger.MANUAL) || refreshTrigger.equals(RefreshTrigger.COMMIT)) { setManualJobConfig(jobExecutionConfiguration, mtmv); } return jobExecutionConfiguration; @@ -210,9 +210,20 @@ public void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNot job.cancelTaskById(info.getTaskId()); } + public void onCommit(MTMV mtmv) throws DdlException, JobException { + MTMVJob job = getJobByMTMV(mtmv); + MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(), + false); + Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext); + } + private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW); + return getJobByMTMV(mtmv); + } + + private MTMVJob getJobByMTMV(MTMV mtmv) throws DdlException { List jobs = Env.getCurrentEnv().getJobManager() .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java index 0f4f904c573c4c..b9d27db9c22045 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java @@ -43,6 +43,7 @@ public enum BuildMode { */ public enum RefreshTrigger { MANUAL, //manual + COMMIT, //manual SCHEDULE // schedule } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index cbbaef6b917a25..d5d86b7eedab97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -22,8 +22,13 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.event.Event; +import org.apache.doris.event.EventException; +import org.apache.doris.event.EventListener; +import org.apache.doris.event.TableEvent; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; @@ -36,8 +41,9 @@ import java.util.Map; import java.util.Objects; +import java.util.Set; -public class MTMVService { +public class MTMVService implements EventListener { private static final Logger LOG = LogManager.getLogger(MTMVService.class); private Map hooks = Maps.newConcurrentMap(); @@ -162,4 +168,27 @@ public void cancelMTMVTask(CancelMTMVTaskInfo info) throws MetaNotFoundException mtmvHookService.cancelMTMVTask(info); } } + + @Override + public void processEvent(Event event) throws EventException { + Objects.requireNonNull(event); + if (!(event instanceof TableEvent)) { + return; + } + TableEvent tableEvent = (TableEvent) event; + LOG.info("processEvent, Event: {}", event); + Set mtmvs = relationManager.getMtmvsByBaseTableOneLevel( + new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId())); + for (BaseTableInfo baseTableInfo : mtmvs) { + try { + // check if mtmv should trigger by event + MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId()); + if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { + jobManager.onCommit(mtmv); + } + } catch (Exception e) { + throw new EventException(e); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index a80c0612deb6b7..4faceb3ae94a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -700,6 +700,9 @@ public MTMVRefreshTriggerInfo visitRefreshTrigger(RefreshTriggerContext ctx) { if (ctx.MANUAL() != null) { return new MTMVRefreshTriggerInfo(RefreshTrigger.MANUAL); } + if (ctx.COMMIT() != null) { + return new MTMVRefreshTriggerInfo(RefreshTrigger.COMMIT); + } if (ctx.SCHEDULE() != null) { return new MTMVRefreshTriggerInfo(RefreshTrigger.SCHEDULE, visitRefreshSchedule(ctx.refreshSchedule())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 6d53148e5af012..0462a1ec1abd14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -49,6 +49,7 @@ import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.event.DataChangeEvent; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; @@ -1052,6 +1053,17 @@ public void finishTransaction(long transactionId) throws UserException { } finally { MetaLockUtils.writeUnlockTables(tableList); } + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of transaction + try { + produceEvent(transactionState, db); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } + // The visible latch should only be counted down after all things are done // (finish transaction, write edit log, etc). // Otherwise, there is no way for stream load to query the result right after loading finished, @@ -1075,6 +1087,26 @@ private void setTableVersion(TransactionState transactionState, Database db) { } } + private void produceEvent(TransactionState transactionState, Database db) { + Collection tableCommitInfos; + if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) { + tableCommitInfos = transactionState.getSubTxnTableCommitInfos(); + } else { + tableCommitInfos = transactionState.getIdToTableCommitInfos().values(); + } + for (TableCommitInfo tableCommitInfo : tableCommitInfos) { + long tableId = tableCommitInfo.getTableId(); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + if (table == null) { + LOG.warn("table {} does not exist when produceEvent. transaction: {}, db: {}", + tableId, transactionState.getTransactionId(), db.getId()); + continue; + } + Env.getCurrentEnv().getEventProcessor().processEvent( + new DataChangeEvent(db.getCatalog().getId(), db.getId(), tableId)); + } + } + private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db, List> relatedTblPartitions) { Iterator tableCommitInfoIterator diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out b/regression-test/data/mtmv_p0/test_commit_mtmv.out new file mode 100644 index 00000000000000..fafb8f883a4c25 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out @@ -0,0 +1,40 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !mv1 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !task1 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv2 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !task2 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv1_2 -- +1 2017-01-15 1 +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv2_2 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_init -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_drop -- +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_replace -- +3 2017-03-15 3 + diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy new file mode 100644 index 00000000000000..cd02dcd57d7fa5 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_commit_mtmv") { + def tableName = "test_commit_mtmv_table" + def mvName1 = "test_commit_mtmv1" + def mvName2 = "test_commit_mtmv2" + def dbName = "regression_test_mtmv_p0" + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName1} + BUILD DEFERRED REFRESH AUTO ON COMMIT + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """ + CREATE MATERIALIZED VIEW ${mvName2} + BUILD DEFERRED REFRESH AUTO ON COMMIT + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${mvName1}; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);; + """ + def jobName1 = getJobName(dbName, mvName1); + waitingMTMVTaskFinished(jobName1) + order_qt_mv1 "SELECT * FROM ${mvName1}" + order_qt_task1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvName1}' order by CreateTime desc limit 1" + + def jobName2 = getJobName(dbName, mvName2); + waitingMTMVTaskFinished(jobName2) + order_qt_mv2 "SELECT * FROM ${mvName2}" + order_qt_task2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvName2}' order by CreateTime desc limit 1" + + // on manual can not trigger by commit + sql """ + alter MATERIALIZED VIEW ${mvName2} REFRESH ON MANUAL; + """ + + sql """ + insert into ${tableName} values(1,"2017-01-15",1);; + """ + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_2 "SELECT * FROM ${mvName1}" + waitingMTMVTaskFinished(jobName2) + order_qt_mv2_2 "SELECT * FROM ${mvName2}" + + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + + // test drop partition + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703 VALUES [('2017-03-01'), ('2017-04-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + CREATE MATERIALIZED VIEW ${mvName1} + BUILD DEFERRED REFRESH AUTO ON COMMIT + PARTITION BY (`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);; + """ + jobName1 = getJobName(dbName, mvName1); + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_init "SELECT * FROM ${mvName1}" + + sql """alter table ${tableName} drop PARTITION p201701""" + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_drop "SELECT * FROM ${mvName1}" + + // test replace partition + sql """ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p201702_t VALUES [('2017-02-01'), ('2017-03-01'));""" + sql """ALTER TABLE ${tableName} REPLACE PARTITION (p201702) WITH TEMPORARY PARTITION (p201702_t);""" + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_replace "SELECT * FROM ${mvName1}" + + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + +} From 757cd7ae8ee366988470fa7c860981f0f1abea49 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 31 May 2024 11:54:46 +0800 Subject: [PATCH 2/3] 1 --- .../main/java/org/apache/doris/datasource/InternalCatalog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 381ecacefd78ef..3bb5cb0bcb88bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -140,7 +140,6 @@ import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.event.DropPartitionEvent; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterDatabasePropertyInfo; From 8e914e16885fef980604617702b1f1ad21efe161 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 31 May 2024 11:58:50 +0800 Subject: [PATCH 3/3] 1 --- .../CloudGlobalTransactionMgr.java | 1361 ----------------- 1 file changed, 1361 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java deleted file mode 100644 index f680f9457d1c7d..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ /dev/null @@ -1,1361 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.cloud.transaction; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.catalog.TabletInvertedIndex; -import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.cloud.catalog.CloudPartition; -import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; -import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; -import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest; -import org.apache.doris.cloud.proto.Cloud.BeginTxnResponse; -import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictRequest; -import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictResponse; -import org.apache.doris.cloud.proto.Cloud.CleanTxnLabelRequest; -import org.apache.doris.cloud.proto.Cloud.CleanTxnLabelResponse; -import org.apache.doris.cloud.proto.Cloud.CommitTxnRequest; -import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; -import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest; -import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse; -import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest; -import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse; -import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest; -import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse; -import org.apache.doris.cloud.proto.Cloud.GetTxnRequest; -import org.apache.doris.cloud.proto.Cloud.GetTxnResponse; -import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; -import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; -import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; -import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; -import org.apache.doris.cloud.proto.Cloud.TableStatsPB; -import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; -import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; -import org.apache.doris.cloud.proto.Cloud.UniqueIdPB; -import org.apache.doris.cloud.rpc.MetaServiceProxy; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; -import org.apache.doris.common.DuplicatedRequestException; -import org.apache.doris.common.FeNameFormat; -import org.apache.doris.common.InternalErrorCode; -import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.MarkedCountDownLatch; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.QuotaExceedException; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugPointUtil; -import org.apache.doris.common.util.DebugPointUtil.DebugPoint; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.InternalDatabaseUtil; -import org.apache.doris.common.util.MetaLockUtils; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.event.DataChangeEvent; -import org.apache.doris.load.loadv2.LoadJobFinalOperation; -import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; -import org.apache.doris.metric.MetricRepo; -import org.apache.doris.persist.BatchRemoveTransactionsOperation; -import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; -import org.apache.doris.persist.EditLog; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.rpc.RpcException; -import org.apache.doris.task.AgentBatchTask; -import org.apache.doris.task.AgentTaskExecutor; -import org.apache.doris.task.AgentTaskQueue; -import org.apache.doris.task.CalcDeleteBitmapTask; -import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo; -import org.apache.doris.thrift.TStatus; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TTaskType; -import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.thrift.TWaitingTxnStatusRequest; -import org.apache.doris.thrift.TWaitingTxnStatusResult; -import org.apache.doris.transaction.BeginTransactionException; -import org.apache.doris.transaction.GlobalTransactionMgrIface; -import org.apache.doris.transaction.SubTransactionState; -import org.apache.doris.transaction.TabletCommitInfo; -import org.apache.doris.transaction.TransactionCommitFailedException; -import org.apache.doris.transaction.TransactionIdGenerator; -import org.apache.doris.transaction.TransactionNotFoundException; -import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.LoadJobSourceType; -import org.apache.doris.transaction.TransactionState.TxnCoordinator; -import org.apache.doris.transaction.TransactionStatus; -import org.apache.doris.transaction.TxnCommitAttachment; -import org.apache.doris.transaction.TxnStateCallbackFactory; -import org.apache.doris.transaction.TxnStateChangeCallback; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { - private static final Logger LOG = LogManager.getLogger(CloudGlobalTransactionMgr.class); - private static final String NOT_SUPPORTED_MSG = "Not supported in cloud mode"; - private static final int DELETE_BITMAP_LOCK_EXPIRATION_SECONDS = 10; - private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15; - - private TxnStateCallbackFactory callbackFactory; - - public CloudGlobalTransactionMgr() { - this.callbackFactory = new TxnStateCallbackFactory(); - } - - public void setEditLog(EditLog editLog) { - //do nothing - } - - @Override - public TxnStateCallbackFactory getCallbackFactory() { - return callbackFactory; - } - - @Override - public void addDatabaseTransactionMgr(Long dbId) { - // do nothing in cloud mode - } - - @Override - public void removeDatabaseTransactionMgr(Long dbId) { - // do nothing in cloud mode - } - - @Override - public long beginTransaction(long dbId, List tableIdList, String label, TxnCoordinator coordinator, - LoadJobSourceType sourceType, long timeoutSecond) - throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException, - QuotaExceedException, MetaNotFoundException { - return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond); - } - - @Override - public long beginTransaction(long dbId, List tableIdList, String label, TUniqueId requestId, - TxnCoordinator coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond) - throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException, - QuotaExceedException, MetaNotFoundException { - - LOG.info("try to begin transaction, dbId: {}, label: {}", dbId, label); - if (Config.disable_load_job) { - throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented"); - } - - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); - if (!coordinator.isFromInternal) { - InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get()); - } - - switch (sourceType) { - case BACKEND_STREAMING: - checkValidTimeoutSecond(timeoutSecond, Config.max_stream_load_timeout_second, - Config.min_load_timeout_second); - break; - default: - checkValidTimeoutSecond(timeoutSecond, Config.max_load_timeout_second, Config.min_load_timeout_second); - } - - BeginTxnResponse beginTxnResponse = null; - int retryTime = 0; - - try { - Preconditions.checkNotNull(coordinator); - Preconditions.checkNotNull(label); - FeNameFormat.checkLabel(label); - - TxnInfoPB.Builder txnInfoBuilder = TxnInfoPB.newBuilder(); - txnInfoBuilder.setDbId(dbId); - txnInfoBuilder.addAllTableIds(tableIdList); - txnInfoBuilder.setLabel(label); - txnInfoBuilder.setListenerId(listenerId); - - if (requestId != null) { - UniqueIdPB.Builder uniqueIdBuilder = UniqueIdPB.newBuilder(); - uniqueIdBuilder.setHi(requestId.getHi()); - uniqueIdBuilder.setLo(requestId.getLo()); - txnInfoBuilder.setRequestId(uniqueIdBuilder); - } - - txnInfoBuilder.setCoordinator(TxnUtil.txnCoordinatorToPb(coordinator)); - txnInfoBuilder.setLoadJobSourceType(LoadJobSourceTypePB.forNumber(sourceType.value())); - txnInfoBuilder.setTimeoutMs(timeoutSecond * 1000); - txnInfoBuilder.setPrecommitTimeoutMs(Config.stream_load_default_precommit_timeout_second * 1000); - - final BeginTxnRequest beginTxnRequest = BeginTxnRequest.newBuilder() - .setTxnInfo(txnInfoBuilder.build()) - .setCloudUniqueId(Config.cloud_unique_id) - .build(); - - while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, beginTxnRequest:{}", retryTime, beginTxnRequest); - } - beginTxnResponse = MetaServiceProxy.getInstance().beginTxn(beginTxnRequest); - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, beginTxnResponse:{}", retryTime, beginTxnResponse); - } - - if (beginTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - LOG.info("beginTxn KV_TXN_CONFLICT, retryTime:{}", retryTime); - backoff(); - retryTime++; - } - - Preconditions.checkNotNull(beginTxnResponse); - Preconditions.checkNotNull(beginTxnResponse.getStatus()); - } catch (Exception e) { - LOG.warn("beginTxn failed, exception:", e); - throw new BeginTransactionException("beginTxn failed, errMsg:" + e.getMessage()); - } - - if (beginTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { - switch (beginTxnResponse.getStatus().getCode()) { - case TXN_DUPLICATED_REQ: - throw new DuplicatedRequestException(DebugUtil.printId(requestId), - beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg()); - case TXN_LABEL_ALREADY_USED: - throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false); - default: - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_REJECT.increase(1L); - } - throw new BeginTransactionException(beginTxnResponse.getStatus().getMsg()); - } - } - - long txnId = beginTxnResponse.getTxnId(); - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_BEGIN.increase(1L); - } - return txnId; - } - - @Override - public void preCommitTransaction2PC(Database db, List
tableList, long transactionId, - List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) - throws UserException { - LOG.info("try to precommit transaction: {}", transactionId); - if (Config.disable_load_job) { - throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); - } - - PrecommitTxnRequest.Builder builder = PrecommitTxnRequest.newBuilder(); - builder.setDbId(db.getId()); - builder.setTxnId(transactionId); - - if (txnCommitAttachment != null) { - if (txnCommitAttachment instanceof LoadJobFinalOperation) { - LoadJobFinalOperation loadJobFinalOperation = (LoadJobFinalOperation) txnCommitAttachment; - builder.setCommitAttachment(TxnUtil - .loadJobFinalOperationToPb(loadJobFinalOperation)); - } else { - throw new UserException("Invalid txnCommitAttachment"); - } - } - - builder.setPrecommitTimeoutMs(timeoutMillis); - - final PrecommitTxnRequest precommitTxnRequest = builder.build(); - PrecommitTxnResponse precommitTxnResponse = null; - try { - LOG.info("precommitTxnRequest: {}", precommitTxnRequest); - precommitTxnResponse = MetaServiceProxy - .getInstance().precommitTxn(precommitTxnRequest); - LOG.info("precommitTxnResponse: {}", precommitTxnResponse); - } catch (RpcException e) { - throw new UserException(e.getMessage()); - } - - if (precommitTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { - throw new UserException(precommitTxnResponse.getStatus().getMsg()); - } - } - - @Override - public void commitTransaction(long dbId, List
tableList, - long transactionId, List tabletCommitInfos) - throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); - } - - @Override - public void commitTransaction(long dbId, List
tableList, long transactionId, - List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) - throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); - } - - public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) { - long dbId = commitTxnResponse.getTxnInfo().getDbId(); - long txnId = commitTxnResponse.getTxnInfo().getTxnId(); - // 1. update rowCountfor AnalysisManager - Map updatedRows = new HashMap<>(); - for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) { - LOG.info("Update RowCount for AnalysisManager. transactionId:{}, table_id:{}, updated_row_count:{}", - txnId, tableStats.getTableId(), tableStats.getUpdatedRowCount()); - updatedRows.put(tableStats.getTableId(), tableStats.getUpdatedRowCount()); - } - Env env = Env.getCurrentEnv(); - env.getAnalysisManager().updateUpdatedRows(updatedRows); - // 2. notify partition first load - int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size(); - // a map to record - Map> tablePartitionMap = Maps.newHashMap(); - for (int idx = 0; idx < totalPartitionNum; ++idx) { - long version = commitTxnResponse.getVersions(idx); - long tableId = commitTxnResponse.getTableIds(idx); - if (version == 2) { - // inform AnalysisManager first load partitions - tablePartitionMap.computeIfAbsent(tableId, k -> Lists.newArrayList()); - tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx)); - } - // 3. update CloudPartition - OlapTable olapTable = (OlapTable) env.getInternalCatalog().getDb(dbId) - .flatMap(db -> db.getTable(tableId)).filter(t -> t.isManagedTable()) - .orElse(null); - if (olapTable == null) { - continue; - } - CloudPartition partition = (CloudPartition) olapTable.getPartition( - commitTxnResponse.getPartitionIds(idx)); - if (partition == null) { - continue; - } - partition.setCachedVisibleVersion(version); - } - env.getAnalysisManager().setNewPartitionLoaded( - tablePartitionMap.keySet().stream().collect(Collectors.toList())); - // tablePartitionMap to string - StringBuilder sb = new StringBuilder(); - for (Map.Entry> entry : tablePartitionMap.entrySet()) { - sb.append(entry.getKey()).append(":["); - for (Long partitionId : entry.getValue()) { - sb.append(partitionId).append(","); - } - sb.append("];"); - } - if (sb.length() > 0) { - LOG.info("notify partition first load. {}", sb); - } - } - - private Set getBaseTabletsFromTables(List
tableList, List tabletCommitInfos) - throws MetaNotFoundException { - Set baseTabletIds = Sets.newHashSet(); - if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { - return baseTabletIds; - } - for (Table table : tableList) { - OlapTable olapTable = (OlapTable) table; - try { - olapTable.readLock(); - olapTable.getPartitions().stream() - .map(Partition::getBaseIndex) - .map(MaterializedIndex::getTablets) - .flatMap(Collection::stream) - .map(Tablet::getId) - .forEach(baseTabletIds::add); - } finally { - olapTable.readUnlock(); - } - } - Set tabletIds = tabletCommitInfos.stream().map(TabletCommitInfo::getTabletId).collect(Collectors.toSet()); - baseTabletIds.retainAll(tabletIds); - LOG.debug("baseTabletIds: {}", baseTabletIds); - - return baseTabletIds; - } - - private void commitTransaction(long dbId, List
tableList, long transactionId, - List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC) - throws UserException { - - LOG.info("try to commit transaction, transactionId: {}", transactionId); - if (Config.disable_load_job) { - throw new TransactionCommitFailedException( - "disable_load_job is set to true, all load jobs are not allowed"); - } - - List mowTableList = getMowTableList(tableList); - if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty() && !mowTableList.isEmpty()) { - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos); - } - - CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); - builder.setDbId(dbId) - .setTxnId(transactionId) - .setIs2Pc(is2PC) - .setCloudUniqueId(Config.cloud_unique_id) - .addAllBaseTabletIds(getBaseTabletsFromTables(tableList, tabletCommitInfos)); - - // if tablet commit info is empty, no need to pass mowTableList to meta service. - if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty()) { - for (OlapTable olapTable : mowTableList) { - builder.addMowTableIds(olapTable.getId()); - } - } - - if (txnCommitAttachment != null) { - if (txnCommitAttachment instanceof LoadJobFinalOperation) { - LoadJobFinalOperation loadJobFinalOperation = (LoadJobFinalOperation) txnCommitAttachment; - builder.setCommitAttachment(TxnUtil - .loadJobFinalOperationToPb(loadJobFinalOperation)); - } else if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; - builder.setCommitAttachment(TxnUtil - .rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment)); - } else { - throw new UserException("invalid txnCommitAttachment"); - } - } - - final CommitTxnRequest commitTxnRequest = builder.build(); - CommitTxnResponse commitTxnResponse = null; - int retryTime = 0; - - try { - while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, commitTxnRequest:{}", retryTime, commitTxnRequest); - } - commitTxnResponse = MetaServiceProxy.getInstance().commitTxn(commitTxnRequest); - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, commitTxnResponse:{}", retryTime, commitTxnResponse); - } - if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - // sleep random [20, 200] ms, avoid txn conflict - LOG.info("commitTxn KV_TXN_CONFLICT, transactionId:{}, retryTime:{}", transactionId, retryTime); - backoff(); - retryTime++; - } - - Preconditions.checkNotNull(commitTxnResponse); - Preconditions.checkNotNull(commitTxnResponse.getStatus()); - } catch (Exception e) { - LOG.warn("commitTxn failed, transactionId:{}, exception:", transactionId, e); - throw new UserException("commitTxn() failed, errMsg:" + e.getMessage()); - } - - if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK - && commitTxnResponse.getStatus().getCode() != MetaServiceCode.TXN_ALREADY_VISIBLE) { - LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, commitTxnResponse:{}", - transactionId, retryTime, commitTxnResponse); - if (commitTxnResponse.getStatus().getCode() == MetaServiceCode.LOCK_EXPIRED) { - // DELETE_BITMAP_LOCK_ERR will be retried on be - throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, - "delete bitmap update lock expired, transactionId:" + transactionId); - } - StringBuilder internalMsgBuilder = - new StringBuilder("commitTxn failed, transactionId:"); - internalMsgBuilder.append(transactionId); - internalMsgBuilder.append(" code:"); - internalMsgBuilder.append(commitTxnResponse.getStatus().getCode()); - throw new UserException("internal error, " + internalMsgBuilder.toString()); - } - if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) { - throw new UserException(commitTxnResponse.getStatus().getMsg()); - } - - TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb != null) { - LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", - txnState.getTransactionId(), txnState.getCallbackId(), txnState); - cb.afterCommitted(txnState, true); - cb.afterVisible(txnState, true); - } - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); - MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime()); - } - afterCommitTxnResp(commitTxnResponse); - // Here, we only wait for the EventProcessor to finish processing the event, - // but regardless of the success or failure of the result, - // it does not affect the logic of transaction - try { - produceEvent(dbId, tableList); - } catch (Throwable t) { - // According to normal logic, no exceptions will be thrown, - // but in order to avoid bugs affecting the original logic, all exceptions are caught - LOG.warn("produceEvent failed: ", t); - } - } - - private void produceEvent(long dbId, List
tableList) { - for (Table table : tableList) { - Env.getCurrentEnv().getEventProcessor().processEvent( - new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, table.getId())); - } - } - - private List getMowTableList(List
tableList) { - List mowTableList = new ArrayList<>(); - for (Table table : tableList) { - if ((table instanceof OlapTable)) { - OlapTable olapTable = (OlapTable) table; - if (olapTable.getEnableUniqueKeyMergeOnWrite()) { - mowTableList.add(olapTable); - } - } - } - return mowTableList; - } - - private void calcDeleteBitmapForMow(long dbId, List tableList, long transactionId, - List tabletCommitInfos) - throws UserException { - Map>> backendToPartitionTablets = Maps.newHashMap(); - Map partitions = Maps.newHashMap(); - Map> tableToPartitions = Maps.newHashMap(); - getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets); - if (backendToPartitionTablets.isEmpty()) { - throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); - } - - getDeleteBitmapUpdateLock(tableToPartitions, transactionId); - Map partitionVersions = getPartitionVersions(partitions); - - Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( - backendToPartitionTablets, partitionVersions); - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); - } - - private void getPartitionInfo(List tableList, - List tabletCommitInfos, - Map> tableToParttions, - Map partitions, - Map>> backendToPartitionTablets) { - Map tableMap = Maps.newHashMap(); - for (OlapTable olapTable : tableList) { - tableMap.put(olapTable.getId(), olapTable); - } - - List tabletIds = tabletCommitInfos.stream() - .map(TabletCommitInfo::getTabletId).collect(Collectors.toList()); - TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex(); - List tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds); - for (int i = 0; i < tabletMetaList.size(); i++) { - TabletMeta tabletMeta = tabletMetaList.get(i); - long tableId = tabletMeta.getTableId(); - if (!tableMap.containsKey(tableId)) { - continue; - } - - long partitionId = tabletMeta.getPartitionId(); - long backendId = tabletCommitInfos.get(i).getBackendId(); - - if (!tableToParttions.containsKey(tableId)) { - tableToParttions.put(tableId, Sets.newHashSet()); - } - tableToParttions.get(tableId).add(partitionId); - - if (!backendToPartitionTablets.containsKey(backendId)) { - backendToPartitionTablets.put(backendId, Maps.newHashMap()); - } - Map> partitionToTablets = backendToPartitionTablets.get(backendId); - if (!partitionToTablets.containsKey(partitionId)) { - partitionToTablets.put(partitionId, Lists.newArrayList()); - } - partitionToTablets.get(partitionId).add(tabletIds.get(i)); - partitions.putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); - } - } - - private Map getPartitionVersions(Map partitionMap) { - Map partitionToVersions = Maps.newHashMap(); - partitionMap.forEach((key, value) -> { - long visibleVersion = value.getVisibleVersion(); - long newVersion = visibleVersion <= 0 ? 2 : visibleVersion + 1; - partitionToVersions.put(key, newVersion); - }); - return partitionToVersions; - } - - private Map> getCalcDeleteBitmapInfo( - Map>> backendToPartitionTablets, Map partitionVersions) { - Map> backendToPartitionInfos = Maps.newHashMap(); - for (Map.Entry>> entry : backendToPartitionTablets.entrySet()) { - List partitionInfos = Lists.newArrayList(); - for (Map.Entry> partitionToTables : entry.getValue().entrySet()) { - Long partitionId = partitionToTables.getKey(); - TCalcDeleteBitmapPartitionInfo partitionInfo = new TCalcDeleteBitmapPartitionInfo(partitionId, - partitionVersions.get(partitionId), - partitionToTables.getValue()); - partitionInfos.add(partitionInfo); - } - backendToPartitionInfos.put(entry.getKey(), partitionInfos); - } - return backendToPartitionInfos; - } - - private void getDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) - throws UserException { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - for (Map.Entry> entry : tableToParttions.entrySet()) { - GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) - .setLockId(transactionId) - .setInitiator(-1) - .setExpiration(DELETE_BITMAP_LOCK_EXPIRATION_SECONDS); - final GetDeleteBitmapUpdateLockRequest request = builder.build(); - GetDeleteBitmapUpdateLockResponse response = null; - - int retryTime = 0; - while (retryTime++ < Config.meta_service_rpc_retry_times) { - try { - response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request); - if (LOG.isDebugEnabled()) { - LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", - transactionId, request, response); - } - if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT - && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - } catch (Exception e) { - LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", - transactionId, retryTime, e); - } - // sleep random millis [20, 200] ms, avoid txn conflict - int randomMillis = 20 + (int) (Math.random() * (200 - 20)); - if (LOG.isDebugEnabled()) { - LOG.debug("randomMillis:{}", randomMillis); - } - try { - Thread.sleep(randomMillis); - } catch (InterruptedException e) { - LOG.info("InterruptedException: ", e); - } - } - Preconditions.checkNotNull(response); - Preconditions.checkNotNull(response.getStatus()); - if (response.getStatus().getCode() != MetaServiceCode.OK) { - LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", - transactionId, retryTime, response); - if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT - || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) { - // DELETE_BITMAP_LOCK_ERR will be retried on be - throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, - "Failed to get delete bitmap lock due to confilct"); - } - throw new UserException("Failed to get delete bitmap lock, code: " + response.getStatus().getCode()); - } - } - stopWatch.stop(); - LOG.info("get delete bitmap lock successfully. txns: {}. time cost: {} ms.", - transactionId, stopWatch.getTime()); - } - - private void sendCalcDeleteBitmaptask(long dbId, long transactionId, - Map> backendToPartitionInfos) - throws UserException { - if (backendToPartitionInfos.isEmpty()) { - return; - } - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - int totalTaskNum = backendToPartitionInfos.size(); - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch( - totalTaskNum); - AgentBatchTask batchTask = new AgentBatchTask(); - for (Map.Entry> entry : backendToPartitionInfos.entrySet()) { - CalcDeleteBitmapTask task = new CalcDeleteBitmapTask(entry.getKey(), - transactionId, - dbId, - entry.getValue(), - countDownLatch); - countDownLatch.addMark(entry.getKey(), transactionId); - // add to AgentTaskQueue for handling finish report. - // not check return value, because the add will success - AgentTaskQueue.addTask(task); - batchTask.addTask(task); - } - AgentTaskExecutor.submit(batchTask); - - boolean ok; - try { - ok = countDownLatch.await(CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.warn("InterruptedException: ", e); - ok = false; - } - - if (!ok || !countDownLatch.getStatus().ok()) { - String errMsg = "Failed to calculate delete bitmap."; - // clear tasks - AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CALCULATE_DELETE_BITMAP); - - if (!countDownLatch.getStatus().ok()) { - errMsg += countDownLatch.getStatus().getErrorMsg(); - if (countDownLatch.getStatus().getErrorCode() != TStatusCode.DELETE_BITMAP_LOCK_ERROR) { - throw new UserException(errMsg); - } - } else { - errMsg += " Timeout."; - List> unfinishedMarks = countDownLatch.getLeftMarks(); - // only show at most 3 results - List> subList = unfinishedMarks.subList(0, - Math.min(unfinishedMarks.size(), 3)); - if (!subList.isEmpty()) { - errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList); - } - } - LOG.warn(errMsg); - // DELETE_BITMAP_LOCK_ERR will be retried on be - throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, errMsg); - } else { - // Sometimes BE calc delete bitmap succeed, but FE wait timeout for some unknown reasons, - // FE will retry the calculation on BE, this debug point simulates such situation. - debugCalcDeleteBitmapRandomTimeout(); - } - stopWatch.stop(); - LOG.info("calc delete bitmap task successfully. txns: {}. time cost: {} ms.", - transactionId, stopWatch.getTime()); - } - - private void debugCalcDeleteBitmapRandomTimeout() throws UserException { - DebugPoint debugPoint = DebugPointUtil.getDebugPoint( - "CloudGlobalTransactionMgr.calc_delete_bitmap_random_timeout"); - if (debugPoint == null) { - return; - } - - double percent = debugPoint.param("percent", 0.5); - if (new SecureRandom().nextInt() % 100 < 100 * percent) { - throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, - "DebugPoint: Failed to calculate delete bitmap: Timeout."); - } - } - - @Override - public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, - List tabletCommitInfos, long timeoutMillis) - throws UserException { - return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null); - } - - @Override - public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, - List subTransactionStates, long timeoutMillis) throws UserException { - throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud"); - } - - @Override - public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, - List tabletCommitInfos, long timeoutMillis, - TxnCommitAttachment txnCommitAttachment) throws UserException { - if (!MetaLockUtils.tryCommitLockTables(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { - // DELETE_BITMAP_LOCK_ERR will be retried on be - throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, - "get table cloud commit lock timeout, tableList=(" - + StringUtils.join(tableList, ",") + ")"); - } - try { - commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); - } finally { - MetaLockUtils.commitUnlockTables(tableList); - } - return true; - } - - @Override - public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) - throws UserException { - commitTransaction(db.getId(), tableList, transactionId, null, null, true); - } - - @Override - public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException { - abortTransaction(dbId, transactionId, reason, null, null); - } - - @Override - public void abortTransaction(Long dbId, Long transactionId, String reason, - TxnCommitAttachment txnCommitAttachment, List
tableList) throws UserException { - LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); - - AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); - builder.setDbId(dbId); - builder.setTxnId(transactionId); - builder.setReason(reason); - builder.setCloudUniqueId(Config.cloud_unique_id); - - final AbortTxnRequest abortTxnRequest = builder.build(); - AbortTxnResponse abortTxnResponse = null; - int retryTime = 0; - try { - while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, abortTxnRequest:{}", retryTime, abortTxnRequest); - } - abortTxnResponse = MetaServiceProxy - .getInstance().abortTxn(abortTxnRequest); - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, abortTxnResponse:{}", retryTime, abortTxnResponse); - } - if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - // sleep random [20, 200] ms, avoid txn conflict - LOG.info("abortTxn KV_TXN_CONFLICT, transactionId:{}, retryTime:{}", transactionId, retryTime); - backoff(); - retryTime++; - } - Preconditions.checkNotNull(abortTxnResponse); - Preconditions.checkNotNull(abortTxnResponse.getStatus()); - } catch (RpcException e) { - LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e); - throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); - } - - TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb != null) { - LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), - txnState.getCallbackId()); - cb.afterAborted(txnState, true, txnState.getReason()); - } - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_FAILED.increase(1L); - } - } - - @Override - public void abortTransaction(Long dbId, String label, String reason) throws UserException { - LOG.info("try to abort transaction, dbId:{}, label:{}", dbId, label); - - AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); - builder.setDbId(dbId); - builder.setLabel(label); - builder.setReason(reason); - builder.setCloudUniqueId(Config.cloud_unique_id); - - final AbortTxnRequest abortTxnRequest = builder.build(); - AbortTxnResponse abortTxnResponse = null; - int retryTime = 0; - - try { - while (retryTime < Config.cloud_meta_service_rpc_failed_retry_times) { - if (LOG.isDebugEnabled()) { - LOG.debug("retyTime:{}, abortTxnRequest:{}", retryTime, abortTxnRequest); - } - abortTxnResponse = MetaServiceProxy - .getInstance().abortTxn(abortTxnRequest); - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, abortTxnResponse:{}", retryTime, abortTxnResponse); - } - if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - - // sleep random [20, 200] ms, avoid txn conflict - LOG.info("abortTxn KV_TXN_CONFLICT, dbId:{}, label:{}, retryTime:{}", dbId, label, retryTime); - backoff(); - retryTime++; - } - Preconditions.checkNotNull(abortTxnResponse); - Preconditions.checkNotNull(abortTxnResponse.getStatus()); - } catch (Exception e) { - LOG.warn("abortTxn failed, label:{}, exception:", label, e); - throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); - } - - TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb == null) { - LOG.info("no callback to run for this txn, txnId:{} callbackId:{}", txnState.getTransactionId(), - txnState.getCallbackId()); - return; - } - - LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), txnState.getCallbackId()); - cb.afterAborted(txnState, true, txnState.getReason()); - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_FAILED.increase(1L); - } - } - - @Override - public void abortTransaction2PC(Long dbId, long transactionId, List
tableList) throws UserException { - LOG.info("try to abortTransaction2PC, dbId:{}, transactionId:{}", dbId, transactionId); - abortTransaction(dbId, transactionId, "User Abort", null, null); - LOG.info("abortTransaction2PC successfully, dbId:{}, transactionId:{}", dbId, transactionId); - } - - @Override - public List getReadyToPublishTransactions() { - //do nothing for CloudGlobalTransactionMgr - return new ArrayList(); - } - - @Override - public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { - //do nothing for CloudGlobalTransactionMgr - return false; - } - - @Override - public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, - Map> backendPartitions) throws UserException { - throw new UserException("Disallow to call finishTransaction()"); - } - - @Override - public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List tableIdList) - throws AnalysisException { - LOG.info("isPreviousTransactionsFinished(), endTransactionId:{}, dbId:{}, tableIdList:{}", - endTransactionId, dbId, tableIdList); - - if (endTransactionId <= 0) { - throw new AnalysisException("Invaid endTransactionId:" + endTransactionId); - } - CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder(); - builder.setDbId(dbId); - builder.setEndTxnId(endTransactionId); - builder.addAllTableIds(tableIdList); - builder.setCloudUniqueId(Config.cloud_unique_id); - - final CheckTxnConflictRequest checkTxnConflictRequest = builder.build(); - CheckTxnConflictResponse checkTxnConflictResponse = null; - try { - LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest); - checkTxnConflictResponse = MetaServiceProxy - .getInstance().checkTxnConflict(checkTxnConflictRequest); - LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse); - } catch (RpcException e) { - throw new AnalysisException(e.getMessage()); - } - - if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) { - throw new AnalysisException(checkTxnConflictResponse.getStatus().getMsg()); - } - return checkTxnConflictResponse.getFinished(); - } - - public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, long tableId, - long partitionId) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - public boolean isPreviousNonTimeoutTxnFinished(long endTransactionId, long dbId, List tableIdList) - throws AnalysisException { - LOG.info("isPreviousNonTimeoutTxnFinished(), endTransactionId:{}, dbId:{}, tableIdList:{}", - endTransactionId, dbId, tableIdList); - - if (endTransactionId <= 0) { - throw new AnalysisException("Invaid endTransactionId:" + endTransactionId); - } - CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder(); - builder.setDbId(dbId); - builder.setEndTxnId(endTransactionId); - builder.addAllTableIds(tableIdList); - builder.setCloudUniqueId(Config.cloud_unique_id); - builder.setIgnoreTimeoutTxn(true); - - final CheckTxnConflictRequest checkTxnConflictRequest = builder.build(); - CheckTxnConflictResponse checkTxnConflictResponse = null; - try { - LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest); - checkTxnConflictResponse = MetaServiceProxy - .getInstance().checkTxnConflict(checkTxnConflictRequest); - LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse); - } catch (RpcException e) { - throw new AnalysisException(e.getMessage()); - } - - if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) { - throw new AnalysisException(checkTxnConflictResponse.getStatus().getMsg()); - } - return checkTxnConflictResponse.getFinished(); - } - - @Override - public void removeExpiredAndTimeoutTxns() { - // do nothing in cloud mode - } - - public void cleanLabel(Long dbId, String label, boolean isReplay) throws Exception { - LOG.info("try to cleanLabel dbId: {}, label:{}", dbId, label); - CleanTxnLabelRequest.Builder builder = CleanTxnLabelRequest.newBuilder(); - builder.setDbId(dbId).setCloudUniqueId(Config.cloud_unique_id); - - if (!Strings.isNullOrEmpty(label)) { - builder.addLabels(label); - } - - final CleanTxnLabelRequest cleanTxnLabelRequest = builder.build(); - CleanTxnLabelResponse cleanTxnLabelResponse = null; - int retryTime = 0; - - try { - // 5 times retry is enough for clean label - while (retryTime < 5) { - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, cleanTxnLabel:{}", retryTime, cleanTxnLabelRequest); - } - cleanTxnLabelResponse = MetaServiceProxy.getInstance().cleanTxnLabel(cleanTxnLabelRequest); - if (LOG.isDebugEnabled()) { - LOG.debug("retryTime:{}, cleanTxnLabel:{}", retryTime, cleanTxnLabelResponse); - } - if (cleanTxnLabelResponse.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { - break; - } - // sleep random [20, 200] ms, avoid txn conflict - LOG.info("cleanTxnLabel KV_TXN_CONFLICT, dbId:{}, label:{}, retryTime:{}", dbId, label, retryTime); - backoff(); - retryTime++; - } - - Preconditions.checkNotNull(cleanTxnLabelResponse); - Preconditions.checkNotNull(cleanTxnLabelResponse.getStatus()); - } catch (Exception e) { - LOG.warn("cleanTxnLabel failed, dbId:{}, exception:", dbId, e); - throw new UserException("cleanTxnLabel failed, errMsg:" + e.getMessage()); - } - - if (cleanTxnLabelResponse.getStatus().getCode() != MetaServiceCode.OK) { - LOG.warn("cleanTxnLabel failed, dbId:{} label:{} retryTime:{} cleanTxnLabelResponse:{}", - dbId, label, retryTime, cleanTxnLabelResponse); - throw new UserException("cleanTxnLabel failed, errMsg:" + cleanTxnLabelResponse.getStatus().getMsg()); - } - return; - } - - @Override - public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List tableIds) - throws UserException { - //throw new UserException(NOT_SUPPORTED_MSG); - } - - @Override - public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) - throws AnalysisException, TimeoutException { - long dbId = request.getDbId(); - int commitTimeoutSec = Config.commit_timeout_second; - for (int i = 0; i < commitTimeoutSec; ++i) { - Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId); - TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult(); - statusResult.status = new TStatus(); - TransactionStatus txnStatus = null; - if (request.isSetTxnId()) { - long txnId = request.getTxnId(); - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(dbId, txnId); - if (txnState == null) { - throw new AnalysisException("txn does not exist: " + txnId); - } - txnStatus = txnState.getTransactionStatus(); - if (!txnState.getReason().trim().isEmpty()) { - statusResult.status.setErrorMsgsIsSet(true); - statusResult.status.addToErrorMsgs(txnState.getReason()); - } - } else { - txnStatus = getLabelState(dbId, request.getLabel()); - } - if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) { - statusResult.setTxnStatusId(txnStatus.value()); - return statusResult; - } - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - LOG.info("commit sleep exception.", e); - } - } - throw new TimeoutException("Operation is timeout"); - } - - @Override - public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) throws AnalysisException { - // do nothing in cloud mode - } - - @Override - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { - // do nothing in cloud mode - } - - @Override - public TransactionStatus getLabelState(long dbId, String label) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public Long getTransactionId(Long dbId, String label) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public TransactionState getTransactionState(long dbId, long transactionId) { - LOG.info("try to get transaction state, dbId:{}, transactionId:{}", dbId, transactionId); - GetTxnRequest.Builder builder = GetTxnRequest.newBuilder(); - builder.setDbId(dbId); - builder.setTxnId(transactionId); - builder.setCloudUniqueId(Config.cloud_unique_id); - - final GetTxnRequest getTxnRequest = builder.build(); - GetTxnResponse getTxnResponse = null; - try { - LOG.info("getTxnRequest:{}", getTxnRequest); - getTxnResponse = MetaServiceProxy - .getInstance().getTxn(getTxnRequest); - LOG.info("getTxnRequest: {}", getTxnResponse); - } catch (RpcException e) { - LOG.info("getTransactionState exception: {}", e.getMessage()); - return null; - } - - if (getTxnResponse.getStatus().getCode() != MetaServiceCode.OK || !getTxnResponse.hasTxnInfo()) { - LOG.info("getTransactionState exception: {}, {}", getTxnResponse.getStatus().getCode(), - getTxnResponse.getStatus().getMsg()); - return null; - } - return TxnUtil.transactionStateFromPb(getTxnResponse.getTxnInfo()); - } - - @Override - public Long getTransactionIdByLabel(Long dbId, String label, List statusList) - throws UserException { - LOG.info("try to get transaction id by label, dbId:{}, label:{}", dbId, label); - GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder(); - builder.setDbId(dbId); - builder.setLabel(label); - builder.setCloudUniqueId(Config.cloud_unique_id); - for (TransactionStatus status : statusList) { - if (status == TransactionStatus.PREPARE) { - builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED); - } else if (status == TransactionStatus.PRECOMMITTED) { - builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED); - } else if (status == TransactionStatus.COMMITTED) { - builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED); - } - } - - final GetTxnIdRequest getTxnIdRequest = builder.build(); - GetTxnIdResponse getTxnIdResponse = null; - try { - LOG.info("getTxnRequest:{}", getTxnIdRequest); - getTxnIdResponse = MetaServiceProxy - .getInstance().getTxnId(getTxnIdRequest); - LOG.info("getTxnIdReponse: {}", getTxnIdResponse); - } catch (RpcException e) { - LOG.info("getTransactionId exception: {}", e.getMessage()); - throw new TransactionNotFoundException("transaction not found, label=" + label); - } - - if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) { - LOG.info("getTransactionState exception: {}, {}", getTxnIdResponse.getStatus().getCode(), - getTxnIdResponse.getStatus().getMsg()); - throw new TransactionNotFoundException("transaction not found, label=" + label); - } - return getTxnIdResponse.getTxnId(); - } - - @Override - public List getPreCommittedTxnList(Long dbId) throws AnalysisException { - // todo - return new ArrayList(); - } - - @Override - public int getTransactionNum() { - return 0; - } - - @Override - public Long getNextTransactionId() throws UserException { - GetCurrentMaxTxnRequest.Builder builder = GetCurrentMaxTxnRequest.newBuilder(); - builder.setCloudUniqueId(Config.cloud_unique_id); - - final GetCurrentMaxTxnRequest getCurrentMaxTxnRequest = builder.build(); - GetCurrentMaxTxnResponse getCurrentMaxTxnResponse = null; - try { - LOG.info("GetCurrentMaxTxnRequest:{}", getCurrentMaxTxnRequest); - getCurrentMaxTxnResponse = MetaServiceProxy - .getInstance().getCurrentMaxTxnId(getCurrentMaxTxnRequest); - LOG.info("GetCurrentMaxTxnResponse: {}", getCurrentMaxTxnResponse); - } catch (RpcException e) { - LOG.warn("getNextTransactionId() RpcException: {}", e.getMessage()); - throw new UserException("getNextTransactionId() RpcException: " + e.getMessage()); - } - - if (getCurrentMaxTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { - LOG.info("getNextTransactionId() failed, code: {}, msg: {}", - getCurrentMaxTxnResponse.getStatus().getCode(), getCurrentMaxTxnResponse.getStatus().getMsg()); - throw new UserException("getNextTransactionId() failed, msg:" - + getCurrentMaxTxnResponse.getStatus().getMsg()); - } - return getCurrentMaxTxnResponse.getCurrentMaxTxnId(); - } - - @Override - public int getRunningTxnNums(Long dbId) throws AnalysisException { - return 0; - } - - @Override - public long getTxnNumByStatus(TransactionStatus status) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long getAllRunningTxnNum() { - return 0; - } - - @Override - public long getAllPublishTxnNum() { - return 0; - } - - /** - * backoff policy implement by sleep random ms in [20ms, 200ms] - */ - private void backoff() { - int randomMillis = 20 + (int) (Math.random() * (200 - 20)); - try { - Thread.sleep(randomMillis); - } catch (InterruptedException e) { - LOG.info("InterruptedException: ", e); - } - } - - @Override - public TransactionIdGenerator getTransactionIDGenerator() throws Exception { - throw new Exception(NOT_SUPPORTED_MSG); - } - - @Override - public List> getDbInfo() throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getDbTransStateInfo(Long dbId) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getDbTransInfo(Long dbId, boolean running, int limit) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public Map> getDbRunningTransInfo(long dbId) throws AnalysisException { - return Maps.newHashMap(); - } - - @Override - public List> getDbTransInfoByStatus(Long dbId, TransactionStatus status) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getDbTransInfoByLabelMatch(long dbId, String label) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getSingleTranInfo(long dbId, long txnId) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getTableTransInfo(long dbId, long txnId) throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public List> getPartitionTransInfo(long dbId, long tid, long tableId) - throws AnalysisException { - throw new AnalysisException(NOT_SUPPORTED_MSG); - } - - @Override - public void write(DataOutput out) throws IOException { - throw new IOException(NOT_SUPPORTED_MSG); - } - - @Override - public void readFields(DataInput in) throws IOException { - throw new IOException(NOT_SUPPORTED_MSG); - } - - @Override - public void replayUpsertTransactionState(TransactionState transactionState) throws Exception { - throw new Exception(NOT_SUPPORTED_MSG); - } - - @Deprecated - public void replayDeleteTransactionState(TransactionState transactionState) throws Exception { - throw new Exception(NOT_SUPPORTED_MSG); - } - - @Override - public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) throws Exception { - throw new Exception(NOT_SUPPORTED_MSG); - } - - @Override - public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) - throws Exception { - throw new Exception(NOT_SUPPORTED_MSG); - } - - @Override - public void addSubTransaction(long dbId, long transactionId, long subTransactionId) { - throw new UnsupportedOperationException("addSubTransaction is not supported in cloud"); - } - - @Override - public void removeSubTransaction(long dbId, long subTransactionId) { - throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud"); - } -}