From 95f7b14d69126eb08a306aefb777855017a2d4cf Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Thu, 8 Aug 2024 15:00:29 +0800
Subject: [PATCH 1/3] [feat](lock)add deadlock detection tool and monitored
 lock implementations #39015 (#22)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Proposed changes

### Description

This PR adds a deadlock detection tool and monitored lock implementations. Together they help identify and debug potential deadlocks and monitor lock usage.

Features:

#### AbstractMonitoredLock

A monitored version of `Lock` that tracks and logs lock acquisition and release times.

Functionality:
- Overrides `lock()`, `unlock()`, `tryLock()`, and `tryLock(long timeout, TimeUnit unit)`.
- Logs the lock acquisition time, the release time, and any failure to acquire the lock within the specified timeout.

##### Example

```log
2024-08-07 12:02:59 [ Thread-2:2006 ] - [ WARN ] Thread ID: 12, Thread Name: Thread-2 - Lock held for 1912 ms, exceeding hold timeout of 1000 ms
Thread stack trace:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
    at org.example.lock.AbstractMonitoredLock.afterUnlock(AbstractMonitoredLock.java:49)
    at org.example.lock.MonitoredReentrantLock.unlock(MonitoredReentrantLock.java:32)
    at org.example.ExampleService.timeout(ExampleService.java:17)
    at org.example.Main.lambda$test2$1(Main.java:39)
    at java.lang.Thread.run(Thread.java:750)
```
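A minimal usage sketch for reviewers: `MonitoredReentrantLock` is the concrete class added by this patch, but the `ExampleService` class, method name, and sleep duration below are illustrative assumptions, not code from this PR.

```java
import org.apache.doris.common.lock.MonitoredReentrantLock;

public class ExampleService {
    // Drop-in replacement for ReentrantLock; the monitoring hooks run
    // inside the overridden lock()/unlock(), so call sites are unchanged.
    private final MonitoredReentrantLock lock = new MonitoredReentrantLock();

    public void timeout() throws InterruptedException {
        lock.lock();
        try {
            // Hold the lock longer than max_lock_hold_threshold_seconds (10s by default)
            Thread.sleep(11_000);
        } finally {
            lock.unlock(); // afterUnlock() measures the hold time and emits the WARN above
        }
    }
}
```

Because the start time is kept in a `ThreadLocal`, the steady-state overhead is essentially one `System.nanoTime()` call on each acquire and release.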
#### DeadlockCheckerTool

Uses a `ScheduledExecutorService` for periodic deadlock checks and logs deadlock information, including thread names, states, lock info, and stack traces.

**ThreadMXBean accesses thread information in the local JVM, which is already in memory, so reading it is far cheaper than fetching data from external resources such as disk or network. Thread state cache: the JVM typically maintains a cache of thread states, reducing the need for real-time calculation or additional data processing.**

##### Example

```log
Thread Name: Thread-0
Thread State: WAITING
Lock Name: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Lock Owner Name: Thread-1
Lock Owner Id: 12
Waited Time: -1
Blocked Time: -1
Lock Info: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Blocked by: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Stack Trace:
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
    at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
    at org.example.lock.MonitoredReentrantLock.lock(MonitoredReentrantLock.java:22)
    at org.example.Main.lambda$testDeadLock$3(Main.java:79)
    at org.example.Main$$Lambda$1/1221555852.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:750)

2024-08-07 14:11:28 [ pool-1-thread-1:2001 ] - [ WARN ] Deadlocks detected:
Thread Name: Thread-1
Thread State: WAITING
Lock Name: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Lock Owner Name: Thread-0
Lock Owner Id: 11
Waited Time: -1
Blocked Time: -1
Lock Info: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Blocked by: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Stack Trace:
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
    at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
    at org.example.lock.MonitoredReentrantLock.lock(MonitoredReentrantLock.java:22)
    at org.example.Main.lambda$testDeadLock$4(Main.java:93)
    at org.example.Main$$Lambda$2/1556956098.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:750)
```
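A sketch of the kind of two-thread test that produces the report above. `DeadlockMonitor.startMonitoring(period, unit)` is the API added by this patch; the lock ordering, thread bodies, and sleep are assumptions for illustration only.

```java
import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.lock.MonitoredReentrantLock;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // Scan for deadlocks periodically, as DorisFE.startMonitor() does on startup
        new DeadlockMonitor().startMonitoring(1, TimeUnit.MINUTES);

        MonitoredReentrantLock lockA = new MonitoredReentrantLock();
        MonitoredReentrantLock lockB = new MonitoredReentrantLock();

        // Two threads acquire the same pair of locks in opposite order -> deadlock
        new Thread(() -> { lockA.lock(); pause(); lockB.lock(); }).start();
        new Thread(() -> { lockB.lock(); pause(); lockA.lock(); }).start();
    }

    private static void pause() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Note that `ThreadMXBean.findDeadlockedThreads()` covers ownable synchronizers such as `ReentrantLock`, not just intrinsic monitors, which is why the AQS-based locks above are reported.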
##### Benchmark

```
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
@Threads(1)

Benchmark                                                         Mode  Cnt       Score   Error   Units
LockBenchmark.testMonitoredLock                                  thrpt    2   15889.407          ops/ms
LockBenchmark.testMonitoredLock:·gc.alloc.rate                   thrpt    2     678.061          MB/sec
LockBenchmark.testMonitoredLock:·gc.alloc.rate.norm              thrpt    2      56.000            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space          thrpt    2     668.249          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space.norm     thrpt    2      55.080            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space      thrpt    2       0.075          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space.norm thrpt    2       0.006            B/op
LockBenchmark.testMonitoredLock:·gc.count                        thrpt    2      20.000          counts
LockBenchmark.testMonitoredLock:·gc.time                         thrpt    2       6.000              ms
LockBenchmark.testNativeLock                                     thrpt    2  103130.635          ops/ms
LockBenchmark.testNativeLock:·gc.alloc.rate                      thrpt    2      ≈ 10⁻⁴          MB/sec
LockBenchmark.testNativeLock:·gc.alloc.rate.norm                 thrpt    2      ≈ 10⁻⁶            B/op
LockBenchmark.testNativeLock:·gc.count                           thrpt    2         ≈ 0          counts

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
@Threads(100)

Benchmark                                                         Mode  Cnt       Score   Error   Units
LockBenchmark.testMonitoredLock                                  thrpt    2   10994.606          ops/ms
LockBenchmark.testMonitoredLock:·gc.alloc.rate                   thrpt    2     488.508          MB/sec
LockBenchmark.testMonitoredLock:·gc.alloc.rate.norm              thrpt    2      56.002            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space          thrpt    2     481.390          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space.norm     thrpt    2      55.163            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space      thrpt    2       0.020          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space.norm thrpt    2       0.002            B/op
LockBenchmark.testMonitoredLock:·gc.count                        thrpt    2      18.000          counts
LockBenchmark.testMonitoredLock:·gc.time                         thrpt    2       9.000              ms
LockBenchmark.testNativeLock                                     thrpt    2  558652.036          ops/ms
LockBenchmark.testNativeLock:·gc.alloc.rate                      thrpt    2       0.016          MB/sec
LockBenchmark.testNativeLock:·gc.alloc.rate.norm                 thrpt    2      ≈ 10⁻⁴            B/op
LockBenchmark.testNativeLock:·gc.count                           thrpt    2         ≈ 0          counts
```
---
 .../java/org/apache/doris/common/Config.java  |  13 +
 .../main/java/org/apache/doris/DorisFE.java   |  11 +-
 .../doris/catalog/ColocateTableIndex.java     |   4 +-
 .../org/apache/doris/catalog/Database.java    |   8 +-
 .../java/org/apache/doris/catalog/Env.java    |   6 +-
 .../java/org/apache/doris/catalog/Table.java  |   9 +-
 .../java/org/apache/doris/catalog/Tablet.java |   4 +-
 .../common/lock/AbstractMonitoredLock.java    | 105 +++++
 .../doris/common/lock/DeadlockMonitor.java    |  81 ++++
 .../common/lock/MonitoredReentrantLock.java   |  98 +++++
 .../lock/MonitoredReentrantReadWriteLock.java | 137 +++++++
 .../common/util/QueryableReentrantLock.java   |  41 --
 .../util/QueryableReentrantReadWriteLock.java |  41 --
 .../apache/doris/datasource/CatalogMgr.java   |   4 +-
 .../doris/datasource/ExternalDatabase.java    |   6 +-
 .../doris/datasource/InternalCatalog.java     | 378 ++++++++++++++----
 .../datasource/TablePartitionValues.java      |   7 +-
 .../doris/mysql/privilege/UserManager.java    |  10 +-
 .../nereids/jobs/load/LabelProcessor.java     |   4 +-
 .../doris/qe/cache/CacheCoordinator.java      |   5 +-
 .../transaction/DatabaseTransactionMgr.java   |   4 +-
 .../util/QueryableReentrantLockTest.java      |   4 +-
 regression-test/pipeline/p0/conf/fe.conf      |   4 +
 23 files changed, 786 insertions(+), 198 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java
 delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java
 delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index aef49960bf2cc2..406395efa38265 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2765,4 +2765,17 @@ public static boolean isNotCloudMode() {
//========================================================================== // end of cloud config //========================================================================== + //========================================================================== + // start of lock config + @ConfField(description = {"是否开启死锁检测", + "Whether to enable deadlock detection"}) + public static boolean enable_deadlock_detection = false; + + @ConfField(description = {"死锁检测间隔时间,单位分钟", + "Deadlock detection interval time, unit minute"}) + public static long deadlock_detection_interval_minute = 5; + + @ConfField(mutable = true, description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒", + "Maximum lock hold time; logs a warning if exceeded"}) + public static long max_lock_hold_threshold_seconds = 10; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index d1d7ab1f213574..cbfd84b498d98d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -27,6 +27,7 @@ import org.apache.doris.common.LogUtils; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.Version; +import org.apache.doris.common.lock.DeadlockMonitor; import org.apache.doris.common.util.JdkUtils; import org.apache.doris.common.util.NetUtils; import org.apache.doris.httpv2.HttpServer; @@ -60,6 +61,7 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; +import java.util.concurrent.TimeUnit; public class DorisFE { private static final Logger LOG = LogManager.getLogger(DorisFE.class); @@ -95,6 +97,13 @@ public static void main(String[] args) { start(DORIS_HOME_DIR, PID_DIR, args, options); } + private static void startMonitor() { + if (Config.enable_deadlock_detection) { + DeadlockMonitor deadlockMonitor = new DeadlockMonitor(); + deadlockMonitor.startMonitoring(Config.deadlock_detection_interval_minute, TimeUnit.MINUTES); + } + } + // entrance for doris frontend public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) { if (System.getenv("DORIS_LOG_TO_STDERR") != null) { @@ -216,7 +225,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star } ThreadPoolManager.registerAllThreadPoolMetric(); - + startMonitor(); while (true) { Thread.sleep(2000); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 470464407d03ad..46a96bd39ea385 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -25,6 +25,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.ColocatePersistInfo; @@ -56,7 +57,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -182,7 +182,7 @@ public static boolean isGlobalGroupName(String groupName) { // save some error msg of the group for show. 
no need to persist private Map group2ErrMsgs = Maps.newHashMap(); - private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(); public ColocateTableIndex() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 9cdab71fc06090..a1644e6e8c6ce5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -28,8 +28,10 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.persist.CreateTableInfo; import org.apache.doris.persist.gson.GsonUtils; @@ -58,7 +60,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -83,7 +84,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf private long id; @SerializedName(value = "fullQualifiedName") private volatile String fullQualifiedName; - private final ReentrantReadWriteLock rwLock; + + private MonitoredReentrantReadWriteLock rwLock; // table family group map private final Map idToTable; @@ -133,7 +135,7 @@ public Database(long id, String name) { if (this.fullQualifiedName == null) { this.fullQualifiedName = ""; } - this.rwLock = new ReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); this.idToTable = Maps.newConcurrentMap(); this.nameToTable = Maps.newConcurrentMap(); this.lowerCaseToTableName = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f828b6ae576243..1d33cbbe42a5a1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -111,6 +111,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.common.publish.TopicPublisher; import org.apache.doris.common.publish.TopicPublisherThread; import org.apache.doris.common.publish.WorkloadGroupPublisher; @@ -122,7 +123,6 @@ import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -355,7 +355,7 @@ public class Env { // We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass // because fair lock has poor performance. // Using QueryableReentrantLock to print owner thread in debug mode. 
- private QueryableReentrantLock lock; + private MonitoredReentrantLock lock; private CatalogMgr catalogMgr; private GlobalFunctionMgr globalFunctionMgr; @@ -664,7 +664,7 @@ public Env(boolean isCheckpointCatalog) { this.syncJobManager = new SyncJobManager(); this.alter = new Alter(); this.consistencyChecker = new ConsistencyChecker(); - this.lock = new QueryableReentrantLock(true); + this.lock = new MonitoredReentrantLock(true); this.backupHandler = new BackupHandler(this); this.metaDir = Config.meta_dir; this.publishVersionDaemon = new PublishVersionDaemon(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index c9ce42dd5ecf9b..b669b30cb3932e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -27,7 +27,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.QueryableReentrantReadWriteLock; +import org.apache.doris.common.lock.MonitoredReentrantLock; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.persist.gson.GsonUtils; @@ -82,7 +83,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { protected TableType type; @SerializedName(value = "createTime") protected long createTime; - protected QueryableReentrantReadWriteLock rwLock; + protected MonitoredReentrantReadWriteLock rwLock; /* * fullSchema and nameToColumn should contains all columns, both visible and shadow. @@ -128,7 +129,7 @@ public Table(TableType type) { this.type = type; this.fullSchema = Lists.newArrayList(); this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - this.rwLock = new QueryableReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); if (Config.check_table_lock_leaky) { this.readLockThreads = Maps.newConcurrentMap(); } @@ -151,7 +152,7 @@ public Table(long id, String tableName, TableType type, List fullSchema) // Only view in with-clause have null base Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns"); } - this.rwLock = new QueryableReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); this.createTime = Instant.now().getEpochSecond(); if (Config.check_table_lock_leaky) { this.readLockThreads = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 9714ef15719d2c..61251084afacd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -26,6 +26,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -51,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -128,7 +128,7 @@ public TabletHealth() { private long cooldownReplicaId = -1; 
     @SerializedName(value = "cooldownTerm")
     private long cooldownTerm = -1;
-    private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock();
+    private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();
 
     // last time that the tablet checker checks this tablet.
     // no need to persist
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java
new file mode 100644
index 00000000000000..7389ed0d61b6b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.lock;
+
+import org.apache.doris.common.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for a monitored lock that tracks lock acquisition,
+ * release, and attempt times. It provides mechanisms for monitoring the
+ * duration for which a lock is held and logging any instances where locks
+ * are held longer than a specified timeout or fail to be acquired within
+ * a specified timeout.
+ */
+public abstract class AbstractMonitoredLock {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class);
+
+    // Thread-local start time of the current hold, set in afterLock() and cleared in afterUnlock()
+    private final ThreadLocal<Long> lockStartTime = new ThreadLocal<>();
+
+    /**
+     * Method to be called after successfully acquiring the lock.
+     * Sets the start time for the lock.
+     */
+    protected void afterLock() {
+        lockStartTime.set(System.nanoTime());
+    }
+
+    /**
+     * Method to be called after releasing the lock.
+     * Calculates the lock hold time and logs a warning if it exceeds the hold timeout.
+     */
+    protected void afterUnlock() {
+        Long startTime = lockStartTime.get();
+        if (startTime != null) {
+            long lockHoldTimeNanos = System.nanoTime() - startTime;
+            // Cheap approximate ns -> ms conversion: 2^20 ≈ 10^6, so the shift is
+            // ~5% off but avoids a division on this hot path.
+            long lockHoldTimeMs = lockHoldTimeNanos >> 20;
+            long holdTimeoutMs = Config.max_lock_hold_threshold_seconds * 1000;
+            if (lockHoldTimeMs > holdTimeoutMs) {
+                Thread currentThread = Thread.currentThread();
+                String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
+                LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms "
+                                + "Thread stack trace:{}",
+                        currentThread.getId(), currentThread.getName(), lockHoldTimeMs, holdTimeoutMs, stackTrace);
+            }
+            lockStartTime.remove();
+        }
+    }
+
+    /**
+     * Method to be called after attempting to acquire the lock using tryLock.
+     * Logs a debug message if the lock was not acquired within a reasonable time.
+ * + * @param acquired Whether the lock was successfully acquired + * @param startTime The start time of the lock attempt + */ + protected void afterTryLock(boolean acquired, long startTime) { + if (acquired) { + afterLock(); + return; + } + if (LOG.isDebugEnabled()) { + long elapsedTime = (System.nanoTime() - startTime) >> 20; + Thread currentThread = Thread.currentThread(); + String stackTrace = getThreadStackTrace(currentThread.getStackTrace()); + LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms" + + "\nThread blocking info:\n{}", + currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace); + } + } + + /** + * Utility method to format the stack trace of a thread. + * + * @param stackTrace The stack trace elements of the thread + * @return A formatted string of the stack trace + */ + private String getThreadStackTrace(StackTraceElement[] stackTrace) { + StringBuilder sb = new StringBuilder(); + for (StackTraceElement element : stackTrace) { + sb.append("\tat ").append(element).append("\n"); + } + return sb.toString().replace("\n", "\\n"); + } +} + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java new file mode 100644 index 00000000000000..4fcda97dbd1ad5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A utility class for monitoring and reporting deadlocks in a Java application. + *
+ * This class uses the Java Management API to periodically check for deadlocked threads
+ * and logs detailed information about any detected deadlocks. It can be configured to
+ * run at a fixed interval.
+ *
+ */ +public class DeadlockMonitor { + private static final Logger LOG = LoggerFactory.getLogger(DeadlockMonitor.class); + private final ThreadMXBean threadMXBean; + private final ScheduledExecutorService scheduler; + + public DeadlockMonitor() { + this.threadMXBean = ManagementFactory.getThreadMXBean(); + this.scheduler = Executors.newScheduledThreadPool(1); + } + + /** + * Starts monitoring for deadlocks at a fixed rate. + * + * @param period the period between successive executions + * @param unit the time unit of the period parameter + */ + public void startMonitoring(long period, TimeUnit unit) { + scheduler.scheduleAtFixedRate(this::detectAndReportDeadlocks, 5, period, unit); + } + + /** + * Detects and reports deadlocks if any are found. + */ + public void detectAndReportDeadlocks() { + // Get IDs of threads that are deadlocked + long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads(); + + // Check if there are no deadlocked threads + if (deadlockedThreadIds == null || deadlockedThreadIds.length == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("No deadlocks detected."); + } + return; + } + + // Get information about deadlocked threads + ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true); + String deadlockReportString = Arrays.toString(threadInfos).replace("\n", "\\n"); + // Log the deadlock report + LOG.warn("Deadlocks detected {}", deadlockReportString); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java new file mode 100644 index 00000000000000..60211a6a8a8c9c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A monitored version of ReentrantLock that provides additional monitoring capabilities + * for lock acquisition and release. + */ +public class MonitoredReentrantLock extends ReentrantLock { + private static final long serialVersionUID = 1L; + + // Monitor for tracking lock acquisition and release + private final AbstractMonitoredLock lockMonitor = new AbstractMonitoredLock() { + }; + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantLock(boolean fair) { + super(fair); + } + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantLock() { + } + + /** + * Acquires the lock. + * Records the time when the lock is acquired. 
+ */ + @Override + public void lock() { + super.lock(); + lockMonitor.afterLock(); + } + + /** + * Releases the lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + lockMonitor.afterUnlock(); + super.unlock(); + } + + /** + * Tries to acquire the lock. + * Records the time when the lock attempt started and logs the result. + * + * @return true if the lock was acquired, false otherwise + */ + @Override + public boolean tryLock() { + long start = System.nanoTime(); // Record start time + boolean acquired = super.tryLock(); // Attempt to acquire the lock + lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time + return acquired; + } + + /** + * Tries to acquire the lock within the specified time limit. + * Records the time when the lock attempt started and logs the result. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return true if the lock was acquired, false if the waiting time elapsed before the lock was acquired + * @throws InterruptedException if the current thread is interrupted while waiting + */ + @Override + public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + long start = System.nanoTime(); // Record start time + boolean acquired = super.tryLock(timeout, unit); // Attempt to acquire the lock + lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time + return acquired; + } + + @Override + public Thread getOwner() { + return super.getOwner(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java new file mode 100644 index 00000000000000..7a6f0db5938b23 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A monitored version of ReentrantReadWriteLock that provides additional + * monitoring capabilities for read and write locks. + */ +public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock { + // Monitored read and write lock instances + private final ReadLock readLock = new ReadLock(this); + private final WriteLock writeLock = new WriteLock(this); + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantReadWriteLock(boolean fair) { + super(fair); + } + + public MonitoredReentrantReadWriteLock() { + } + + /** + * Monitored read lock class that extends ReentrantReadWriteLock.ReadLock. 
+ */ + public class ReadLock extends ReentrantReadWriteLock.ReadLock { + private static final long serialVersionUID = 1L; + private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {}; + + /** + * Constructs a new ReadLock instance. + * + * @param lock The ReentrantReadWriteLock this lock is associated with + */ + protected ReadLock(ReentrantReadWriteLock lock) { + super(lock); + } + + /** + * Acquires the read lock. + * Records the time when the lock is acquired. + */ + @Override + public void lock() { + super.lock(); + monitor.afterLock(); + } + + /** + * Releases the read lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + monitor.afterUnlock(); + super.unlock(); + } + } + + /** + * Monitored write lock class that extends ReentrantReadWriteLock.WriteLock. + */ + public class WriteLock extends ReentrantReadWriteLock.WriteLock { + private static final long serialVersionUID = 1L; + private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {}; + + /** + * Constructs a new WriteLock instance. + * + * @param lock The ReentrantReadWriteLock this lock is associated with + */ + protected WriteLock(ReentrantReadWriteLock lock) { + super(lock); + } + + /** + * Acquires the write lock. + * Records the time when the lock is acquired. + */ + @Override + public void lock() { + super.lock(); + monitor.afterLock(); + } + + /** + * Releases the write lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + monitor.afterUnlock(); + super.unlock(); + } + } + + /** + * Returns the read lock associated with this lock. + * + * @return The monitored read lock + */ + @Override + public ReadLock readLock() { + return readLock; + } + + /** + * Returns the write lock associated with this lock. + * + * @return The monitored write lock + */ + @Override + public WriteLock writeLock() { + return writeLock; + } + + @Override + public Thread getOwner() { + return super.getOwner(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java deleted file mode 100644 index 1f0283434f99a0..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.common.util; - -import java.util.concurrent.locks.ReentrantLock; - -/* - * This Lock is for exposing the getOwner() method, - * which is a protected method of ReentrantLock - */ -public class QueryableReentrantLock extends ReentrantLock { - private static final long serialVersionUID = 1L; - - public QueryableReentrantLock() { - super(); - } - - public QueryableReentrantLock(boolean fair) { - super(fair); - } - - @Override - public Thread getOwner() { - return super.getOwner(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java deleted file mode 100644 index 3f55b54229710f..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/* - * This Lock is for exposing the getOwner() method, - * which is a protected method of ReentrantLock - */ -public class QueryableReentrantReadWriteLock extends ReentrantReadWriteLock { - private static final long serialVersionUID = 1L; - - public QueryableReentrantReadWriteLock() { - super(); - } - - public QueryableReentrantReadWriteLock(boolean fair) { - super(fair); - } - - @Override - public Thread getOwner() { - return super.getOwner(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index a8aa34cf6c8e1a..887b62151ceda5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -41,6 +41,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -70,7 +71,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -90,7 +90,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { private static final String YES = "yes"; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true); @SerializedName(value = "idToCatalog") 
private final Map>> idToCatalog = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 2919633858f90c..b564a17ce8687b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -28,6 +28,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable; @@ -58,7 +59,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Base class of external database. @@ -69,7 +69,7 @@ public abstract class ExternalDatabase implements DatabaseIf, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class); - protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + protected MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock(true); @SerializedName(value = "id") protected long id; @@ -446,7 +446,7 @@ public void gsonPostProcess() throws IOException { } } idToTbl = tmpIdToTbl; - rwLock = new ReentrantReadWriteLock(true); + rwLock = new MonitoredReentrantReadWriteLock(true); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 5a5ae14ba5f420..9df82874afda0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -124,13 +124,13 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.IdGeneratorUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -209,9 +209,9 @@ public class InternalCatalog implements CatalogIf { private static final Logger LOG = LogManager.getLogger(InternalCatalog.class); - private QueryableReentrantLock lock = new QueryableReentrantLock(true); - private ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); - private ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); + private MonitoredReentrantLock lock = new MonitoredReentrantLock(true); + private transient ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); + private transient ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); // Add transient to fix gson issue. 
@Getter @@ -702,6 +702,26 @@ public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlExcepti } } + public void dropCatalogRecycleBin(IdType idType, long id) throws DdlException { + switch (idType) { + case DATABASE_ID: + Env.getCurrentRecycleBin().eraseDatabaseInstantly(id); + LOG.info("drop database[{}] in catalog recycle bin", id); + break; + case TABLE_ID: + Env.getCurrentRecycleBin().eraseTableInstantly(id); + LOG.info("drop table[{}] in catalog recycle bin", id); + break; + case PARTITION_ID: + Env.getCurrentRecycleBin().erasePartitionInstantly(id); + LOG.info("drop partition[{}] in catalog recycle bin", id); + break; + default: + String message = "DROP CATALOG RECYCLE BIN: idType should be 'DbId', 'TableId' or 'PartitionId'."; + throw new DdlException(message); + } + } + public void replayRecoverDatabase(RecoverInfo info) { long dbId = info.getDbId(); String newDbName = info.getNewDbName(); @@ -1009,6 +1029,42 @@ public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException, D } } + public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) { + if (isReplay || Env.isCheckpointThread()) { + return; + } + + // drop all replicas + AgentBatchTask batchTask = new AgentBatchTask(); + for (Partition partition : olapTable.getAllPartitions()) { + List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); + for (MaterializedIndex materializedIndex : allIndices) { + long indexId = materializedIndex.getId(); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + for (Tablet tablet : materializedIndex.getTablets()) { + long tabletId = tablet.getId(); + List replicas = tablet.getReplicas(); + for (Replica replica : replicas) { + long backendId = replica.getBackendId(); + long replicaId = replica.getId(); + DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, + replicaId, schemaHash, true); + batchTask.addTask(dropTask); + } // end for replicas + } // end for tablets + } // end for indices + } // end for partitions + AgentTaskExecutor.submit(batchTask); + } + + public void erasePartitionDropBackendReplicas(List partitions) { + // no need send be delete task, when be report its tablets, fe will send delete task then. 
+ } + + public void eraseDroppedIndex(long tableId, List indexIdList) { + // nothing to do in non cloud mode + } + private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) { if (LOG.isDebugEnabled()) { LOG.debug("replay add a replica {}", info); @@ -1120,10 +1176,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { // only internal table should check quota and cluster capacity if (!stmt.isExternal()) { - // check cluster capacity - Env.getCurrentSystemInfo().checkAvailableCapacity(); - // check db quota - db.checkQuota(); + checkAvailableCapacity(db); } // check if table exists in db @@ -1315,8 +1368,8 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio } ColumnDef columnDef; if (resultExpr.getSrcSlotRef() == null) { - columnDef = new ColumnDef(name, typeDef, false, null, - true, -1, new DefaultValue(false, null), ""); + columnDef = new ColumnDef(name, typeDef, false, null, true, -1, new DefaultValue(false, null), "", + true); } else { Column column = resultExpr.getSrcSlotRef().getDesc().getColumn(); boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue()); @@ -1333,8 +1386,8 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio } else { defaultValue = new DefaultValue(setDefault, column.getDefaultValue()); } - columnDef = new ColumnDef(name, typeDef, false, null, - column.isAllowNull(), -1, defaultValue, column.getComment()); + columnDef = new ColumnDef(name, typeDef, false, null, column.isAllowNull(), -1, defaultValue, + column.getComment(), true); } createTableStmt.addColumnDef(columnDef); // set first column as default distribution @@ -1431,7 +1484,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } finally { table.readUnlock(); } - addPartition(db, tableName, clause); + addPartition(db, tableName, clause, false, 0, true); } catch (UserException e) { throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName() @@ -1439,7 +1492,25 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } } - public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException { + public static long checkAndGetBufferSize(long indexNum, long bucketNum, + long replicaNum, Database db, String tableName) throws DdlException { + long totalReplicaNum = indexNum * bucketNum * replicaNum; + if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " + + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); + } + return 1 + totalReplicaNum + indexNum * bucketNum; + } + + public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause, + boolean isCreateTable, long generatedPartitionId, + boolean writeEditLog) throws DdlException { + // in cloud mode, isCreateTable == true, create dynamic partition use, so partitionId must have been generated. 
+ // isCreateTable == false, other case, partitionId generate in below, must be set 0 + if (!FeConstants.runningUnitTest && Config.isCloudMode() + && (isCreateTable && generatedPartitionId == 0) || (!isCreateTable && generatedPartitionId != 0)) { + throw new DdlException("not impossible"); + } SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc(); DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); boolean isTempPartition = addPartitionClause.isTempPartition(); @@ -1462,7 +1533,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa if (singlePartitionDesc.isSetIfNotExists()) { LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) { - return; + return null; } } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); @@ -1616,17 +1687,10 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty(); Preconditions.checkNotNull(dataProperty); // check replica quota if this operation done - long indexNum = indexIdToMeta.size(); - long bucketNum = distributionInfo.getBucketNum(); - long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(); - long totalReplicaNum = indexNum * bucketNum * replicaNum; - if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { - throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " - + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); - } - Set tabletIdSet = new HashSet<>(); - long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum; + long bufferSize = checkAndGetBufferSize(indexIdToMeta.size(), distributionInfo.getBucketNum(), + singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(), db, tableName); IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); + Set tabletIdSet = new HashSet<>(); String storagePolicy = olapTable.getStoragePolicy(); if (!Strings.isNullOrEmpty(dataProperty.getStoragePolicy())) { storagePolicy = dataProperty.getStoragePolicy(); @@ -1637,7 +1701,13 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa } }; try { - long partitionId = idGeneratorBuffer.getNextId(); + long partitionId = Config.isCloudMode() && !FeConstants.runningUnitTest && isCreateTable + ? 
generatedPartitionId : idGeneratorBuffer.getNextId(); + List partitionIds = Lists.newArrayList(partitionId); + List indexIds = new ArrayList<>(indexIdToMeta.keySet()); + if (!isCreateTable) { + beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); + } Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, indexIdToMeta, distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(), @@ -1658,7 +1728,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (singlePartitionDesc.isSetIfNotExists()) { failedCleanCallback.run(); - return; + return null; } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); } @@ -1726,23 +1796,31 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa PartitionPersistInfo info = null; if (partitionInfo.getType() == PartitionType.RANGE) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } - Env.getCurrentEnv().getEditLog().logAddPartition(info); - LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + if (!isCreateTable) { + afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); + } + if (writeEditLog) { + Env.getCurrentEnv().getEditLog().logAddPartition(info); + LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + } else { + LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition); + } + return info; } finally { olapTable.writeUnlock(); } @@ -1819,10 +1897,11 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause } // drop - Partition partition = null; long recycleTime = 0; + + 
Partition partition = null; if (isTempPartition) { - olapTable.dropTempPartition(partitionName, true); + partition = olapTable.dropTempPartition(partitionName, true); } else { if (!clause.isForceDrop()) { partition = olapTable.getPartition(partitionName); @@ -1837,14 +1916,23 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause } } } - olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); + partition = olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); if (!clause.isForceDrop() && partition != null) { recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(partition.getId()); } } - long version = olapTable.getNextVersion(); - long versionTime = System.currentTimeMillis(); - olapTable.updateVisibleVersionAndTime(version, versionTime); + + long version = olapTable.getVisibleVersion(); + long versionTime = olapTable.getVisibleVersionTime(); + // Only update table version if drop a non-empty partition + if (partition != null && partition.hasData()) { + versionTime = System.currentTimeMillis(); + if (Config.isNotCloudMode()) { + version = olapTable.getNextVersion(); + olapTable.updateVisibleVersionAndTime(version, versionTime); + } + } + // Here, we only wait for the EventProcessor to finish processing the event, // but regardless of the success or failure of the result, // it does not affect the logic of deleting the partition @@ -1857,6 +1945,11 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause // but in order to avoid bugs affecting the original logic, all exceptions are caught LOG.warn("produceEvent failed: ", t); } + // Set new partition loaded flag for statistics. This will trigger auto analyzing to update dropped partition. + TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); + if (tableStats != null && tableStats.partitionChanged != null) { + tableStats.partitionChanged.set(true); + } // log long partitionId = partition == null ? -1L : partition.getId(); DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName, @@ -1881,6 +1974,11 @@ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundExcep } } olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime()); + // Replay set new partition loaded flag to true for auto analyze. + TableStatsMeta stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); + if (stats != null && stats.partitionChanged != null) { + stats.partitionChanged.set(true); + } } finally { olapTable.writeUnlock(); } @@ -1912,7 +2010,8 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, BinlogConfig binlogConfig, - boolean isStorageMediumSpecified, List clusterKeyIndexes) + boolean isStorageMediumSpecified, + List clusterKeyIndexes) throws DdlException { // create base index first. Preconditions.checkArgument(tbl.getBaseIndexId() != -1); @@ -1971,9 +2070,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa TStorageType storageType = indexMeta.getStorageType(); List schema = indexMeta.getSchema(); KeysType keysType = indexMeta.getKeysType(); + List indexes = indexId == tbl.getBaseIndexId() ? 
tbl.getCopiedIndexes() : null; int totalTaskNum = index.getTablets().size() * totalReplicaNum; MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalTaskNum); AgentBatchTask batchTask = new AgentBatchTask(); + List rowStoreColumns = tbl.getTableProperty().getCopiedRowStoreColumns(); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); for (Replica replica : tablet.getReplicas()) { @@ -1983,7 +2084,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tbl.getId(), partitionId, indexId, tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType, realStorageMedium, schema, bfColumns, tbl.getBfFpp(), countDownLatch, - tbl.getCopiedIndexes(), tbl.isInMemory(), tabletType, + indexes, tbl.isInMemory(), tabletType, tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getEnableUniqueKeyMergeOnWrite(), storagePolicy, tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), @@ -1992,10 +2093,12 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), - tbl.storeRowColumn(), binlogConfig, objectPool, tbl.rowStorePageSize()); + tbl.storeRowColumn(), binlogConfig, + tbl.getRowStoreColumnsUniqueIds(rowStoreColumns), + objectPool, tbl.rowStorePageSize()); task.setStorageFormat(tbl.getStorageFormat()); - task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat()); + task.setInvertedIndexFileStorageFormat(tbl.getInvertedIndexFileStorageFormat()); task.setClusterKeyIndexes(clusterKeyIndexes); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. @@ -2022,11 +2125,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa Map failedTabletCounter = Maps.newHashMap(); countDownLatch.getLeftMarks().stream().forEach( item -> failedTabletCounter.put(item.getValue(), - failedTabletCounter.getOrDefault(item.getValue(), 0) + 1)); + failedTabletCounter.getOrDefault(item.getValue(), 0) + 1)); boolean createFailed = failedTabletCounter.values().stream().anyMatch( failedNum -> (totalReplicaNum - failedNum) < quorumReplicaNum); errMsg = createFailed ? "Failed to create partition[" + partitionName + "]." 
- : "Failed to create some replicas when create partition[" + partitionName + "]."; + : "Failed to create some replicas when create partition[" + partitionName + "]."; if (!countDownLatch.getStatus().ok()) { errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg(); @@ -2071,6 +2174,23 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa return partition; } + public void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) + throws DdlException { + } + + public void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) + throws DdlException { + } + + public void checkAvailableCapacity(Database db) throws DdlException { + // check cluster capacity + Env.getCurrentSystemInfo().checkAvailableCapacity(); + // check db quota + db.checkQuota(); + } + private Type getChildTypeByName(String name, CreateTableStmt stmt) throws AnalysisException { List columns = stmt.getColumns(); @@ -2334,13 +2454,13 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } olapTable.setStorageFormat(storageFormat); - TInvertedIndexStorageFormat invertedIndexStorageFormat; + TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat; try { - invertedIndexStorageFormat = PropertyAnalyzer.analyzeInvertedIndexStorageFormat(properties); + invertedIndexFileStorageFormat = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } - olapTable.setInvertedIndexStorageFormat(invertedIndexStorageFormat); + olapTable.setInvertedIndexFileStorageFormat(invertedIndexFileStorageFormat); // get compression type TCompressionType compressionType = TCompressionType.LZ4; @@ -2402,6 +2522,44 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction); + if (Config.isCloudMode() && ((CloudEnv) env).getEnableStorageVault()) { + // set storage vault + String storageVaultName = PropertyAnalyzer.analyzeStorageVault(properties); + String storageVaultId = null; + // If user does not specify one storage vault then FE would use the default vault + if (Strings.isNullOrEmpty(storageVaultName)) { + Pair info = env.getStorageVaultMgr().getDefaultStorageVaultInfo(); + if (info != null) { + storageVaultName = info.first; + storageVaultId = info.second; + } else { + throw new DdlException("No default storage vault." + + " You can use `SHOW STORAGE VAULT` to get all available vaults," + + " and pick one set default vault with `SET AS DEFAULT STORAGE VAULT`"); + } + } + + if (storageVaultName == null || storageVaultName.isEmpty()) { + throw new DdlException("Invalid Storage Vault. 
" + + " You can use `SHOW STORAGE VAULT` to get all available vaults," + + " and pick one to set the table property `\"storage_vault_name\" = \"\"`"); + } + + // Check if user has storage vault usage privilege + if (ctx != null && !env.getAuth() + .checkStorageVaultPriv(ctx.getCurrentUserIdentity(), storageVaultName, PrivPredicate.USAGE)) { + throw new DdlException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser() + + "'@'" + ConnectContext.get().getRemoteIP() + + "' for storage vault '" + storageVaultName + "'"); + } + + olapTable.setStorageVaultName(storageVaultName); + storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName); + if (storageVaultId != null && !storageVaultId.isEmpty()) { + olapTable.setStorageVaultId(storageVaultId); + } + } + // check `update on current_timestamp` if (!enableUniqueKeyMergeOnWrite) { for (Column column : baseSchema) { @@ -2467,17 +2625,29 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } } - boolean storeRowColumn = false; + // analyze row store columns try { + boolean storeRowColumn = false; storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(properties); if (storeRowColumn && !enableLightSchemaChange) { throw new DdlException( "Row store column rely on light schema change, enable light schema change first"); } + olapTable.setStoreRowColumn(storeRowColumn); + List rowStoreColumns; + try { + rowStoreColumns = PropertyAnalyzer.analyzeRowStoreColumns(properties, + baseSchema.stream().map(Column::getName).collect(Collectors.toList())); + if (rowStoreColumns != null && rowStoreColumns.isEmpty()) { + rowStoreColumns = null; + } + olapTable.setRowStoreColumns(rowStoreColumns); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } - olapTable.setStoreRowColumn(storeRowColumn); // set skip inverted index on load boolean skipWriteIndexOnLoad = PropertyAnalyzer.analyzeSkipWriteIndexOnLoad(properties); @@ -2485,6 +2655,9 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true); + Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties); + olapTable.setTTLSeconds(ttlSeconds); + // set storage policy String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties); Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy); @@ -2682,7 +2855,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx boolean hadLogEditCreateTable = false; try { if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { - if (storagePolicy.equals("") && properties != null && !properties.isEmpty()) { + if (properties != null && !properties.isEmpty()) { // here, all properties should be checked throw new DdlException("Unknown properties: " + properties); } @@ -2697,12 +2870,14 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx long bucketNum = partitionDistributionInfo.getBucketNum(); long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum(); long totalReplicaNum = indexNum * bucketNum * replicaNum; - if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { throw new DdlException( "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing " + 
totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); + Partition partition = createPartitionWithIndices(db.getId(), olapTable, + partitionId, partitionName, olapTable.getIndexIdToMeta(), partitionDistributionInfo, partitionInfo.getDataProperty(partitionId), partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, tabletIdSet, @@ -2712,6 +2887,8 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx binlogConfigForTask, partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds()); + afterCreatePartitions(db.getId(), olapTable.getId(), null, + olapTable.getIndexIdList(), true); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2728,6 +2905,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // and then check if there still has unknown properties olapTable.setStorageMedium(dataProperty.getStorageMedium()); if (partitionInfo.getType() == PartitionType.RANGE) { + DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties); DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties, db); } else if (partitionInfo.getType() == PartitionType.LIST) { if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) { @@ -2755,12 +2933,14 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum(); totalReplicaNum += indexNum * bucketNum * replicaNum; } - if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { throw new DdlException( "Database " + db.getFullName() + " create table " + tableName + " increasing " + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); + // this is a 2-level partitioned tables for (Map.Entry entry : partitionNameToId.entrySet()) { DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue()); @@ -2780,7 +2960,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx partionStoragePolicy = storagePolicy; } Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy); - Partition partition = createPartitionWithIndices(db.getId(), olapTable, entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo, @@ -2795,6 +2974,8 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx olapTable.getPartitionInfo().getDataProperty(partition.getId()) .setStoragePolicy(partionStoragePolicy); } + afterCreatePartitions(db.getId(), olapTable.getId(), null, + olapTable.getIndexIdList(), true); } else { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } @@ -3022,7 +3203,7 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends - Tablet tablet = new Tablet(idGeneratorBuffer.getNextId()); + 
Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId()); // add tablet to inverted index first index.addTablet(tablet, tabletMeta); @@ -3151,11 +3332,12 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti Database db = (Database) getDbOrDdlException(dbTbl.getDb()); OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl()); - long rowsToTruncate = 0; if (olapTable instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ConnectContext.get())) { throw new DdlException("Not allowed to perform current operation on async materialized view"); } + HashMap updateRecords = new HashMap<>(); + BinlogConfig binlogConfig; olapTable.readLock(); try { @@ -3173,10 +3355,9 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } origPartitions.put(partName, partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); - rowsToTruncate += partition.getBaseIndex().getRowCount(); + updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); } } else { - rowsToTruncate = olapTable.getRowCount(); for (Partition partition : olapTable.getPartitions()) { // If need absolutely correct, should check running txn here. // But if the txn is in prepare state, cann't known which partitions had load data. @@ -3185,11 +3366,12 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } origPartitions.put(partition.getName(), partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); + updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); } } // if table currently has no partitions, this sql like empty command and do nothing, should return directly. // but if truncate whole table, the temporary partitions also need drop - if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getTempPartitions().isEmpty())) { + if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getAllTempPartitions().isEmpty())) { LOG.info("finished to truncate table {}, no partition contains data, do nothing", tblRef.getName().toSql()); return; @@ -3222,6 +3404,19 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values()); IdGeneratorBuffer idGeneratorBuffer = origPartitions.isEmpty() ? null : Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); + + Map oldToNewPartitionId = new HashMap(); + List newPartitionIds = new ArrayList(); + for (Map.Entry entry : origPartitions.entrySet()) { + long oldPartitionId = entry.getValue(); + long newPartitionId = idGeneratorBuffer.getNextId(); + oldToNewPartitionId.put(oldPartitionId, newPartitionId); + newPartitionIds.add(newPartitionId); + } + + List indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList()); + beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); + for (Map.Entry entry : origPartitions.entrySet()) { // the new partition must use new id // If we still use the old partition id, the behavior of current load jobs on this partition @@ -3229,7 +3424,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti // By using a new id, load job will be aborted(just like partition is dropped), // which is the right behavior. 
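The comment above is the crux of the truncate change: every rebuilt partition must get a fresh id so that in-flight load jobs bound to the old id abort instead of writing into the replacement partition. The hunk therefore reserves all new ids, and the old-to-new mapping, up front so the `beforeCreatePartitions` hook receives the complete id list in one call. A minimal, self-contained sketch of that reservation step (the class name, the `AtomicLong` id source, and the sample ids are illustrative stand-ins for `IdGeneratorBuffer`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class TruncateIdRemapSketch {
    // Stand-in for IdGeneratorBuffer: monotonically increasing ids.
    private static final AtomicLong ID_GEN = new AtomicLong(10_000);

    public static void main(String[] args) {
        // Partition name -> current (old) partition id, as collected under the table lock.
        Map<String, Long> origPartitions = new HashMap<>();
        origPartitions.put("p20240101", 101L);
        origPartitions.put("p20240102", 102L);

        // Reserve a fresh id for every partition before rebuilding anything,
        // so pending load jobs bound to the old ids abort instead of writing
        // into the replacement partitions.
        Map<Long, Long> oldToNew = new HashMap<>();
        List<Long> newIds = new ArrayList<>();
        for (Map.Entry<String, Long> e : origPartitions.entrySet()) {
            long newId = ID_GEN.incrementAndGet();
            oldToNew.put(e.getValue(), newId);
            newIds.add(newId);
        }
        System.out.println("old->new: " + oldToNew + ", hook receives: " + newIds);
    }
}
```

In the patch itself the reserved ids are then consumed inside the existing per-partition creation loop, as the next hunk shows.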
long oldPartitionId = entry.getValue(); - long newPartitionId = idGeneratorBuffer.getNextId(); + long newPartitionId = oldToNewPartitionId.get(oldPartitionId); Partition newPartition = createPartitionWithIndices(db.getId(), copiedTbl, newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId), @@ -3244,6 +3439,9 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti clusterKeyIdxes); newPartitions.add(newPartition); } + + afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); + } catch (DdlException e) { // create partition failed, remove all newly created tablets failedCleanCallback.run(); @@ -3254,6 +3452,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti // all partitions are created successfully, try to replace the old partitions. // before replacing, we need to check again. // Things may be changed outside the table lock. + List oldPartitions = Lists.newArrayList(); boolean hasWriteLock = false; try { olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId()); @@ -3319,7 +3518,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } // replace - truncateTableInternal(olapTable, newPartitions, truncateEntireTable); + oldPartitions = truncateTableInternal(olapTable, newPartitions, truncateEntireTable); // write edit log TruncateTableInfo info = @@ -3336,23 +3535,23 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } } - HashMap updateRecords = new HashMap<>(); - updateRecords.put(olapTable.getId(), rowsToTruncate); - if (truncateEntireTable) { - // Drop the whole table stats after truncate the entire table - Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); - } else { - // Update the updated rows in table stats after truncate some partitions. - Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); - } + erasePartitionDropBackendReplicas(oldPartitions); + + PartitionNames partitionNames = truncateEntireTable ? null + : new PartitionNames(false, tblRef.getPartitionNames().getPartitionNames()); + Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, partitionNames); + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, db.getId(), olapTable.getId(), 0); LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } - private void truncateTableInternal(OlapTable olapTable, List newPartitions, boolean isEntireTable) { + private List truncateTableInternal(OlapTable olapTable, List newPartitions, + boolean isEntireTable) { // use new partitions to replace the old ones. 
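`truncateTableInternal` now returns the partitions it displaced rather than discarding them, so the caller can hand them to `erasePartitionDropBackendReplicas` after the swap. A toy version of that collect-and-return shape (plain strings stand in for `Partition` objects; the class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplacePartitionSketch {
    // Minimal stand-in for an OlapTable's name -> partition mapping.
    private static final Map<String, String> table = new HashMap<>();

    // Swap in each new partition and hand the displaced ones back to the
    // caller, which can then schedule backend replica cleanup for them.
    static List<String> replaceAll(Map<String, String> newPartitions) {
        List<String> oldPartitions = new ArrayList<>();
        for (Map.Entry<String, String> e : newPartitions.entrySet()) {
            String old = table.put(e.getKey(), e.getValue());
            if (old != null) {
                oldPartitions.add(old);
            }
        }
        return oldPartitions;
    }

    public static void main(String[] args) {
        table.put("p1", "p1@v1");
        Map<String, String> incoming = new HashMap<>();
        incoming.put("p1", "p1@v2");
        System.out.println("to clean up: " + replaceAll(incoming)); // [p1@v1]
    }
}
```

When the whole table is truncated, the patch additionally folds any remaining temp partitions into the returned list before dropping them, so their replicas are cleaned up through the same path.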
+ List oldPartitions = Lists.newArrayList(); Set oldTabletIds = Sets.newHashSet(); for (Partition newPartition : newPartitions) { Partition oldPartition = olapTable.replacePartition(newPartition); + oldPartitions.add(oldPartition); // save old tablets to be removed for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) { index.getTablets().forEach(t -> { @@ -3362,6 +3561,12 @@ private void truncateTableInternal(OlapTable olapTable, List newParti } if (isEntireTable) { + Set oldPartitionsIds = oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet()); + for (Partition partition : olapTable.getAllTempPartitions()) { + if (!oldPartitionsIds.contains(partition.getId())) { + oldPartitions.add(partition); + } + } // drop all temp partitions olapTable.dropAllTempPartitions(); } @@ -3370,9 +3575,12 @@ private void truncateTableInternal(OlapTable olapTable, List newParti for (Long tabletId : oldTabletIds) { Env.getCurrentInvertedIndex().deleteTablet(tabletId); } + + return oldPartitions; } public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException { + List oldPartitions = Lists.newArrayList(); Database db = (Database) getDbOrMetaException(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTblId(), TableType.OLAP); olapTable.writeLock(); @@ -3382,6 +3590,7 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep // add tablet to inverted index TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Partition partition : info.getPartitions()) { + oldPartitions.add(partition); long partitionId = partition.getId(); TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) .getStorageMedium(); @@ -3402,6 +3611,10 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep } finally { olapTable.writeUnlock(); } + + if (!Env.isCheckpointThread()) { + erasePartitionDropBackendReplicas(oldPartitions); + } } public void replayAlterExternalTableSchema(String dbName, String tableName, List newSchema) @@ -3465,8 +3678,7 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx int dbCount = dis.readInt(); long newChecksum = checksum ^ dbCount; for (long i = 0; i < dbCount; ++i) { - Database db = new Database(); - db.readFields(dis); + Database db = Database.read(dis); newChecksum ^= db.getId(); Database dbPrev = fullNameToDb.get(db.getFullName()); @@ -3512,4 +3724,12 @@ public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws public boolean enableAutoAnalyze() { return true; } + + public Map getUsedDataQuota() { + Map dbToDataSize = new TreeMap<>(); + for (Database db : this.idToDb.values()) { + dbToDataSize.put(db.getFullName(), db.getUsedDataQuotaWithLock()); + } + return dbToDataSize; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java index 60765d705d554d..d5e8a39e605a8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.planner.ColumnBound; import 
org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; @@ -43,15 +44,13 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @Data public class TablePartitionValues { public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; - private final ReadWriteLock readWriteLock; + private final MonitoredReentrantReadWriteLock readWriteLock; private long lastUpdateTimestamp; private long nextPartitionId; private final Map idToPartitionItem; @@ -68,7 +67,7 @@ public class TablePartitionValues { private Map> singleUidToColumnRangeMap; public TablePartitionValues() { - readWriteLock = new ReentrantReadWriteLock(); + readWriteLock = new MonitoredReentrantReadWriteLock(); lastUpdateTimestamp = 0; nextPartitionId = 0; idToPartitionItem = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java index 6d677779639498..1a85d21bb7b8b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java @@ -79,17 +79,17 @@ public List getUserByName(String name) { } public void checkPassword(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString, - List currentUser) throws AuthenticationException { + List currentUser) throws AuthenticationException { checkPasswordInternal(remoteUser, remoteHost, remotePasswd, randomString, null, currentUser, false); } public void checkPlainPassword(String remoteUser, String remoteHost, String remotePasswd, - List currentUser) throws AuthenticationException { + List currentUser) throws AuthenticationException { checkPasswordInternal(remoteUser, remoteHost, null, null, remotePasswd, currentUser, true); } private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString, - String remotePasswdStr, List currentUser, boolean plain) throws AuthenticationException { + String remotePasswdStr, List currentUser, boolean plain) throws AuthenticationException { PasswordPolicyManager passwdPolicyMgr = Env.getCurrentEnv().getAuth().getPasswdPolicyManager(); List users = nameToUsers.get(remoteUser); if (CollectionUtils.isEmpty(users)) { @@ -147,7 +147,7 @@ private String hasRemotePasswd(boolean plain, byte[] remotePasswd) { } private boolean comparePassword(Password curUserPassword, byte[] remotePasswd, - byte[] randomString, String remotePasswdStr, boolean plain) { + byte[] randomString, String remotePasswdStr, boolean plain) { // check password if (plain) { return MysqlPassword.checkPlainPass(curUserPassword.getPassword(), remotePasswdStr); @@ -182,7 +182,7 @@ public void clearEntriesSetByResolver() { } public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent, boolean setByResolver, - String comment) + String comment) throws PatternMatcherException { if (userIdentityExist(userIdent, true)) { User userByUserIdentity = getUserByUserIdentity(userIdent); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java index 88454eaecdc4e0..c0ea85dc9f2177 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java @@ -23,6 +23,7 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertJob; @@ -33,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -41,7 +41,7 @@ */ public class LabelProcessor { private final Map>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true); private void readLock() { lock.readLock().lock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java index 529454b9fa3048..51a4a6755b8d20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java @@ -18,6 +18,7 @@ package org.apache.doris.qe.cache; import org.apache.doris.catalog.Env; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.proto.Types; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.system.Backend; @@ -33,8 +34,6 @@ import java.util.List; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * Use consistent hashing to find the BE corresponding to the key to @@ -47,7 +46,7 @@ public class CacheCoordinator { public boolean debugModel = false; private Hashtable realNodes = new Hashtable<>(); private SortedMap virtualNodes = new TreeMap<>(); - private static Lock belock = new ReentrantLock(); + private static MonitoredReentrantLock belock = new MonitoredReentrantLock(); private long lastRefreshTime; private static CacheCoordinator cachePartition; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cc4ba2110846de..eb7cc0e86dcd85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -44,6 +44,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.InternalDatabaseUtil; @@ -90,7 +91,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -119,7 +119,7 @@ private enum PublishResult { // the lock is used to control the access to transaction states // no other locks should be inside this lock - private final 
ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock transactionLock = new MonitoredReentrantReadWriteLock(true); // transactionId -> running TransactionState private final Map idToRunningTransactionState = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java index f8f7b2178f971b..1608b1d6efa3e5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java @@ -17,6 +17,8 @@ package org.apache.doris.common.util; +import org.apache.doris.common.lock.MonitoredReentrantLock; + import org.junit.Assert; import org.junit.Test; @@ -24,7 +26,7 @@ public class QueryableReentrantLockTest { - private QueryableReentrantLock lock = new QueryableReentrantLock(true); + private MonitoredReentrantLock lock = new MonitoredReentrantLock(true); @Test public void test() throws InterruptedException { diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 0b313cb4c5b687..40d5fbb792ff58 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -116,3 +116,7 @@ workload_group_max_num = 30 master_sync_policy = WRITE_NO_SYNC replica_sync_policy = WRITE_NO_SYNC +enable_advance_next_id = true +# enable deadlock detection +enable_deadlock_detection = true +max_lock_hold_threshold_seconds = 10 From 4935e36eed0deb895c513804347b2e8ea71fe713 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 8 Aug 2024 15:50:51 +0800 Subject: [PATCH 2/3] fix checkstyle --- .../org/apache/doris/catalog/Database.java | 1 - .../java/org/apache/doris/catalog/Table.java | 1 - .../doris/datasource/InternalCatalog.java | 451 +++++------------- .../doris/mysql/privilege/UserManager.java | 3 +- 4 files changed, 119 insertions(+), 337 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index a1644e6e8c6ce5..0eed9534f0ae02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -31,7 +31,6 @@ import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.persist.CreateTableInfo; import org.apache.doris.persist.gson.GsonUtils; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index b669b30cb3932e..929f53fb3cb8d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -27,7 +27,6 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 9df82874afda0d..151f624d657e5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -210,8 +210,8 @@ public class InternalCatalog implements CatalogIf { private static final Logger LOG = LogManager.getLogger(InternalCatalog.class); private MonitoredReentrantLock lock = new MonitoredReentrantLock(true); - private transient ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); - private transient ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); + private ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); + private ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); // Add transient to fix gson issue. @Getter @@ -702,26 +702,6 @@ public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlExcepti } } - public void dropCatalogRecycleBin(IdType idType, long id) throws DdlException { - switch (idType) { - case DATABASE_ID: - Env.getCurrentRecycleBin().eraseDatabaseInstantly(id); - LOG.info("drop database[{}] in catalog recycle bin", id); - break; - case TABLE_ID: - Env.getCurrentRecycleBin().eraseTableInstantly(id); - LOG.info("drop table[{}] in catalog recycle bin", id); - break; - case PARTITION_ID: - Env.getCurrentRecycleBin().erasePartitionInstantly(id); - LOG.info("drop partition[{}] in catalog recycle bin", id); - break; - default: - String message = "DROP CATALOG RECYCLE BIN: idType should be 'DbId', 'TableId' or 'PartitionId'."; - throw new DdlException(message); - } - } - public void replayRecoverDatabase(RecoverInfo info) { long dbId = info.getDbId(); String newDbName = info.getNewDbName(); @@ -974,7 +954,7 @@ private static String genDropHint(String dbName, TableIf table) { } public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay, - long recycleTime) { + long recycleTime) { if (table.getType() == TableType.ELASTICSEARCH) { esRepository.deRegisterTable(table.getId()); } else if (table.isManagedTable()) { @@ -996,7 +976,7 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, } private void dropTable(Database db, long tableId, boolean isForceDrop, boolean isReplay, - Long recycleTime) throws MetaNotFoundException { + Long recycleTime) throws MetaNotFoundException { Table table = db.getTableOrMetaException(tableId); db.writeLock(); table.writeLock(); @@ -1011,7 +991,7 @@ private void dropTable(Database db, long tableId, boolean isForceDrop, boolean i } public void replayDropTable(Database db, long tableId, boolean isForceDrop, - Long recycleTime) throws MetaNotFoundException { + Long recycleTime) throws MetaNotFoundException { dropTable(db, tableId, isForceDrop, true, recycleTime); } @@ -1029,42 +1009,6 @@ public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException, D } } - public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) { - if (isReplay || Env.isCheckpointThread()) { - return; - } - - // drop all replicas - AgentBatchTask batchTask = new AgentBatchTask(); - for (Partition partition : olapTable.getAllPartitions()) { - List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - for (MaterializedIndex materializedIndex : allIndices) { - long indexId = materializedIndex.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : materializedIndex.getTablets()) { - long tabletId = 
tablet.getId(); - List replicas = tablet.getReplicas(); - for (Replica replica : replicas) { - long backendId = replica.getBackendId(); - long replicaId = replica.getId(); - DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, - replicaId, schemaHash, true); - batchTask.addTask(dropTask); - } // end for replicas - } // end for tablets - } // end for indices - } // end for partitions - AgentTaskExecutor.submit(batchTask); - } - - public void erasePartitionDropBackendReplicas(List partitions) { - // no need send be delete task, when be report its tablets, fe will send delete task then. - } - - public void eraseDroppedIndex(long tableId, List indexIdList) { - // nothing to do in non cloud mode - } - private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) { if (LOG.isDebugEnabled()) { LOG.debug("replay add a replica {}", info); @@ -1176,7 +1120,10 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { // only internal table should check quota and cluster capacity if (!stmt.isExternal()) { - checkAvailableCapacity(db); + // check cluster capacity + Env.getCurrentSystemInfo().checkAvailableCapacity(); + // check db quota + db.checkQuota(); } // check if table exists in db @@ -1334,7 +1281,7 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio resultExpr.getSrcSlotRef().getColumnName()) || (createTableStmt.getDistributionDesc() != null && createTableStmt.getDistributionDesc().inDistributionColumns( - resultExpr.getSrcSlotRef().getColumnName()))) { + resultExpr.getSrcSlotRef().getColumnName()))) { // String type can not be used in partition/distributed column // so we replace it to varchar if (resultType.getPrimitiveType() == PrimitiveType.STRING) { @@ -1368,8 +1315,8 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio } ColumnDef columnDef; if (resultExpr.getSrcSlotRef() == null) { - columnDef = new ColumnDef(name, typeDef, false, null, true, -1, new DefaultValue(false, null), "", - true); + columnDef = new ColumnDef(name, typeDef, false, null, + true, -1, new DefaultValue(false, null), ""); } else { Column column = resultExpr.getSrcSlotRef().getDesc().getColumn(); boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue()); @@ -1377,17 +1324,17 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio if (column.getDefaultValueExprDef() != null) { if (column.getDefaultValueExprDef().getPrecision() != null) { defaultValue = new DefaultValue(setDefault, column.getDefaultValue(), - column.getDefaultValueExprDef().getExprName(), - column.getDefaultValueExprDef().getPrecision()); + column.getDefaultValueExprDef().getExprName(), + column.getDefaultValueExprDef().getPrecision()); } else { defaultValue = new DefaultValue(setDefault, column.getDefaultValue(), - column.getDefaultValueExprDef().getExprName()); + column.getDefaultValueExprDef().getExprName()); } } else { defaultValue = new DefaultValue(setDefault, column.getDefaultValue()); } - columnDef = new ColumnDef(name, typeDef, false, null, column.isAllowNull(), -1, defaultValue, - column.getComment(), true); + columnDef = new ColumnDef(name, typeDef, false, null, + column.isAllowNull(), -1, defaultValue, column.getComment()); } createTableStmt.addColumnDef(columnDef); // set first column as default distribution @@ -1484,7 +1431,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } finally { table.readUnlock(); } - addPartition(db, tableName, clause, false, 0, 
true); + addPartition(db, tableName, clause); } catch (UserException e) { throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName() @@ -1492,25 +1439,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } } - public static long checkAndGetBufferSize(long indexNum, long bucketNum, - long replicaNum, Database db, String tableName) throws DdlException { - long totalReplicaNum = indexNum * bucketNum * replicaNum; - if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { - throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " - + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); - } - return 1 + totalReplicaNum + indexNum * bucketNum; - } - - public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause, - boolean isCreateTable, long generatedPartitionId, - boolean writeEditLog) throws DdlException { - // in cloud mode, isCreateTable == true, create dynamic partition use, so partitionId must have been generated. - // isCreateTable == false, other case, partitionId generate in below, must be set 0 - if (!FeConstants.runningUnitTest && Config.isCloudMode() - && (isCreateTable && generatedPartitionId == 0) || (!isCreateTable && generatedPartitionId != 0)) { - throw new DdlException("not impossible"); - } + public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException { SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc(); DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); boolean isTempPartition = addPartitionClause.isTempPartition(); @@ -1533,7 +1462,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti if (singlePartitionDesc.isSetIfNotExists()) { LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) { - return null; + return; } } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); @@ -1595,23 +1524,23 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, - olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString()); + olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, - olapTable.getTimeSeriesCompactionFileCountThreshold().toString()); + olapTable.getTimeSeriesCompactionFileCountThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, - olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); + olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, - 
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, - olapTable.getTimeSeriesCompactionLevelThreshold().toString()); + olapTable.getTimeSeriesCompactionLevelThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, olapTable.getStoragePolicy()); @@ -1687,10 +1616,17 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty(); Preconditions.checkNotNull(dataProperty); // check replica quota if this operation done - long bufferSize = checkAndGetBufferSize(indexIdToMeta.size(), distributionInfo.getBucketNum(), - singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(), db, tableName); - IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); + long indexNum = indexIdToMeta.size(); + long bucketNum = distributionInfo.getBucketNum(); + long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(); + long totalReplicaNum = indexNum * bucketNum * replicaNum; + if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " + + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); + } Set tabletIdSet = new HashSet<>(); + long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum; + IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); String storagePolicy = olapTable.getStoragePolicy(); if (!Strings.isNullOrEmpty(dataProperty.getStoragePolicy())) { storagePolicy = dataProperty.getStoragePolicy(); @@ -1701,13 +1637,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti } }; try { - long partitionId = Config.isCloudMode() && !FeConstants.runningUnitTest && isCreateTable - ? 
generatedPartitionId : idGeneratorBuffer.getNextId(); - List partitionIds = Lists.newArrayList(partitionId); - List indexIds = new ArrayList<>(indexIdToMeta.keySet()); - if (!isCreateTable) { - beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); - } + long partitionId = idGeneratorBuffer.getNextId(); Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, indexIdToMeta, distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(), @@ -1728,7 +1658,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (singlePartitionDesc.isSetIfNotExists()) { failedCleanCallback.run(); - return null; + return; } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); } @@ -1796,31 +1726,23 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti PartitionPersistInfo info = null; if (partitionInfo.getType() == PartitionType.RANGE) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } + Env.getCurrentEnv().getEditLog().logAddPartition(info); - if (!isCreateTable) { - afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); - } - if (writeEditLog) { - Env.getCurrentEnv().getEditLog().logAddPartition(info); - LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); - } else { - LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition); - } - return info; + LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); } finally { olapTable.writeUnlock(); } @@ -1897,11 +1819,10 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause } // drop - 
long recycleTime = 0; - Partition partition = null; + long recycleTime = 0; if (isTempPartition) { - partition = olapTable.dropTempPartition(partitionName, true); + olapTable.dropTempPartition(partitionName, true); } else { if (!clause.isForceDrop()) { partition = olapTable.getPartition(partitionName); @@ -1916,23 +1837,14 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause } } } - partition = olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); + olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); if (!clause.isForceDrop() && partition != null) { recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(partition.getId()); } } - - long version = olapTable.getVisibleVersion(); - long versionTime = olapTable.getVisibleVersionTime(); - // Only update table version if drop a non-empty partition - if (partition != null && partition.hasData()) { - versionTime = System.currentTimeMillis(); - if (Config.isNotCloudMode()) { - version = olapTable.getNextVersion(); - olapTable.updateVisibleVersionAndTime(version, versionTime); - } - } - + long version = olapTable.getNextVersion(); + long versionTime = System.currentTimeMillis(); + olapTable.updateVisibleVersionAndTime(version, versionTime); // Here, we only wait for the EventProcessor to finish processing the event, // but regardless of the success or failure of the result, // it does not affect the logic of deleting the partition @@ -1945,11 +1857,6 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause // but in order to avoid bugs affecting the original logic, all exceptions are caught LOG.warn("produceEvent failed: ", t); } - // Set new partition loaded flag for statistics. This will trigger auto analyzing to update dropped partition. - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); - if (tableStats != null && tableStats.partitionChanged != null) { - tableStats.partitionChanged.set(true); - } // log long partitionId = partition == null ? -1L : partition.getId(); DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName, @@ -1974,11 +1881,6 @@ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundExcep } } olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime()); - // Replay set new partition loaded flag to true for auto analyze. - TableStatsMeta stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); - if (stats != null && stats.partitionChanged != null) { - stats.partitionChanged.set(true); - } } finally { olapTable.writeUnlock(); } @@ -2010,8 +1912,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, BinlogConfig binlogConfig, - boolean isStorageMediumSpecified, - List clusterKeyIndexes) + boolean isStorageMediumSpecified, List clusterKeyIndexes) throws DdlException { // create base index first. 
Preconditions.checkArgument(tbl.getBaseIndexId() != -1); @@ -2054,11 +1955,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa TabletMeta tabletMeta = new TabletMeta(dbId, tbl.getId(), partitionId, indexId, schemaHash, dataProperty.getStorageMedium()); realStorageMedium = createTablets(index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, - tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified()); + tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified()); if (realStorageMedium != null && !realStorageMedium.equals(dataProperty.getStorageMedium())) { dataProperty.setStorageMedium(realStorageMedium); LOG.info("real medium not eq default " - + "tableName={} tableId={} partitionName={} partitionId={} readMedium {}", + + "tableName={} tableId={} partitionName={} partitionId={} readMedium {}", tbl.getName(), tbl.getId(), partitionName, partitionId, realStorageMedium); } @@ -2070,11 +1971,9 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa TStorageType storageType = indexMeta.getStorageType(); List schema = indexMeta.getSchema(); KeysType keysType = indexMeta.getKeysType(); - List indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null; int totalTaskNum = index.getTablets().size() * totalReplicaNum; MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalTaskNum); AgentBatchTask batchTask = new AgentBatchTask(); - List rowStoreColumns = tbl.getTableProperty().getCopiedRowStoreColumns(); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); for (Replica replica : tablet.getReplicas()) { @@ -2084,7 +1983,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tbl.getId(), partitionId, indexId, tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType, realStorageMedium, schema, bfColumns, tbl.getBfFpp(), countDownLatch, - indexes, tbl.isInMemory(), tabletType, + tbl.getCopiedIndexes(), tbl.isInMemory(), tabletType, tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getEnableUniqueKeyMergeOnWrite(), storagePolicy, tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), @@ -2093,12 +1992,10 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), - tbl.storeRowColumn(), binlogConfig, - tbl.getRowStoreColumnsUniqueIds(rowStoreColumns), - objectPool, tbl.rowStorePageSize()); + tbl.storeRowColumn(), binlogConfig, objectPool, tbl.rowStorePageSize()); task.setStorageFormat(tbl.getStorageFormat()); - task.setInvertedIndexFileStorageFormat(tbl.getInvertedIndexFileStorageFormat()); + task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat()); task.setClusterKeyIndexes(clusterKeyIndexes); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. 
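As the surrounding hunks suggest, `createPartitionWithIndices` fans one `CreateReplicaTask` per replica out to the backends and blocks on a `MarkedCountDownLatch` sized to tablets times replicas; when the wait does not complete cleanly, it counts failures per tablet and only declares the whole partition creation failed if some tablet is left below its replica quorum. A runnable toy of that accounting, using a plain `java.util.concurrent.CountDownLatch` and hard-coded numbers in place of Doris's marked latch and real task reports:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class QuorumCreateSketch {
    public static void main(String[] args) throws InterruptedException {
        int tablets = 2;
        int replicasPerTablet = 3;
        int quorum = replicasPerTablet / 2 + 1;

        // One count per replica-creation task, mirroring the latch sized to
        // tablets * replicas in the hunk above.
        CountDownLatch latch = new CountDownLatch(tablets * replicasPerTablet);

        // Simulate task reports: every replica succeeds except one on tablet 1.
        Map<Integer, Integer> failedPerTablet = new HashMap<>();
        for (int t = 0; t < tablets; t++) {
            for (int r = 0; r < replicasPerTablet; r++) {
                boolean ok = !(t == 1 && r == 2);
                if (ok) {
                    latch.countDown();
                } else {
                    failedPerTablet.merge(t, 1, Integer::sum);
                }
            }
        }

        boolean allDone = latch.await(1, TimeUnit.SECONDS);
        // Creation only fails when some tablet loses its quorum of replicas.
        boolean createFailed = failedPerTablet.values().stream()
                .anyMatch(failed -> replicasPerTablet - failed < quorum);
        System.out.println("allDone=" + allDone + ", createFailed=" + createFailed);
    }
}
```

The `anyMatch` predicate here mirrors the diff's `(totalReplicaNum - failedNum) < quorumReplicaNum` check: one dead replica out of three still leaves a two-replica quorum, so creation succeeds with a softer "failed to create some replicas" message.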
@@ -2125,11 +2022,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa Map failedTabletCounter = Maps.newHashMap(); countDownLatch.getLeftMarks().stream().forEach( item -> failedTabletCounter.put(item.getValue(), - failedTabletCounter.getOrDefault(item.getValue(), 0) + 1)); + failedTabletCounter.getOrDefault(item.getValue(), 0) + 1)); boolean createFailed = failedTabletCounter.values().stream().anyMatch( failedNum -> (totalReplicaNum - failedNum) < quorumReplicaNum); errMsg = createFailed ? "Failed to create partition[" + partitionName + "]." - : "Failed to create some replicas when create partition[" + partitionName + "]."; + : "Failed to create some replicas when create partition[" + partitionName + "]."; if (!countDownLatch.getStatus().ok()) { errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg(); @@ -2174,23 +2071,6 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa return partition; } - public void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, - boolean isCreateTable) - throws DdlException { - } - - public void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, - boolean isCreateTable) - throws DdlException { - } - - public void checkAvailableCapacity(Database db) throws DdlException { - // check cluster capacity - Env.getCurrentSystemInfo().checkAvailableCapacity(); - // check db quota - db.checkQuota(); - } - private Type getChildTypeByName(String name, CreateTableStmt stmt) throws AnalysisException { List columns = stmt.getColumns(); @@ -2304,8 +2184,8 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema); if (defaultDistributionInfo instanceof HashDistributionInfo - && ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns() - .stream().anyMatch(column -> column.getType().isVariantType())) { + && ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns() + .stream().anyMatch(column -> column.getType().isVariantType())) { throw new DdlException("Hash distribution info should not contain variant columns"); } @@ -2382,20 +2262,20 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx && (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) { + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) { throw new DdlException("only time series compaction policy support for time series config"); } // set time series compaction goal size long timeSeriesCompactionGoalSizeMbytes - = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; try { timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer - 
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties); + .analyzeTimeSeriesCompactionGoalSizeMbytes(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2403,10 +2283,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction file count threshold long timeSeriesCompactionFileCountThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionFileCountThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionFileCountThreshold(properties); + .analyzeTimeSeriesCompactionFileCountThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2414,10 +2294,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction time threshold long timeSeriesCompactionTimeThresholdSeconds - = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; try { timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer - .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); + .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2425,10 +2305,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction empty rowsets threshold long timeSeriesCompactionEmptyRowsetsThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); + .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2436,10 +2316,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction level threshold long timeSeriesCompactionLevelThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionLevelThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionLevelThreshold(properties); + .analyzeTimeSeriesCompactionLevelThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2454,13 +2334,13 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } olapTable.setStorageFormat(storageFormat); - TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat; + TInvertedIndexStorageFormat invertedIndexStorageFormat; try { - invertedIndexFileStorageFormat = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(properties); + invertedIndexStorageFormat = PropertyAnalyzer.analyzeInvertedIndexStorageFormat(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } - olapTable.setInvertedIndexFileStorageFormat(invertedIndexFileStorageFormat); + olapTable.setInvertedIndexStorageFormat(invertedIndexStorageFormat); // get compression type TCompressionType compressionType = TCompressionType.LZ4; @@ -2522,44 +2402,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws 
UserEx } olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction); - if (Config.isCloudMode() && ((CloudEnv) env).getEnableStorageVault()) { - // set storage vault - String storageVaultName = PropertyAnalyzer.analyzeStorageVault(properties); - String storageVaultId = null; - // If user does not specify one storage vault then FE would use the default vault - if (Strings.isNullOrEmpty(storageVaultName)) { - Pair info = env.getStorageVaultMgr().getDefaultStorageVaultInfo(); - if (info != null) { - storageVaultName = info.first; - storageVaultId = info.second; - } else { - throw new DdlException("No default storage vault." - + " You can use `SHOW STORAGE VAULT` to get all available vaults," - + " and pick one set default vault with `SET AS DEFAULT STORAGE VAULT`"); - } - } - - if (storageVaultName == null || storageVaultName.isEmpty()) { - throw new DdlException("Invalid Storage Vault. " - + " You can use `SHOW STORAGE VAULT` to get all available vaults," - + " and pick one to set the table property `\"storage_vault_name\" = \"\"`"); - } - - // Check if user has storage vault usage privilege - if (ctx != null && !env.getAuth() - .checkStorageVaultPriv(ctx.getCurrentUserIdentity(), storageVaultName, PrivPredicate.USAGE)) { - throw new DdlException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser() - + "'@'" + ConnectContext.get().getRemoteIP() - + "' for storage vault '" + storageVaultName + "'"); - } - - olapTable.setStorageVaultName(storageVaultName); - storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName); - if (storageVaultId != null && !storageVaultId.isEmpty()) { - olapTable.setStorageVaultId(storageVaultId); - } - } - // check `update on current_timestamp` if (!enableUniqueKeyMergeOnWrite) { for (Column column : baseSchema) { @@ -2625,29 +2467,17 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } } - // analyze row store columns + boolean storeRowColumn = false; try { - boolean storeRowColumn = false; storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(properties); if (storeRowColumn && !enableLightSchemaChange) { throw new DdlException( "Row store column rely on light schema change, enable light schema change first"); } - olapTable.setStoreRowColumn(storeRowColumn); - List rowStoreColumns; - try { - rowStoreColumns = PropertyAnalyzer.analyzeRowStoreColumns(properties, - baseSchema.stream().map(Column::getName).collect(Collectors.toList())); - if (rowStoreColumns != null && rowStoreColumns.isEmpty()) { - rowStoreColumns = null; - } - olapTable.setRowStoreColumns(rowStoreColumns); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } + olapTable.setStoreRowColumn(storeRowColumn); // set skip inverted index on load boolean skipWriteIndexOnLoad = PropertyAnalyzer.analyzeSkipWriteIndexOnLoad(properties); @@ -2655,9 +2485,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true); - Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties); - olapTable.setTTLSeconds(ttlSeconds); - // set storage policy String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties); Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy); @@ -2665,7 +2492,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx && 
!Strings.isNullOrEmpty(storagePolicy)) { throw new AnalysisException( "Can not create UNIQUE KEY table that enables Merge-On-write" - + " with storage policy(" + storagePolicy + ")"); + + " with storage policy(" + storagePolicy + ")"); } // Consider one situation: if the table has no storage policy but some partitions // have their own storage policy then it might be erased by the following function. @@ -2855,7 +2682,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx boolean hadLogEditCreateTable = false; try { if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { - if (properties != null && !properties.isEmpty()) { + if (storagePolicy.equals("") && properties != null && !properties.isEmpty()) { // here, all properties should be checked throw new DdlException("Unknown properties: " + properties); } @@ -2870,14 +2697,12 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx long bucketNum = partitionDistributionInfo.getBucketNum(); long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum(); long totalReplicaNum = indexNum * bucketNum * replicaNum; - if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { throw new DdlException( "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing " + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); - Partition partition = createPartitionWithIndices(db.getId(), olapTable, - partitionId, partitionName, + Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, olapTable.getIndexIdToMeta(), partitionDistributionInfo, partitionInfo.getDataProperty(partitionId), partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, tabletIdSet, @@ -2887,8 +2712,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx binlogConfigForTask, partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds()); - afterCreatePartitions(db.getId(), olapTable.getId(), null, - olapTable.getIndexIdList(), true); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2905,7 +2728,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // and then check if there still has unknown properties olapTable.setStorageMedium(dataProperty.getStorageMedium()); if (partitionInfo.getType() == PartitionType.RANGE) { - DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties); DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties, db); } else if (partitionInfo.getType() == PartitionType.LIST) { if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) { @@ -2933,14 +2755,12 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum(); totalReplicaNum += indexNum * bucketNum * replicaNum; } - if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { throw new DdlException( "Database " + db.getFullName() + " create table " + tableName + " increasing " + 
totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); - // this is a 2-level partitioned table for (Map.Entry entry : partitionNameToId.entrySet()) { DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue()); @@ -2960,6 +2780,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx partionStoragePolicy = storagePolicy; } Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy); + Partition partition = createPartitionWithIndices(db.getId(), olapTable, entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo, @@ -2974,8 +2795,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx olapTable.getPartitionInfo().getDataProperty(partition.getId()) .setStoragePolicy(partionStoragePolicy); } - afterCreatePartitions(db.getId(), olapTable.getId(), null, - olapTable.getIndexIdList(), true); } else { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } @@ -3149,7 +2968,7 @@ private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlExc } private boolean checkCreateTableResult(String tableName, long tableId, Pair result) - throws DdlException { + throws DdlException { if (Boolean.FALSE.equals(result.first)) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } @@ -3162,8 +2981,10 @@ private boolean checkCreateTableResult(String tableName, long tableId, Pair tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified) + DistributionInfo distributionInfo, long version, + ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, + Set tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, + boolean isStorageMediumSpecified) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); @@ -3203,7 +3024,7 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends - Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId()); + Tablet tablet = new Tablet(idGeneratorBuffer.getNextId()); // add tablet to inverted index first index.addTablet(tablet, tabletMeta); @@ -3332,12 +3153,11 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti Database db = (Database) getDbOrDdlException(dbTbl.getDb()); OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl()); + long rowsToTruncate = 0; if (olapTable instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ConnectContext.get())) { throw new DdlException("Not allowed to perform current operation on async materialized view"); } - HashMap updateRecords = new HashMap<>(); - BinlogConfig binlogConfig; olapTable.readLock(); try { @@ -3355,9 +3175,10 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } origPartitions.put(partName, partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); - updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); + rowsToTruncate += partition.getBaseIndex().getRowCount(); } } else { + rowsToTruncate = olapTable.getRowCount(); for (Partition partition : olapTable.getPartitions()) { // If absolute correctness is needed,
we should check running txns here. But if a txn is in the prepare state, we can't know which partitions have loaded data. @@ -3366,12 +3187,11 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } origPartitions.put(partition.getName(), partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); - updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); } } // if the table currently has no partitions, this sql is like an empty command and does nothing, so return directly. // but when truncating the whole table, the temporary partitions also need to be dropped - if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getAllTempPartitions().isEmpty())) { + if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getTempPartitions().isEmpty())) { LOG.info("finished to truncate table {}, no partition contains data, do nothing", tblRef.getName().toSql()); return; @@ -3404,19 +3224,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values()); IdGeneratorBuffer idGeneratorBuffer = origPartitions.isEmpty() ? null : Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); - - Map oldToNewPartitionId = new HashMap(); - List newPartitionIds = new ArrayList(); - for (Map.Entry entry : origPartitions.entrySet()) { - long oldPartitionId = entry.getValue(); - long newPartitionId = idGeneratorBuffer.getNextId(); - oldToNewPartitionId.put(oldPartitionId, newPartitionId); - newPartitionIds.add(newPartitionId); - } - - List indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList()); - beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); - for (Map.Entry entry : origPartitions.entrySet()) { // the new partition must use a new id // If we still use the old partition id, the behavior of current load jobs on this partition @@ -3424,7 +3231,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti // By using a new id, the load job will be aborted (just like when the partition is dropped), // which is the right behavior. long oldPartitionId = entry.getValue(); - long newPartitionId = oldToNewPartitionId.get(oldPartitionId); + long newPartitionId = idGeneratorBuffer.getNextId(); Partition newPartition = createPartitionWithIndices(db.getId(), copiedTbl, newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId), @@ -3439,9 +3246,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti clusterKeyIdxes); newPartitions.add(newPartition); } - - afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); - } catch (DdlException e) { // create partition failed, remove all newly created tablets failedCleanCallback.run(); @@ -3452,7 +3256,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti // all partitions are created successfully, try to replace the old partitions. // before replacing, we need to check again. // Things may be changed outside the table lock.
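The truncate flow above deliberately allocates a fresh id for every replacement partition (this hunk reverts to calling idGeneratorBuffer.getNextId() inline), builds the new partitions without holding the table write lock, and swaps them in only after re-checking the table under the lock. A minimal sketch of that copy-then-swap shape, using simplified stand-in types rather than the actual Doris classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for illustration only; not the Doris classes.
public class TruncateSketch {
    record Partition(long id, String name) {}

    static final AtomicLong ID_GEN = new AtomicLong(10_000); // plays the role of IdGeneratorBuffer

    // Phase 1 (no table write lock held): create empty replacement partitions.
    // Each replacement gets a NEW id so that in-flight load txns targeting the
    // old id fail cleanly, exactly as if the partition had been dropped.
    static List<Partition> buildReplacements(Map<String, Long> origPartitions) {
        List<Partition> replacements = new ArrayList<>();
        for (Map.Entry<String, Long> e : origPartitions.entrySet()) {
            long newId = ID_GEN.getAndIncrement();   // never reuse e.getValue()
            replacements.add(new Partition(newId, e.getKey()));
        }
        return replacements;
    }

    // Phase 2 (under the table write lock): before swapping, verify the table
    // still looks the way it did when phase 1 started, since anything may have
    // changed outside the lock (partitions dropped, schema altered, ...).
    static void swapIn(Map<String, Partition> table, List<Partition> replacements,
                       Map<String, Long> origPartitions) {
        for (Partition p : replacements) {
            Long expectedOldId = origPartitions.get(p.name());
            Partition current = table.get(p.name());
            if (current == null || expectedOldId == null || current.id() != expectedOldId) {
                throw new IllegalStateException("table changed during truncate: " + p.name());
            }
        }
        for (Partition p : replacements) {
            table.put(p.name(), p); // the actual replace
        }
    }

    public static void main(String[] args) {
        Map<String, Partition> table = new LinkedHashMap<>();
        table.put("p1", new Partition(1, "p1"));
        Map<String, Long> orig = new LinkedHashMap<>();
        orig.put("p1", 1L);
        swapIn(table, buildReplacements(orig), orig);
        System.out.println(table); // p1 now carries the fresh id
    }
}
```

Doing the expensive tablet creation outside the lock keeps the write-lock window down to the final validate-and-swap, which is why the re-check before replacing is essential.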
- List oldPartitions = Lists.newArrayList(); boolean hasWriteLock = false; try { olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId()); @@ -3518,7 +3321,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } // replace - oldPartitions = truncateTableInternal(olapTable, newPartitions, truncateEntireTable); + truncateTableInternal(olapTable, newPartitions, truncateEntireTable); // write edit log TruncateTableInfo info = @@ -3535,23 +3338,23 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } } - erasePartitionDropBackendReplicas(oldPartitions); - - PartitionNames partitionNames = truncateEntireTable ? null - : new PartitionNames(false, tblRef.getPartitionNames().getPartitionNames()); - Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, partitionNames); - Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, db.getId(), olapTable.getId(), 0); + HashMap updateRecords = new HashMap<>(); + updateRecords.put(olapTable.getId(), rowsToTruncate); + if (truncateEntireTable) { + // Drop the whole table's stats after truncating the entire table + Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); + } else { + // Update the updated-rows count in table stats after truncating some partitions. + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); + } LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } - private List truncateTableInternal(OlapTable olapTable, List newPartitions, - boolean isEntireTable) { + private void truncateTableInternal(OlapTable olapTable, List newPartitions, boolean isEntireTable) { // use new partitions to replace the old ones. - List oldPartitions = Lists.newArrayList(); Set oldTabletIds = Sets.newHashSet(); for (Partition newPartition : newPartitions) { Partition oldPartition = olapTable.replacePartition(newPartition); - oldPartitions.add(oldPartition); // save old tablets to be removed for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) { index.getTablets().forEach(t -> { @@ -3561,12 +3364,6 @@ private List truncateTableInternal(OlapTable olapTable, List oldPartitionsIds = oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet()); - for (Partition partition : olapTable.getAllTempPartitions()) { - if (!oldPartitionsIds.contains(partition.getId())) { - oldPartitions.add(partition); - } - } // drop all temp partitions olapTable.dropAllTempPartitions(); } @@ -3575,12 +3372,9 @@ private List truncateTableInternal(OlapTable olapTable, List oldPartitions = Lists.newArrayList(); Database db = (Database) getDbOrMetaException(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTblId(), TableType.OLAP); olapTable.writeLock(); @@ -3590,7 +3384,6 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep // add tablet to inverted index TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Partition partition : info.getPartitions()) { - oldPartitions.add(partition); long partitionId = partition.getId(); TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) .getStorageMedium(); @@ -3611,10 +3404,6 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep } finally { olapTable.writeUnlock(); } - - if (!Env.isCheckpointThread()) { - erasePartitionDropBackendReplicas(oldPartitions); - } } public void
replayAlterExternalTableSchema(String dbName, String tableName, List newSchema) @@ -3678,7 +3467,8 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx int dbCount = dis.readInt(); long newChecksum = checksum ^ dbCount; for (long i = 0; i < dbCount; ++i) { - Database db = Database.read(dis); + Database db = new Database(); + db.readFields(dis); newChecksum ^= db.getId(); Database dbPrev = fullNameToDb.get(db.getFullName()); @@ -3686,12 +3476,13 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx String errMsg; if (dbPrev instanceof MysqlCompatibleDatabase || db instanceof MysqlCompatibleDatabase) { errMsg = String.format( - "Mysql compatibility problem, previous checkpoint already has a database with full name " - + "%s. If its name is mysql, try to add mysqldb_replace_name=\"mysql_comp\" in fe.conf.", - db.getFullName()); + "Mysql compatibility problem, previous checkpoint already has a database with full name " + + "%s. If its name is mysql, try to add mysqldb_replace_name=\"mysql_comp\" in " + + "fe.conf.", + db.getFullName()); } else { errMsg = String.format("Logical error, duplicated database fullname: %s, id: %d %d.", - db.getFullName(), db.getId(), fullNameToDb.get(db.getFullName()).getId()); + db.getFullName(), db.getId(), fullNameToDb.get(db.getFullName()).getId()); } throw new IOException(errMsg); } @@ -3724,12 +3515,4 @@ public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws public boolean enableAutoAnalyze() { return true; } - - public Map getUsedDataQuota() { - Map dbToDataSize = new TreeMap<>(); - for (Database db : this.idToDb.values()) { - dbToDataSize.put(db.getFullName(), db.getUsedDataQuotaWithLock()); - } - return dbToDataSize; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java index 1a85d21bb7b8b3..a6712456e8563d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java @@ -89,7 +89,8 @@ public void checkPlainPassword(String remoteUser, String remoteHost, String remo } private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString, - String remotePasswdStr, List currentUser, boolean plain) throws AuthenticationException { + String remotePasswdStr, List currentUser, boolean plain) + throws AuthenticationException { PasswordPolicyManager passwdPolicyMgr = Env.getCurrentEnv().getAuth().getPasswdPolicyManager(); List users = nameToUsers.get(remoteUser); if (CollectionUtils.isEmpty(users)) { From 2f01dc34a8d544c7c3996495d8e6faf3c8af8e1b Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 8 Aug 2024 16:11:58 +0800 Subject: [PATCH 3/3] fix checkstyle --- .../doris/datasource/InternalCatalog.java | 77 +++++++++---------- .../doris/mysql/privilege/UserManager.java | 11 ++- 2 files changed, 42 insertions(+), 46 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 151f624d657e5e..2104ea2638c36b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -954,7 +954,7 @@ private static String genDropHint(String dbName, TableIf table) { } public boolean unprotectDropTable(Database 
db, Table table, boolean isForceDrop, boolean isReplay, - long recycleTime) { + long recycleTime) { if (table.getType() == TableType.ELASTICSEARCH) { esRepository.deRegisterTable(table.getId()); } else if (table.isManagedTable()) { @@ -976,7 +976,7 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, } private void dropTable(Database db, long tableId, boolean isForceDrop, boolean isReplay, - Long recycleTime) throws MetaNotFoundException { + Long recycleTime) throws MetaNotFoundException { Table table = db.getTableOrMetaException(tableId); db.writeLock(); table.writeLock(); @@ -991,7 +991,7 @@ private void dropTable(Database db, long tableId, boolean isForceDrop, boolean i } public void replayDropTable(Database db, long tableId, boolean isForceDrop, - Long recycleTime) throws MetaNotFoundException { + Long recycleTime) throws MetaNotFoundException { dropTable(db, tableId, isForceDrop, true, recycleTime); } @@ -1281,7 +1281,7 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio resultExpr.getSrcSlotRef().getColumnName()) || (createTableStmt.getDistributionDesc() != null && createTableStmt.getDistributionDesc().inDistributionColumns( - resultExpr.getSrcSlotRef().getColumnName()))) { + resultExpr.getSrcSlotRef().getColumnName()))) { // String type can not be used in partition/distributed column // so we replace it to varchar if (resultType.getPrimitiveType() == PrimitiveType.STRING) { @@ -1324,11 +1324,11 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio if (column.getDefaultValueExprDef() != null) { if (column.getDefaultValueExprDef().getPrecision() != null) { defaultValue = new DefaultValue(setDefault, column.getDefaultValue(), - column.getDefaultValueExprDef().getExprName(), - column.getDefaultValueExprDef().getPrecision()); + column.getDefaultValueExprDef().getExprName(), + column.getDefaultValueExprDef().getPrecision()); } else { defaultValue = new DefaultValue(setDefault, column.getDefaultValue(), - column.getDefaultValueExprDef().getExprName()); + column.getDefaultValueExprDef().getExprName()); } } else { defaultValue = new DefaultValue(setDefault, column.getDefaultValue()); @@ -1524,23 +1524,23 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, - olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString()); + olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, - olapTable.getTimeSeriesCompactionFileCountThreshold().toString()); + olapTable.getTimeSeriesCompactionFileCountThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, - olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); + olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, - 
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, - olapTable.getTimeSeriesCompactionLevelThreshold().toString()); + olapTable.getTimeSeriesCompactionLevelThreshold().toString()); } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, olapTable.getStoragePolicy()); @@ -1955,11 +1955,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa TabletMeta tabletMeta = new TabletMeta(dbId, tbl.getId(), partitionId, indexId, schemaHash, dataProperty.getStorageMedium()); realStorageMedium = createTablets(index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, - tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified()); + tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified()); if (realStorageMedium != null && !realStorageMedium.equals(dataProperty.getStorageMedium())) { dataProperty.setStorageMedium(realStorageMedium); LOG.info("real medium not eq default " - + "tableName={} tableId={} partitionName={} partitionId={} readMedium {}", + + "tableName={} tableId={} partitionName={} partitionId={} readMedium {}", tbl.getName(), tbl.getId(), partitionName, partitionId, realStorageMedium); } @@ -2184,8 +2184,8 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema); if (defaultDistributionInfo instanceof HashDistributionInfo - && ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns() - .stream().anyMatch(column -> column.getType().isVariantType())) { + && ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns() + .stream().anyMatch(column -> column.getType().isVariantType())) { throw new DdlException("Hash distribution info should not contain variant columns"); } @@ -2262,20 +2262,20 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx && (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) { + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) { throw new DdlException("only time series compaction policy support for time series config"); } // set time series compaction goal size long timeSeriesCompactionGoalSizeMbytes - = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; try { timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer - .analyzeTimeSeriesCompactionGoalSizeMbytes(properties); + 
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2283,10 +2283,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction file count threshold long timeSeriesCompactionFileCountThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionFileCountThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionFileCountThreshold(properties); + .analyzeTimeSeriesCompactionFileCountThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2294,10 +2294,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction time threshold long timeSeriesCompactionTimeThresholdSeconds - = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; try { timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer - .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); + .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2305,10 +2305,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction empty rowsets threshold long timeSeriesCompactionEmptyRowsetsThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); + .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2316,10 +2316,10 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx // set time series compaction level threshold long timeSeriesCompactionLevelThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionLevelThreshold = PropertyAnalyzer - .analyzeTimeSeriesCompactionLevelThreshold(properties); + .analyzeTimeSeriesCompactionLevelThreshold(properties); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } @@ -2492,7 +2492,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx && !Strings.isNullOrEmpty(storagePolicy)) { throw new AnalysisException( "Can not create UNIQUE KEY table that enables Merge-On-write" - + " with storage policy(" + storagePolicy + ")"); + + " with storage policy(" + storagePolicy + ")"); } // Consider one situation: if the table has no storage policy but some partitions // have their own storage policy then it might be erased by the following function. 
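The five time-series-compaction knobs above all repeat one shape: start from a compile-time default, let PropertyAnalyzer override it from the properties map, and rewrap AnalysisException as DdlException. A hypothetical helper (not part of Doris) that captures the shape, including consuming the key so the later "Unknown properties" check only sees entries nobody recognized:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not in Doris: fall back to a default when the key is
// absent, surface bad values as DdlException, and *consume* the key so the
// later "Unknown properties" validation only sees unrecognized entries.
public class CompactionPropertySketch {
    static class DdlException extends Exception {
        DdlException(String msg) { super(msg); }
    }

    static long analyzeLongProp(Map<String, String> props, String key, long def)
            throws DdlException {
        String raw = (props == null) ? null : props.remove(key); // consume the key
        if (raw == null) {
            return def;                                          // keep the default
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new DdlException("Invalid value for " + key + ": " + raw);
        }
    }

    public static void main(String[] args) throws DdlException {
        Map<String, String> props = new HashMap<>();
        props.put("time_series_compaction_goal_size_mbytes", "2048");
        // the defaults below are illustrative, not Doris's real defaults
        long goalSize = analyzeLongProp(props, "time_series_compaction_goal_size_mbytes", 1024);
        long fileCount = analyzeLongProp(props, "time_series_compaction_file_count_threshold", 2000);
        System.out.println(goalSize + " " + fileCount + " leftover=" + props); // 2048 2000 leftover={}
    }
}
```

Keeping the default assignment and the override in one call site would also make the continuation-indent churn that PATCH 3/3 fixes here much harder to reintroduce.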
@@ -2968,7 +2968,7 @@ private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlExc } private boolean checkCreateTableResult(String tableName, long tableId, Pair result) - throws DdlException { + throws DdlException { if (Boolean.FALSE.equals(result.first)) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } @@ -2981,10 +2981,8 @@ private boolean checkCreateTableResult(String tableName, long tableId, Pair tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, - boolean isStorageMediumSpecified) + DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, + Set tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); @@ -3476,13 +3474,12 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx String errMsg; if (dbPrev instanceof MysqlCompatibleDatabase || db instanceof MysqlCompatibleDatabase) { errMsg = String.format( - "Mysql compatibility problem, previous checkpoint already has a database with full name " - + "%s. If its name is mysql, try to add mysqldb_replace_name=\"mysql_comp\" in " - + "fe.conf.", - db.getFullName()); + "Mysql compatibility problem, previous checkpoint already has a database with full name " + + "%s. If its name is mysql, try to add mysqldb_replace_name=\"mysql_comp\" in fe.conf.", + db.getFullName()); } else { errMsg = String.format("Logical error, duplicated database fullname: %s, id: %d %d.", - db.getFullName(), db.getId(), fullNameToDb.get(db.getFullName()).getId()); + db.getFullName(), db.getId(), fullNameToDb.get(db.getFullName()).getId()); } throw new IOException(errMsg); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java index a6712456e8563d..6d677779639498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java @@ -79,18 +79,17 @@ public List getUserByName(String name) { } public void checkPassword(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString, - List currentUser) throws AuthenticationException { + List currentUser) throws AuthenticationException { checkPasswordInternal(remoteUser, remoteHost, remotePasswd, randomString, null, currentUser, false); } public void checkPlainPassword(String remoteUser, String remoteHost, String remotePasswd, - List currentUser) throws AuthenticationException { + List currentUser) throws AuthenticationException { checkPasswordInternal(remoteUser, remoteHost, null, null, remotePasswd, currentUser, true); } private void checkPasswordInternal(String remoteUser, String remoteHost, byte[] remotePasswd, byte[] randomString, - String remotePasswdStr, List currentUser, boolean plain) - throws AuthenticationException { + String remotePasswdStr, List currentUser, boolean plain) throws AuthenticationException { PasswordPolicyManager passwdPolicyMgr = Env.getCurrentEnv().getAuth().getPasswdPolicyManager(); List users = nameToUsers.get(remoteUser); if (CollectionUtils.isEmpty(users)) { @@ -148,7 +147,7 @@ private String hasRemotePasswd(boolean plain, byte[] remotePasswd) { } private boolean comparePassword(Password curUserPassword, byte[] remotePasswd, - byte[] randomString, String 
remotePasswdStr, boolean plain) { + byte[] randomString, String remotePasswdStr, boolean plain) { // check password if (plain) { return MysqlPassword.checkPlainPass(curUserPassword.getPassword(), remotePasswdStr); @@ -183,7 +182,7 @@ public void clearEntriesSetByResolver() { } public User createUser(UserIdentity userIdent, byte[] pwd, UserIdentity domainUserIdent, boolean setByResolver, - String comment) + String comment) throws PatternMatcherException { if (userIdentityExist(userIdent, true)) { User userByUserIdentity = getUserByUserIdentity(userIdent);
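For the UserManager changes: checkPassword and checkPlainPassword differ only in whether the credential arrives as a challenge-response scramble or as cleartext, and both funnel into checkPasswordInternal, whose plain flag ultimately decides which comparison comparePassword performs. A JDK-only sketch of that two-path shape (the SHA-256 challenge below is a simplified stand-in, not MySQL's actual scramble scheme):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// JDK-only sketch: the plain path hashes the cleartext and compares it with
// the stored hash, while the challenge path compares a digest that mixes in a
// per-connection random string so the cleartext never crosses the wire.
// This SHA-256 scheme is a simplified stand-in, NOT MySQL's real scramble.
public class PasswordCheckSketch {
    static byte[] sha256(byte[]... parts) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (byte[] p : parts) {
                md.update(p);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e); // SHA-256 is always available
        }
    }

    static boolean compare(byte[] storedHash, byte[] remoteScramble, byte[] randomString,
                           String remotePlain, boolean plain) {
        if (plain) {
            // plain path: hash the cleartext and compare with the stored hash
            return Arrays.equals(storedHash,
                    sha256(remotePlain.getBytes(StandardCharsets.UTF_8)));
        }
        // challenge path: both sides mix in the per-connection randomString,
        // so a scramble captured on one connection fails on the next one
        byte[] expected = sha256(storedHash, randomString);
        return MessageDigest.isEqual(expected, remoteScramble); // constant-time
    }

    public static void main(String[] args) {
        byte[] stored = sha256("secret".getBytes(StandardCharsets.UTF_8));
        byte[] rnd = {1, 2, 3, 4};
        byte[] clientScramble = sha256(stored, rnd); // client derives this from its password
        System.out.println(compare(stored, clientScramble, rnd, null, false)); // true
        System.out.println(compare(stored, null, null, "secret", true));       // true
    }
}
```

Funneling both entry points into one internal method, as the diff does, keeps the password-policy checks (lockout, expiry) in a single place regardless of how the credential arrived.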