From c6354a26d64f02d987d9d4b23b74cd8a0deab826 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 16 Jun 2025 17:58:33 +0800 Subject: [PATCH 1/9] Fix stuck when stopping a DataNode with large unremovable WAL files --- .../env/cluster/config/MppDataNodeConfig.java | 12 ++ .../env/cluster/node/AbstractNodeWrapper.java | 4 + .../remote/config/RemoteDataNodeConfig.java | 10 ++ .../iotdb/itbase/env/DataNodeConfig.java | 4 + .../apache/iotdb/db/it/IoTDBStopNodeIT.java | 146 ++++++++++++++++++ .../db/service/DataNodeShutdownHook.java | 1 - .../dataregion/wal/WALManager.java | 1 + 7 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 58ac4d596bcc2..b7fa3bdee2ce6 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -119,4 +119,16 @@ public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad)); return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize)); + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 3b6d840698190..0e33674fdead3 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -818,4 +818,8 @@ public long getPid() { return -1; } } + + public Process getInstance() { + return instance; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 63f50d15958d3..1f61ff4289cfc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -78,4 +78,14 @@ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) { public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { return this; } + + @Override + public DataNodeConfig setWalThrottleSize(long walThrottleSize) { + return this; + } + + @Override + public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 353fdef7f2532..4d2e4435bdbe4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -45,4 +45,8 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( DataNodeConfig setLoadLastCacheStrategy(String strategyName); DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad); + + DataNodeConfig setWalThrottleSize(long walThrottleSize); + + DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java new file mode 100644 index 0000000000000..05078f4c416e5 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBStopNodeIT { + + private final Logger logger = LoggerFactory.getLogger(IoTDBStopNodeIT.class); + + /** + * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try deleting wal forever, + * which will block the DataNode from exiting, because task of deleting wal submitted by the ShutdownHook cannot be + * executed. + * This test ensures that this blocking is fixed. + */ + @Test + public void testWalThrottleStuck() + throws SQLException, + IoTDBConnectionException, + StatementExecutionException, + InterruptedException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getDataNodeConfig() + .setWalThrottleSize(1) + .setDeleteWalFilesPeriodInMs(100); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + try { + simpleEnv.initClusterEnvironment(1, 3); + + int leaderIndex = -1; + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + // write the first data + statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + // find the leader of the data region + int port = -1; + + ResultSet resultSet = statement.executeQuery("SHOW REGIONS"); + while (resultSet.next()) { + String regionType = resultSet.getString("Type"); + if (regionType.equals("DataRegion")) { + String role = resultSet.getString("Role"); + if (role.equals("Leader")) { + port = resultSet.getInt("RpcPort"); + break; + } + } + } + + if (port == -1) { + fail("Leader not found"); + } + + for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) { + if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) { + leaderIndex = i; + break; + } + } + } + + // stop a follower + int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size(); + simpleEnv.getDataNodeWrapperList().get(followerIndex).stop(); + System.out.println( + "Stopping data node " + + simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString()); + + DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex); + // write to the leader to generate wal that cannot be synced + try (Session session = new Session(leader.getIp(), leader.getPort())) { + session.open(); + + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)"); + } + + // wait for wal-delete thread to be scheduled + Thread.sleep(1000); + + // stop the leader + leader.getInstance().destroy(); + System.out.println("Stopping data node " + leader.getIpAndPortString()); + // confirm the death of the leader + long startTime = System.currentTimeMillis(); + while (leader.isAlive()) { + if (System.currentTimeMillis() - startTime > 30000) { + fail("Leader does not exit after 30s"); + } + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 9fb29e7c6124d..d3125fae0f9c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -77,7 +77,6 @@ public void run() { .equals(ConsensusFactory.RATIS_CONSENSUS)) { StorageEngine.getInstance().syncCloseAllProcessor(); } - WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); // We did this work because the RatisConsensus recovery mechanism is different from other // consensus algorithms, which will replace the underlying storage engine based on its diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 7d9f7bb467951..2c4244585e8a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -272,6 +272,7 @@ public void stop() { shutdownThread(walDeleteThread, ThreadName.WAL_DELETE); walDeleteThread = null; } + deleteOutdatedFilesInWALNodes(); clear(); } From bf208ac2cf04143998009b63437365a6023ece42 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 16 Jun 2025 18:14:46 +0800 Subject: [PATCH 2/9] spotless --- .../test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java index 05078f4c416e5..a50af5cb6457a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java @@ -48,10 +48,10 @@ public class IoTDBStopNodeIT { private final Logger logger = LoggerFactory.getLogger(IoTDBStopNodeIT.class); /** - * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try deleting wal forever, - * which will block the DataNode from exiting, because task of deleting wal submitted by the ShutdownHook cannot be - * executed. - * This test ensures that this blocking is fixed. + * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try + * deleting wal forever, which will block the DataNode from exiting, because task of deleting wal + * submitted by the ShutdownHook cannot be executed. This test ensures that this blocking is + * fixed. */ @Test public void testWalThrottleStuck() From fb4823e9100bb02931783c074c6b1d58e1ce16b5 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 17 Jun 2025 11:38:35 +0800 Subject: [PATCH 3/9] add shutdown hook watcher --- ...eIT.java => IoTDBCustomizedClusterIT.java} | 16 ++++++---- .../db/service/DataNodeShutdownHook.java | 29 +++++++++++++++++++ .../dataregion/wal/WALManager.java | 10 +++++-- 3 files changed, 47 insertions(+), 8 deletions(-) rename integration-test/src/test/java/org/apache/iotdb/db/it/{IoTDBStopNodeIT.java => IoTDBCustomizedClusterIT.java} (92%) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java similarity index 92% rename from integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index a50af5cb6457a..9b7a51328c43c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBStopNodeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -23,7 +23,6 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; -import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; @@ -38,14 +37,18 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Date; import static org.junit.Assert.fail; +/** + * Tests that may not be satisfied with the default cluster settings. + */ @RunWith(IoTDBTestRunner.class) -@Category({LocalStandaloneIT.class, ClusterIT.class}) -public class IoTDBStopNodeIT { +@Category({ClusterIT.class}) +public class IoTDBCustomizedClusterIT { - private final Logger logger = LoggerFactory.getLogger(IoTDBStopNodeIT.class); + private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); /** * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try @@ -111,7 +114,8 @@ public void testWalThrottleStuck() int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size(); simpleEnv.getDataNodeWrapperList().get(followerIndex).stop(); System.out.println( - "Stopping data node " + new Date() + + ":Stopping data node " + simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString()); DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex); @@ -131,7 +135,7 @@ public void testWalThrottleStuck() // stop the leader leader.getInstance().destroy(); - System.out.println("Stopping data node " + leader.getIpAndPortString()); + System.out.println(new Date() + ":Stopping data node " + leader.getIpAndPortString()); // confirm the death of the leader long startTime = System.currentTimeMillis(); while (leader.isAlive()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index d3125fae0f9c8..ffd524300c882 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -49,15 +49,42 @@ public class DataNodeShutdownHook extends Thread { private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class); private final TDataNodeLocation nodeLocation; + private Thread watcherThread; public DataNodeShutdownHook(TDataNodeLocation nodeLocation) { super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName()); this.nodeLocation = nodeLocation; } + private void startWatcher() { + Thread hookThread = Thread.currentThread(); + watcherThread = + new Thread( + () -> { + while (!Thread.interrupted()) { + try { + Thread.sleep(10000); + StackTraceElement[] stackTrace = hookThread.getStackTrace(); + StringBuilder stackTraceBuilder = + new StringBuilder("Stack trace of shutdown hook:\n"); + for (StackTraceElement traceElement : stackTrace) { + stackTraceBuilder.append(traceElement.toString()).append("\n"); + } + logger.info(stackTraceBuilder.toString()); + } catch (InterruptedException e) { + return; + } + } + }, + "ShutdownHookWatcher"); + watcherThread.setDaemon(true); + watcherThread.start(); + } + @Override public void run() { logger.info("DataNode exiting..."); + startWatcher(); // Stop external rpc service firstly. ExternalRPCService.getInstance().stop(); @@ -113,6 +140,8 @@ public void run() { "DataNode exits. Jvm memory usage: {}", MemUtils.bytesCntToStr( Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())); + + watcherThread.interrupt(); } private void triggerSnapshotForAllDataRegion() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 2c4244585e8a4..300cc10ad81bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -180,7 +180,7 @@ private void deleteOutdatedFiles() { // threshold, the system continues to delete expired files until the disk size is smaller than // the threshold. boolean firstLoop = true; - while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) { + while ((firstLoop || shouldThrottle())) { deleteOutdatedFilesInWALNodes(); if (firstLoop && shouldThrottle()) { logger.warn( @@ -189,6 +189,10 @@ private void deleteOutdatedFiles() { getThrottleThreshold()); } firstLoop = false; + if (Thread.interrupted()) { + logger.info("Timed wal delete thread is interrupted."); + return; + } } } @@ -267,13 +271,15 @@ public void stop() { if (config.getWalMode() == WALMode.DISABLE) { return; } - + logger.info("Stopping WALManager"); if (walDeleteThread != null) { shutdownThread(walDeleteThread, ThreadName.WAL_DELETE); walDeleteThread = null; } + logger.info("Deleting outdated files before exiting"); deleteOutdatedFilesInWALNodes(); clear(); + logger.info("WALManager stopped"); } private void shutdownThread(ExecutorService thread, ThreadName threadName) { From c3253669c56f411c966a7a9ac2e3ffcb5bad1ea9 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 17 Jun 2025 16:00:02 +0800 Subject: [PATCH 4/9] Fix logDispatcher stuck --- .../org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java | 4 +--- .../iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 2 +- .../apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 9b7a51328c43c..5812e255d1639 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -41,9 +41,7 @@ import static org.junit.Assert.fail; -/** - * Tests that may not be satisfied with the default cluster settings. - */ +/** Tests that may not be satisfied with the default cluster settings. */ @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) public class IoTDBCustomizedClusterIT { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index c3a0665be6adf..e196df432091a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -105,8 +105,8 @@ public synchronized void start() { public synchronized void stop() { if (!threads.isEmpty()) { threads.forEach(LogDispatcherThread::setStopped); - threads.forEach(LogDispatcherThread::processStopped); executorService.shutdownNow(); + threads.forEach(LogDispatcherThread::processStopped); int timeout = 10; try { if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java index fe00939050e1e..c5a426d88b8be 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java @@ -44,8 +44,9 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) { * @throws InterruptedException */ public synchronized void addNextBatch(Batch batch) throws InterruptedException { - while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() - || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) { + while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() + || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) + && !Thread.interrupted()) { wait(); } pendingBatches.add(batch); From 992cfdad3f17e1219a8495fe95be16bd7b5d8f7a Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Wed, 18 Jun 2025 09:58:49 +0800 Subject: [PATCH 5/9] add re-interrupt --- .../java/org/apache/iotdb/db/service/DataNodeShutdownHook.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index ffd524300c882..731c4b09da1bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -72,6 +72,7 @@ private void startWatcher() { } logger.info(stackTraceBuilder.toString()); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return; } } From fe2f1893cc5be6b93649ddce99ca82f105b145f2 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Fri, 20 Jun 2025 10:32:01 +0800 Subject: [PATCH 6/9] Enhance IT frame to run tests that restart the cluster --- .../apache/iotdb/it/env/cluster/EnvUtils.java | 82 +++++- .../iotdb/it/env/cluster/env/AbstractEnv.java | 235 ++++++++++++++---- .../env/cluster/node/AbstractNodeWrapper.java | 9 +- .../iotdb/db/it/IoTDBCustomizedClusterIT.java | 113 ++++++++- .../org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../iotdb/confignode/service/ConfigNode.java | 12 +- .../org/apache/iotdb/db/service/DataNode.java | 13 +- .../iotdb/commons/client/ClientManager.java | 5 + .../iotdb/commons/client/IClientManager.java | 3 + .../exception/PortOccupiedException.java | 31 +++ .../iotdb/commons/utils/StatusUtils.java | 14 ++ 11 files changed, 450 insertions(+), 69 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f9315ac787f9e..d5143c5874234 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -24,9 +24,10 @@ import org.apache.commons.lang3.SystemUtils; import org.apache.tsfile.utils.Pair; -import java.io.File; -import java.io.IOException; +import java.io.*; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -98,15 +99,74 @@ public static int[] searchAvailablePorts() { } private static boolean checkPortsAvailable(final List ports) { - final String cmd = getSearchAvailablePortCmd(ports); - try { - return Runtime.getRuntime().exec(cmd).waitFor() == 1; - } catch (final IOException ignore) { - // ignore - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); + try { + return listPortOccupation(ports).isEmpty(); + } catch (IOException e) { + IoTDBTestLogger.logger.error("Cannot check available ports", e); + return false; + } + } + + public static Map listPortOccupation(final List ports) throws IOException { + return SystemUtils.IS_OS_WINDOWS ? listPortOccupationWindows(ports) : listPortOccupationUnix(ports); + } + + /** + * List occupied port and the associated pid on windows. + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map listPortOccupationWindows(final List ports) throws IOException { + Process process = Runtime.getRuntime().exec("netstat -aon -p tcp"); + Map result = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.trim().split("\\s+"); + if (split.length != 5) { + continue; + } + String localAddress = split[1]; + for (Integer port : ports) { + if (localAddress.equals("127.0.0.1:" + port)) { + result.put(port, Long.parseLong(split[4])); + break; + } + } + + } + } catch (EOFException ignored) { + } + return result; + } + + /** + * List occupied port and the associated pid on Unix. + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map listPortOccupationUnix(final List ports) throws IOException { + Process process = Runtime.getRuntime().exec("lsof -iTCP -sTCP:LISTEN -P -n"); + Map result = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.trim().split("\\s+"); + if (split.length != 10) { + continue; + } + String localAddress = split[9]; + for (Integer port : ports) { + if (localAddress.equals("*:" + port)) { + result.put(port, Long.parseLong(split[1])); + break; + } + } + + } + } catch (EOFException ignored) { } - return false; + return result; } private static String getSearchAvailablePortCmd(final List ports) { @@ -115,7 +175,7 @@ private static String getSearchAvailablePortCmd(final List ports) { private static String getWindowsSearchPortCmd(final List ports) { return "netstat -aon -p tcp | findStr " - + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" ")); + + ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" ")); } private static String getUnixSearchPortCmd(final List ports) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index fa2cb85d19191..d1b1bec862934 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -21,11 +21,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.exception.PortOccupiedException; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; @@ -78,13 +80,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -223,25 +219,9 @@ protected void initEnvironment( final RequestDelegate configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints); for (int i = 1; i < configNodesNum; i++) { - final ConfigNodeWrapper configNodeWrapper = - new ConfigNodeWrapper( - false, - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); + ConfigNodeWrapper configNodeWrapper = newConfigNode(); this.configNodeWrapperList.add(configNodeWrapper); configNodeEndpoints.add(configNodeWrapper.getIpAndPortString()); - configNodeWrapper.createNodeDir(); - configNodeWrapper.changeConfig( - (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), - (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); - configNodeWrapper.createLogDir(); - configNodeWrapper.setKillPoints(configNodeKillPoints); configNodesDelegate.addRequest( () -> { configNodeWrapper.start(); @@ -259,24 +239,9 @@ protected void initEnvironment( final RequestDelegate dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT); for (int i = 0; i < dataNodesNum; i++) { - final DataNodeWrapper dataNodeWrapper = - new DataNodeWrapper( - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); - this.dataNodeWrapperList.add(dataNodeWrapper); + DataNodeWrapper dataNodeWrapper = newDataNode(); dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString()); - dataNodeWrapper.createNodeDir(); - dataNodeWrapper.changeConfig( - (MppDataNodeConfig) clusterConfig.getDataNodeConfig(), - (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); - dataNodeWrapper.createLogDir(); - dataNodeWrapper.setKillPoints(dataNodeKillPoints); + this.dataNodeWrapperList.add(dataNodeWrapper); dataNodesDelegate.addRequest( () -> { dataNodeWrapper.start(); @@ -299,6 +264,49 @@ protected void initEnvironment( checkClusterStatusWithoutUnknown(); } + private ConfigNodeWrapper newConfigNode() { + final ConfigNodeWrapper configNodeWrapper = + new ConfigNodeWrapper( + false, + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + configNodeWrapper.createNodeDir(); + configNodeWrapper.changeConfig( + (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), + (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); + configNodeWrapper.createLogDir(); + configNodeWrapper.setKillPoints(configNodeKillPoints); + return configNodeWrapper; + } + + private DataNodeWrapper newDataNode() { + final DataNodeWrapper dataNodeWrapper = + new DataNodeWrapper( + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + dataNodeWrapper.createNodeDir(); + dataNodeWrapper.changeConfig( + (MppDataNodeConfig) clusterConfig.getDataNodeConfig(), + (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); + dataNodeWrapper.createLogDir(); + dataNodeWrapper.setKillPoints(dataNodeKillPoints); + return dataNodeWrapper; + } + private void startAINode(final String seedConfigNode, final String testClassName) { final String aiNodeEndPoint; final AINodeWrapper aiNodeWrapper = @@ -380,16 +388,33 @@ public void checkClusterStatus(final Predicate> statusCheck logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; - boolean flag; + boolean passed; + boolean showClusterPassed = true; + boolean nodeSizePassed = true; + boolean nodeStatusPassed = true; + boolean processStatusPassed = true; + TSStatus showClusterStatus = null; + int actualNodeSize = 0; + Map lastNodeStatus = null; + Map processStatusMap = new HashMap<>(); + for (int i = 0; i < retryCount; i++) { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { - flag = true; + passed = true; + showClusterPassed = true; + nodeSizePassed = true; + nodeStatusPassed = true; + processStatusPassed = true; + processStatusMap.clear(); + showClusterResp = client.showCluster(); // Check resp status if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - flag = false; + passed = false; + showClusterPassed = false; + showClusterStatus = showClusterResp.getStatus(); } // Check the number of nodes @@ -397,18 +422,61 @@ public void checkClusterStatus(final Predicate> statusCheck != configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) { - flag = false; + passed = false; + nodeSizePassed = false; + actualNodeSize = showClusterResp.getNodeStatusSize(); } // Check the status of nodes - if (flag) { - flag = statusCheck.test(showClusterResp.getNodeStatus()); + if (passed) { + passed = statusCheck.test(showClusterResp.getNodeStatus()); + if (!passed) { + nodeStatusPassed = false; + lastNodeStatus = showClusterResp.getNodeStatus(); + } + } + + // check the status of processes + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + boolean alive = dataNodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(dataNodeWrapper, dataNodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(dataNodeWrapper, 0); + } + } + for (ConfigNodeWrapper nodeWrapper : configNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } + } + for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } } - if (flag) { + if (!processStatusPassed) { + handleProcessStatus(processStatusMap); + } + + if (passed) { logger.info("The cluster is now ready for testing!"); return; } + logger.info("Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}", i, showClusterPassed, nodeSizePassed, nodeStatusPassed, processStatusPassed); } catch (final Exception e) { lastException = e; } @@ -425,10 +493,73 @@ public void checkClusterStatus(final Predicate> statusCheck lastException.getMessage(), lastException); } + if (!showClusterPassed) { + logger.error("Show cluster failed: {}", showClusterStatus); + } + if (!nodeSizePassed) { + logger.error("Only {} nodes detected", actualNodeSize); + } + if (!nodeStatusPassed) { + logger.error("Some node status incorrect: {}", lastNodeStatus); + } + if (!processStatusPassed) { + logger.error( + "Some process status incorrect: {}", + processStatusMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().getId(), Map.Entry::getValue))); + + if (processStatusMap.containsValue(TSStatusCode.PORT_OCCUPIED.getStatusCode())) { + throw new PortOccupiedException(); + } + } + throw new AssertionError( String.format("After %d times retry, the cluster can't work!", retryCount)); } + private void handleProcessStatus(Map processStatusMap) { + for (Map.Entry entry : processStatusMap.entrySet()) { + Integer statusCode = entry.getValue(); + AbstractNodeWrapper nodeWrapper = entry.getKey(); + if(statusCode != 0) { + logger.info("Node {} is not running due to {}", nodeWrapper.getId(), statusCode); + } + if (statusCode == TSStatusCode.PORT_OCCUPIED.getStatusCode()) { + try { + Map portOccupationMap = EnvUtils. + listPortOccupation(Arrays.stream(nodeWrapper.getPortList()).boxed().collect(Collectors.toList())); + logger.info("Check port result: {}", portOccupationMap); + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + if (portOccupationMap.containsValue(dataNodeWrapper.getPid())) { + logger.info("A port is occupied by another DataNode {}-{}, restart it", dataNodeWrapper.getIpAndPortString(), dataNodeWrapper.getPid()); + dataNodeWrapper.stop(); + dataNodeWrapper.start(); + } + } + for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { + if (portOccupationMap.containsValue(configNodeWrapper.getPid())) { + logger.info("A port is occupied by another ConfigNode {}-{}, restart it", configNodeWrapper.getIpAndPortString(), configNodeWrapper.getPid()); + configNodeWrapper.stop(); + configNodeWrapper.start(); + } + } + for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { + if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { + logger.info("A port is occupied by another datanode {}-{}, restart it", aiNodeWrapper.getIpAndPortString(), aiNodeWrapper.getPid()); + aiNodeWrapper.stop(); + aiNodeWrapper.start(); + } + } + } catch (IOException e) { + logger.error("Cannot check port occupation", e); + } + logger.info("Restarting it"); + nodeWrapper.stop(); + nodeWrapper.start(); + } + } + } + @Override public void cleanClusterEnvironment() { final List allNodeWrappers = @@ -890,6 +1021,10 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() for (int i = 0; i < retryCount; i++) { for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { + if (!configNodeWrapper.getInstance().isAlive()) { + throw new IOException("ConfigNode " + configNodeWrapper.getId() + " is not alive"); + } + lastErrorNode = configNodeWrapper; final SyncConfigNodeIServiceClient client = clientManager.borrowClient( @@ -1335,4 +1470,8 @@ public void registerConfigNodeKillPoints(final List killPoints) { public void registerDataNodeKillPoints(final List killPoints) { this.dataNodeKillPoints = killPoints; } + + public void clearClientManager() { + clientManager.clearAll(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 0e33674fdead3..27c688dc32842 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -546,7 +546,10 @@ public void stop() { this.instance.destroy(); try { if (!this.instance.waitFor(20, TimeUnit.SECONDS)) { - this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); + logger.warn("Node {} does not exit within 20s, killing it", getId()); + if (!this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS)) { + logger.error("Cannot forcibly stop node {}", getId()); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -822,4 +825,8 @@ public long getPid() { public Process getInstance() { return instance; } + + public int[] getPortList() { + return portList; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 5812e255d1639..2c5996a83c9b7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.it; +import org.apache.iotdb.commons.exception.PortOccupiedException; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; @@ -33,12 +36,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.Date; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** Tests that may not be satisfied with the default cluster settings. */ @@ -48,6 +49,110 @@ public class IoTDBCustomizedClusterIT { private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); + @FunctionalInterface + private interface RestartAction { + void act(Statement statement, int round, AbstractEnv env) throws Exception; + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { + testRepeatedlyRestartWholeCluster((s, i, env) -> { + if (i != 0) { + ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**"); + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(4, metaData.getColumnCount()); + int cnt = 0; + while (resultSet.next()) { + cnt ++; + StringBuilder result = new StringBuilder(); + for (int j = 0; j < metaData.getColumnCount(); j++) { + result.append(metaData.getColumnName(j + 1)).append(":").append(resultSet.getString(j + 1)).append(","); + } + System.out.println(result); + } + } + s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db3.d1 (time, s1) VALUES (1, 1)"); + }); + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithPipeCreation() throws Exception { + SimpleEnv receiverEnv = new SimpleEnv(); + receiverEnv.initClusterEnvironment(1, 1); + try { + testRepeatedlyRestartWholeCluster((s, i, env) -> { + // use another thread to make creating and restart concurrent + // otherwise, all tasks will be done before restart and the cluster will not attempt to recover tasks + s.execute(String.format("CREATE PIPE p%d_1 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute(String.format("CREATE PIPE p%d_2 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute(String.format("CREATE PIPE p%d_3 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + }); + } finally { + receiverEnv.cleanClusterEnvironment(); + } + } + + + private void testRepeatedlyRestartWholeCluster(RestartAction restartAction) throws Exception { + SimpleEnv simpleEnv = new SimpleEnv(); + try { + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setConfigNodeConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + simpleEnv.initClusterEnvironment(3, 3); + + int repeat = 100; + for (int i = 0; i < repeat; i++) { + logger.info("Round {} restart", i); + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery("SHOW CLUSTER"); + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int j = 0; j < columnCount; j++) { + row.append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(row); + } + + restartAction.act(statement, i, simpleEnv); + } + + simpleEnv.shutdownAllConfigNodes(); + simpleEnv.shutdownAllDataNodes(); + + simpleEnv.startAllConfigNodes(); + simpleEnv.startAllDataNodes(); + + simpleEnv.clearClientManager(); + + try { + simpleEnv.checkClusterStatusWithoutUnknown(); + } catch (PortOccupiedException e) { + logger.info("Some ports are occupied during restart, which cannot be processed, just pass the test."); + return; + } + } + + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } + /** * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try * deleting wal forever, which will block the DataNode from exiting, because task of deleting wal diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 3f7eb8f6507b7..6359eafde5c53 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -40,6 +40,8 @@ public enum TSStatusCode { UNSUPPORTED_SQL_DIALECT(205), + PORT_OCCUPIED(206), + // General Error UNSUPPORTED_OPERATION(300), EXECUTE_STATEMENT_ERROR(301), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 7f0db7a37840b..39612202720b6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.cpu.CpuUsageMetrics; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.client.CnToCnNodeRequestType; import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; @@ -67,6 +68,7 @@ import org.apache.iotdb.metrics.metricsets.system.SystemMetrics; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +108,8 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { protected ConfigManager configManager; + private int exitStatusCode = 0; + public ConfigNode() { super("ConfigNode"); // We do not init anything here, so that we can re-initialize the instance in IT. @@ -121,6 +125,8 @@ public static void main(String[] args) throws Exception { "{} default charset is: {}", ConfigNodeConstant.GLOBAL_NAME, Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); ConfigNode configNode = new ConfigNode(); int returnCode = configNode.run(args); if (returnCode != 0) { @@ -140,6 +146,7 @@ protected void start() throws IoTDBException { throw new IoTDBException("Error starting", -1); } active(); + LOGGER.info("IoTDB started"); } @Override @@ -266,8 +273,9 @@ public void active() { "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect."); stop(); } - } catch (StartupException | IOException | IllegalPathException e) { + } catch (Throwable e) { LOGGER.error("Meet error while starting up.", e); + exitStatusCode = StatusUtils.retrieveExitStatusCode(e); stop(); } } @@ -467,7 +475,7 @@ public void stop() { } catch (IOException e) { LOGGER.error("Meet error when deactivate ConfigNode", e); } - System.exit(-1); + System.exit(exitStatusCode); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 714edde57c2a4..55268d72ab253 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -114,6 +114,7 @@ import org.apache.iotdb.udf.api.exception.UDFManagementException; import org.antlr.v4.runtime.CommonTokenStream; +import org.apache.ratis.util.ExitUtils; import org.apache.thrift.TException; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -134,6 +135,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME; +import static org.apache.iotdb.commons.utils.StatusUtils.retrieveExitStatusCode; import static org.apache.iotdb.db.conf.IoTDBStartCheck.PROPERTIES_FILE_NAME; public class DataNode extends ServerCommandLine implements DataNodeMBean { @@ -193,6 +195,8 @@ public static DataNode getInstance() { public static void main(String[] args) { logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables()); logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); DataNode dataNode = new DataNode(); int returnCode = dataNode.run(args); if (returnCode != 0) { @@ -202,6 +206,7 @@ public static void main(String[] args) { @Override protected void start() { + logger.info("Starting DataNode..."); boolean isFirstStart; try { // Check if this DataNode is start for the first time and do other pre-checks @@ -274,11 +279,13 @@ protected void start() { dataRegionConsensusStarted = true; } - } catch (StartupException | IOException e) { + } catch (Throwable e) { + int exitStatusCode = retrieveExitStatusCode(e); logger.error("Fail to start server", e); stop(); - System.exit(-1); + System.exit(exitStatusCode); } + logger.info("DataNode started"); } @Override @@ -683,7 +690,7 @@ private void active() throws StartupException { setUp(); } catch (StartupException e) { logger.error("Meet error while starting up.", e); - throw new StartupException("Error in activating IoTDB DataNode."); + throw e; } logger.info("IoTDB DataNode has started."); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 56bc67b63993e..79fcc799ae93c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -88,6 +88,11 @@ public void clear(K node) { }); } + @Override + public void clearAll() { + pool.clear(); + } + @Override public void close() { pool.close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 81344e46719f4..cba9b840740de 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -44,6 +44,9 @@ public interface IClientManager { */ void clear(K node); + /** clear all clients; */ + void clearAll(); + /** close IClientManager, which means closing all clients for all nodes. */ void close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java new file mode 100644 index 0000000000000..0e0453d267b34 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.exception; + +import java.util.Arrays; + +public class PortOccupiedException extends RuntimeException{ + public PortOccupiedException() { + super("Some ports are occupied"); + } + + public PortOccupiedException(int... ports) { + super(String.format("Ports %s are occupied", Arrays.toString(ports))); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index d05fb8c9d7e2f..ac64699b9cf46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -25,6 +25,8 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; + import java.util.Arrays; import java.util.HashSet; import java.util.Map; @@ -248,4 +250,16 @@ private static boolean needRetryHelperForSingleStatus(int statusCode) { public static boolean isUnknownError(int statusCode) { return UNKNOWN_ERRORS.contains(statusCode); } + + public static int retrieveExitStatusCode(Throwable e) { + if (e instanceof ExitUtils.ExitException && e.getCause() != null) { + e = e.getCause(); + } + if (e.getMessage().contains("because Could not create ServerSocket") + || e.getMessage().contains("Failed to bind to address") + || e.getMessage().contains("Address already in use: bind")) { + return TSStatusCode.PORT_OCCUPIED.getStatusCode(); + } + return -1; + } } From 80141cf96f0f6174b03c05e3d00c1ca4c4841324 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Fri, 20 Jun 2025 11:57:00 +0800 Subject: [PATCH 7/9] code refinement --- .../apache/iotdb/it/env/cluster/EnvUtils.java | 56 ++++++++----------- .../iotdb/it/env/cluster/env/AbstractEnv.java | 2 +- 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index 4372d8a04e7c8..a2fe36a3ed5ab 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -114,28 +114,27 @@ public static Map listPortOccupation(final List ports) : listPortOccupationUnix(ports); } - /** - * List occupied port and the associated pid on windows. - * - * @param ports ports to be checked - * @return (occupiedPort, pid) pairs - */ - public static Map listPortOccupationWindows(final List ports) + public static Map listPortOccupation( + final List ports, + String cmd, + int targetColumnLength, + int addressColumnIndex, + int pidColumnIndex) throws IOException { - Process process = Runtime.getRuntime().exec("netstat -aon -p tcp"); + Process process = Runtime.getRuntime().exec(cmd); Map result = new HashMap<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = reader.readLine()) != null) { String[] split = line.trim().split("\\s+"); - if (split.length != 5) { + if (split.length != targetColumnLength) { continue; } - String localAddress = split[1]; + String localAddress = split[addressColumnIndex]; for (Integer port : ports) { - if (localAddress.equals("127.0.0.1:" + port)) { - result.put(port, Long.parseLong(split[4])); + if (localAddress.endsWith(":" + port)) { + result.put(port, Long.parseLong(split[pidColumnIndex])); break; } } @@ -145,6 +144,17 @@ public static Map listPortOccupationWindows(final List p return result; } + /** + * List occupied port and the associated pid on windows. + * + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map listPortOccupationWindows(final List ports) + throws IOException { + return listPortOccupation(ports, "netstat -aon -p tcp", 5, 1, 4); + } + /** * List occupied port and the associated pid on Unix. * @@ -153,27 +163,7 @@ public static Map listPortOccupationWindows(final List p */ public static Map listPortOccupationUnix(final List ports) throws IOException { - Process process = Runtime.getRuntime().exec("lsof -iTCP -sTCP:LISTEN -P -n"); - Map result = new HashMap<>(); - try (BufferedReader reader = - new BufferedReader(new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - String[] split = line.trim().split("\\s+"); - if (split.length != 10) { - continue; - } - String localAddress = split[9]; - for (Integer port : ports) { - if (localAddress.equals("*:" + port)) { - result.put(port, Long.parseLong(split[1])); - break; - } - } - } - } catch (EOFException ignored) { - } - return result; + return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9, 1); } private static String getSearchAvailablePortCmd(final List ports) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 8c12e90d7cda2..a7a935b1dc514 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -559,7 +559,7 @@ private void handleProcessStatus(Map processStatus for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { logger.info( - "A port is occupied by another datanode {}-{}, restart it", + "A port is occupied by another AINode {}-{}, restart it", aiNodeWrapper.getIpAndPortString(), aiNodeWrapper.getPid()); aiNodeWrapper.stop(); From 307fe065723598a0945ce3721f665bb2e4134f38 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 23 Jun 2025 10:33:00 +0800 Subject: [PATCH 8/9] modify test configuration --- .../java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 9f4172afce458..07517324e1f39 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -24,7 +24,6 @@ import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; @@ -48,7 +47,6 @@ /** Tests that may not be satisfied with the default cluster settings. */ @RunWith(IoTDBTestRunner.class) -@Category({ClusterIT.class}) public class IoTDBCustomizedClusterIT { private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); From 274394d1f315b5f9c1482a12aa281ac734516aa3 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 24 Jun 2025 11:43:06 +0800 Subject: [PATCH 9/9] fix test --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index a7a935b1dc514..e68a6d550c28b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -359,12 +359,14 @@ private Map countNodeStatus(final Map nodeStat } public void checkNodeInStatus(int nodeId, NodeStatus expectation) { - checkClusterStatus(nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId))); + checkClusterStatus( + nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)), m -> true); } public void checkClusterStatusWithoutUnknown() { checkClusterStatus( - nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals), + processStatus -> processStatus.values().stream().noneMatch(i -> i != 0)); testJDBCConnection(); } @@ -374,6 +376,10 @@ public void checkClusterStatusOneUnknownOtherRunning() { Map count = countNodeStatus(nodeStatus); return count.getOrDefault("Unknown", 0) == 1 && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; + }, + processStatus -> { + long aliveProcessCount = processStatus.values().stream().filter(i -> i == 0).count(); + return aliveProcessCount == processStatus.size() - 1; }); testJDBCConnection(); } @@ -382,9 +388,11 @@ public void checkClusterStatusOneUnknownOtherRunning() { * check whether all nodes' status match the provided predicate with RPC. after retryCount times, * if the status of all nodes still not match the predicate, throw AssertionError. * - * @param statusCheck the predicate to test the status of nodes + * @param nodeStatusCheck the predicate to test the status of nodes */ - public void checkClusterStatus(final Predicate> statusCheck) { + public void checkClusterStatus( + final Predicate> nodeStatusCheck, + final Predicate> processStatusCheck) { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; @@ -429,7 +437,7 @@ public void checkClusterStatus(final Predicate> statusCheck // Check the status of nodes if (passed) { - passed = statusCheck.test(showClusterResp.getNodeStatus()); + passed = nodeStatusCheck.test(showClusterResp.getNodeStatus()); if (!passed) { nodeStatusPassed = false; lastNodeStatus = showClusterResp.getNodeStatus(); @@ -440,8 +448,6 @@ public void checkClusterStatus(final Predicate> statusCheck for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { boolean alive = dataNodeWrapper.getInstance().isAlive(); if (!alive) { - passed = false; - processStatusPassed = false; processStatusMap.put(dataNodeWrapper, dataNodeWrapper.getInstance().waitFor()); } else { processStatusMap.put(dataNodeWrapper, 0); @@ -450,8 +456,6 @@ public void checkClusterStatus(final Predicate> statusCheck for (ConfigNodeWrapper nodeWrapper : configNodeWrapperList) { boolean alive = nodeWrapper.getInstance().isAlive(); if (!alive) { - passed = false; - processStatusPassed = false; processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); } else { processStatusMap.put(nodeWrapper, 0); @@ -460,14 +464,17 @@ public void checkClusterStatus(final Predicate> statusCheck for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { boolean alive = nodeWrapper.getInstance().isAlive(); if (!alive) { - passed = false; - processStatusPassed = false; processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); } else { processStatusMap.put(nodeWrapper, 0); } } + processStatusPassed = processStatusCheck.test(processStatusMap); + if (!processStatusPassed) { + passed = false; + } + if (!processStatusPassed) { handleProcessStatus(processStatusMap); }