From c2ce6c7e07848f0d557001fe8599532d1e1375b6 Mon Sep 17 00:00:00 2001 From: ChandraSekhar K Date: Wed, 17 Apr 2024 21:00:57 +0530 Subject: [PATCH 1/6] HBASE-27126 Support multi-threads cleaner for MOB files --- .../apache/hadoop/hbase/master/HMaster.java | 4 + .../apache/hadoop/hbase/mob/MobConstants.java | 6 + .../hadoop/hbase/mob/MobFileCleanerChore.java | 81 +++-- .../mob/TestExpiredMobFileCleanerChore.java | 279 ++++++++++++++++++ 4 files changed, 351 insertions(+), 19 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 6f235b2156f3..849ddb457d7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -4569,4 +4569,8 @@ protected String getDescription() { } }); } + + public MobFileCleanerChore getMobFileCleanerChore() { + return mobFileCleanerChore; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 5efea69788ca..e8332cd0a014 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -144,6 +144,12 @@ public final class MobConstants { public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max"; public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1; + public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads"; + public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1; + public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT = + "hbase.master.mob.cleaner.chore.timeout"; + public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 5 minutes + private MobConstants() { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index 3144b71f11e9..056cfb5ea79b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -17,9 +17,20 @@ */ package org.apache.hadoop.hbase.mob; +import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT; +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT; + import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableDescriptors; @@ -31,6 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete * (files which have no active references to) mob files. @@ -39,8 +53,11 @@ public class MobFileCleanerChore extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class); + private final HMaster master; - private ExpiredMobFileCleaner cleaner; + private final ExpiredMobFileCleaner cleaner; + private final ExecutorService threadPool; + private final int cleanerFutureTimeout; public MobFileCleanerChore(HMaster master) { super(master.getServerName() + "-MobFileCleanerChore", master, @@ -52,7 +69,19 @@ public MobFileCleanerChore(HMaster master) { this.master = master; cleaner = new ExpiredMobFileCleaner(); cleaner.setConf(master.getConfiguration()); + int threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, + MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); + + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build(); + if (threadCount == 1) { + threadPool = MoreExecutors.newDirectExecutorService(); + } else { + threadPool = Executors.newFixedThreadPool(threadCount, threadFactory); + } checkObsoleteConfigurations(); + cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT, + DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT); } private void checkObsoleteConfigurations() { @@ -83,29 +112,43 @@ protected void chore() { LOG.error("MobFileCleanerChore failed", e); return; } + List futureList = new ArrayList<>(map.size()); for (TableDescriptor htd : map.values()) { - for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { - if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { - try { - cleaner.cleanExpiredMobFiles(htd, hcd); - } catch (IOException e) { - LOG.error("Failed to clean the expired mob files table={} family={}", - htd.getTableName().getNameAsString(), hcd.getNameAsString(), e); - } - } - } + Future future = threadPool.submit(() -> handleOneTable(htd)); + futureList.add(future); + } + + for (Future future : futureList) { try { - // Now clean obsolete files for a table - LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); - try (final Admin admin = master.getConnection().getAdmin()) { - MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(), - admin); + future.get(cleanerFutureTimeout, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Exception during the execution of MobFileCleanerChore", e); + } + } + } + + private void handleOneTable(TableDescriptor htd) { + for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { + if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { + try { + cleaner.cleanExpiredMobFiles(htd, hcd); + } catch (IOException e) { + LOG.error("Failed to clean the expired mob files table={} family={}", + htd.getTableName().getNameAsString(), hcd.getNameAsString(), e); } - LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); - } catch (IOException e) { - LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e); } } + try { + // Now clean obsolete files for a table + LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); + try (final Admin admin = master.getConnection().getAdmin()) { + MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(), + admin); + } + LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); + } catch (IOException e) { + LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java new file mode 100644 index 000000000000..934d78528103 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND; +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +@Category(MediumTests.class) +@RunWith(Enclosed.class) +public class TestExpiredMobFileCleanerChore { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner"); + private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2"); + private final static String family = "family"; + private final static byte[] row1 = Bytes.toBytes("row1"); + private final static byte[] row2 = Bytes.toBytes("row2"); + private final static byte[] row3 = Bytes.toBytes("row3"); + private final static byte[] qf = Bytes.toBytes("qf"); + + private static BufferedMutator table; + private static Admin admin; + private static BufferedMutator table2; + + @Category(MediumTests.class) + public static class SingleThreadTest { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1); + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.disableTable(tableName2); + admin.deleteTable(tableName2); + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCleaner() throws Exception { + testCleanerInternal(); + } + } + + @Category(MediumTests.class) + public static class MultiThreadTest { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2); + + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.disableTable(tableName2); + admin.deleteTable(tableName2); + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCleaner() throws Exception { + testCleanerInternal(); + } + } + + private static void init() throws Exception { + TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true) + .setMobThreshold(3L).setMaxVersions(4).build(); + tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); + + admin = TEST_UTIL.getAdmin(); + admin.createTable(tableDescriptorBuilder.build()); + + table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) + .getBufferedMutator(tableName); + + TableDescriptorBuilder tableDescriptorBuilder2 = TableDescriptorBuilder.newBuilder(tableName2); + ColumnFamilyDescriptor columnFamilyDescriptor2 = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true) + .setMobThreshold(3L).setMaxVersions(4).build(); + tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2); + admin.createTable(tableDescriptorBuilder2.build()); + + table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) + .getBufferedMutator(tableName2); + } + + private static void modifyColumnExpiryDays(int expireDays) throws Exception { + + // change ttl as expire days to make some row expired + int timeToLive = expireDays * secondsOfDay(); + ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder + .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L); + columnFamilyDescriptorBuilder.setTimeToLive(timeToLive); + + admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build()); + + ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = ColumnFamilyDescriptorBuilder + .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L); + columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive); + + admin.modifyColumnFamily(tableName2, columnFamilyDescriptorBuilder2.build()); + } + + private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts, + TableName tableName) throws Exception { + + Put put = new Put(row, ts); + put.addColumn(Bytes.toBytes(family), qf, value); + table.mutate(put); + + table.flush(); + admin.flush(tableName); + } + + /** + * Creates a 3 day old hfile and an 1 day old hfile then sets expiry to 2 days. Verifies that the + * 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is + * run. + */ + + public static void testCleanerInternal() throws Exception { + init(); + + Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family); + + byte[] dummyData = makeDummyData(600); + long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 1000; // 3 days before + putKVAndFlush(table, row1, dummyData, ts, tableName); + FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // the first mob file + assertEquals("Before cleanup without delay 1", 1, firstFiles.length); + String firstFile = firstFiles[0].getPath().getName(); + + // 1.5 day before + ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000); + putKVAndFlush(table, row2, dummyData, ts, tableName); + FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // now there are 2 mob files + assertEquals("Before cleanup without delay 2", 2, secondFiles.length); + String f1 = secondFiles[0].getPath().getName(); + String f2 = secondFiles[1].getPath().getName(); + String secondFile = f1.equals(firstFile) ? f2 : f1; + + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table, row3, dummyData, ts, tableName); + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table, row3, dummyData, ts, tableName); + FileStatus[] thirdFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // now there are 4 mob files + assertEquals("Before cleanup without delay 3", 4, thirdFiles.length); + + // modifyColumnExpiryDays(2); // ttl = 2, make the first row expired + + // for table 2 + Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName2, family); + + byte[] dummyData2 = makeDummyData(600); + + putKVAndFlush(table2, row1, dummyData2, ts, tableName2); + FileStatus[] firstFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // the first mob file + assertEquals("Before cleanup without delay 1", 1, firstFiles2.length); + String firstFile2 = firstFiles2[0].getPath().getName(); + + // 1.5 day before + ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000); + putKVAndFlush(table2, row2, dummyData2, ts, tableName2); + FileStatus[] secondFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // now there are 2 mob files + assertEquals("Before cleanup without delay 2", 2, secondFiles2.length); + String f1Second = secondFiles2[0].getPath().getName(); + String f2Second = secondFiles2[1].getPath().getName(); + String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second; + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table2, row3, dummyData2, ts, tableName2); + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table2, row3, dummyData2, ts, tableName2); + FileStatus[] thirdFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // now there are 4 mob files + assertEquals("Before cleanup without delay 3", 4, thirdFiles2.length); + + modifyColumnExpiryDays(2); // ttl = 2, make the first row expired + + // run the cleaner chore + MobFileCleanerChore mobFileCleanerChore = + TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore(); + mobFileCleanerChore.chore(); + + FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + String lastFile = filesAfterClean[0].getPath().getName(); + // there are 4 mob files in total, but only 3 need to be cleaned + assertEquals("After cleanup without delay 1", 1, filesAfterClean.length); + assertEquals("After cleanup without delay 2", secondFile, lastFile); + + filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + lastFile = filesAfterClean[0].getPath().getName(); + // there are 4 mob files in total, but only 3 need to be cleaned + assertEquals("After cleanup without delay 1", 1, filesAfterClean.length); + assertEquals("After cleanup without delay 2", secondFile2, lastFile); + } + + private static int secondsOfDay() { + return 24 * 3600; + } + + private static byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + Bytes.random(dummyData); + return dummyData; + } +} From c5521ef7872da917d1f6439cc3acc66bbd850459 Mon Sep 17 00:00:00 2001 From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com> Date: Tue, 2 Sep 2025 17:12:05 +0530 Subject: [PATCH 2/6] Fix UT Failures --- .../mob/TestExpiredMobFileCleanerChore.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java index 934d78528103..53b9d8157e59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -45,12 +45,9 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; -@Category(MediumTests.class) @RunWith(Enclosed.class) +@SuppressWarnings("JUnit4TestNotRun") public class TestExpiredMobFileCleanerChore { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class); private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner"); @@ -66,7 +63,12 @@ public class TestExpiredMobFileCleanerChore { private static BufferedMutator table2; @Category(MediumTests.class) - public static class SingleThreadTest { + @SuppressWarnings("JUnit4TestNotRun") + public static class TestWithSingleThread { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWithSingleThread.class); + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); @@ -97,7 +99,13 @@ public void testCleaner() throws Exception { } @Category(MediumTests.class) - public static class MultiThreadTest { + @SuppressWarnings("JUnit4TestNotRun") + public static class TestWithMultiThread { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWithMultiThread.class); + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); From 3be4cfb4e216f23c10dc2695a86a3222b58cee0b Mon Sep 17 00:00:00 2001 From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com> Date: Mon, 8 Sep 2025 08:12:38 +0530 Subject: [PATCH 3/6] Fix UT Failures --- .../mob/TestExpiredMobFileCleanerChore.java | 109 ++++++------------ 1 file changed, 35 insertions(+), 74 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java index 53b9d8157e59..f26468727954 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT; import static org.junit.Assert.assertEquals; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -38,17 +40,19 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -@RunWith(Enclosed.class) -@SuppressWarnings("JUnit4TestNotRun") +@RunWith(Parameterized.class) +@Category(MediumTests.class) public class TestExpiredMobFileCleanerChore { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class); private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner"); private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2"); @@ -61,79 +65,36 @@ public class TestExpiredMobFileCleanerChore { private static BufferedMutator table; private static Admin admin; private static BufferedMutator table2; + @Parameterized.Parameter() + public int mobCleanerThreadCount; - @Category(MediumTests.class) - @SuppressWarnings("JUnit4TestNotRun") - public static class TestWithSingleThread { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestWithSingleThread.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); - TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1); - } - - @Before - public void setUp() throws Exception { - TEST_UTIL.startMiniCluster(1); - } - - @After - public void tearDown() throws Exception { - admin.disableTable(tableName); - admin.deleteTable(tableName); - admin.disableTable(tableName2); - admin.deleteTable(tableName2); - admin.close(); - TEST_UTIL.shutdownMiniCluster(); - TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); - } - - @Test - public void testCleaner() throws Exception { - testCleanerInternal(); - } + @Parameterized.Parameters + public static List params() { + return Arrays.asList(1, 2); } - @Category(MediumTests.class) - @SuppressWarnings("JUnit4TestNotRun") - public static class TestWithMultiThread { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestWithMultiThread.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); - TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2); - - } - - @Before - public void setUp() throws Exception { - TEST_UTIL.startMiniCluster(1); - } - - @After - public void tearDown() throws Exception { - admin.disableTable(tableName); - admin.deleteTable(tableName); - admin.disableTable(tableName2); - admin.deleteTable(tableName2); - admin.close(); - TEST_UTIL.shutdownMiniCluster(); - TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); - } - - @Test - public void testCleaner() throws Exception { - testCleanerInternal(); - } + @Before + public void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, mobCleanerThreadCount); + TEST_UTIL.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.disableTable(tableName2); + admin.deleteTable(tableName2); + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCleaner() throws Exception { + testCleanerInternal(); } private static void init() throws Exception { From ed1a1dd35dba9e397174cc69617d9a0b28397b0d Mon Sep 17 00:00:00 2001 From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com> Date: Sat, 20 Sep 2025 08:57:48 +0530 Subject: [PATCH 4/6] Fix Review comments --- .../apache/hadoop/hbase/master/HMaster.java | 5 +- .../hadoop/hbase/mob/MobFileCleanerChore.java | 62 +++++++++++++++---- .../mob/TestExpiredMobFileCleanerChore.java | 50 ++++++++------- 3 files changed, 83 insertions(+), 34 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 849ddb457d7a..f8abca44e4c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -4570,7 +4570,10 @@ protected String getDescription() { }); } - public MobFileCleanerChore getMobFileCleanerChore() { + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public MobFileCleanerChore getMobFileCleanerChore() { return mobFileCleanerChore; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index 056cfb5ea79b..e594e5d9ada1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -20,15 +20,16 @@ import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT; import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; @@ -37,12 +38,12 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.master.HMaster; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -50,14 +51,15 @@ * (files which have no active references to) mob files. */ @InterfaceAudience.Private -public class MobFileCleanerChore extends ScheduledChore { +public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class); private final HMaster master; private final ExpiredMobFileCleaner cleaner; - private final ExecutorService threadPool; + private final ThreadPoolExecutor executor; private final int cleanerFutureTimeout; + private int threadCount; public MobFileCleanerChore(HMaster master) { super(master.getServerName() + "-MobFileCleanerChore", master, @@ -69,16 +71,18 @@ public MobFileCleanerChore(HMaster master) { this.master = master; cleaner = new ExpiredMobFileCleaner(); cleaner.setConf(master.getConfiguration()); - int threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, + threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); + if (threadCount <= 1) { + threadCount = 1; + } ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build(); - if (threadCount == 1) { - threadPool = MoreExecutors.newDirectExecutorService(); - } else { - threadPool = Executors.newFixedThreadPool(threadCount, threadFactory); - } + + executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), threadFactory); + checkObsoleteConfigurations(); cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT, DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT); @@ -114,7 +118,7 @@ protected void chore() { } List futureList = new ArrayList<>(map.size()); for (TableDescriptor htd : map.values()) { - Future future = threadPool.submit(() -> handleOneTable(htd)); + Future future = executor.submit(() -> handleOneTable(htd)); futureList.add(future); } @@ -123,6 +127,8 @@ protected void chore() { future.get(cleanerFutureTimeout, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.warn("Exception during the execution of MobFileCleanerChore", e); + // Restore the interrupted status + Thread.currentThread().interrupt(); } } } @@ -151,4 +157,36 @@ private void handleOneTable(TableDescriptor htd) { } } + @Override + public void onConfigurationChange(Configuration conf) { + int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, + MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); + if (newThreadCount < 1) { + return; // invalid value , skip the config change + } + + if (newThreadCount != threadCount) { + resizeThreadPool(newThreadCount, newThreadCount); + threadCount = newThreadCount; + } + } + + private void resizeThreadPool(int newCoreSize, int newMaxSize) { + int currentCoreSize = executor.getCorePoolSize(); + if (newCoreSize > currentCoreSize) { + // Increasing the pool size: Set max first, then core + executor.setMaximumPoolSize(newMaxSize); + executor.setCorePoolSize(newCoreSize); + } else { + // Decreasing the pool size: Set core first, then max + executor.setCorePoolSize(newCoreSize); + executor.setMaximumPoolSize(newMaxSize); + } + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public ThreadPoolExecutor getExecutor() { + return executor; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java index f26468727954..c2d39c40f275 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -21,8 +21,7 @@ import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT; import static org.junit.Assert.assertEquals; -import java.util.Arrays; -import java.util.List; +import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -35,19 +34,19 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) -@Category(MediumTests.class) +@Category({ MediumTests.class, MasterTests.class }) public class TestExpiredMobFileCleanerChore { @ClassRule @@ -65,35 +64,46 @@ public class TestExpiredMobFileCleanerChore { private static BufferedMutator table; private static Admin admin; private static BufferedMutator table2; - @Parameterized.Parameter() - public int mobCleanerThreadCount; + private static MobFileCleanerChore mobFileCleanerChore; - @Parameterized.Parameters - public static List params() { - return Arrays.asList(1, 2); - } - - @Before - public void setUp() throws Exception { + @BeforeClass + public static void setUp() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); - TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, mobCleanerThreadCount); TEST_UTIL.startMiniCluster(1); + mobFileCleanerChore = TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore(); } @After - public void tearDown() throws Exception { + public void cleanUp() throws IOException { admin.disableTable(tableName); admin.deleteTable(tableName); admin.disableTable(tableName2); admin.deleteTable(tableName2); admin.close(); + } + + @AfterClass + public static void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); } @Test - public void testCleaner() throws Exception { + public void testCleanerSingleThread() throws Exception { + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1); + mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration()); + int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize(); + Assert.assertEquals(1, corePoolSize); + testCleanerInternal(); + } + + @Test + public void testCleanerMultiThread() throws Exception { + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2); + mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration()); + int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize(); + Assert.assertEquals(2, corePoolSize); testCleanerInternal(); } @@ -219,8 +229,6 @@ public static void testCleanerInternal() throws Exception { modifyColumnExpiryDays(2); // ttl = 2, make the first row expired // run the cleaner chore - MobFileCleanerChore mobFileCleanerChore = - TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore(); mobFileCleanerChore.chore(); FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); From a499cb463c5244cddb453dbe4dd6a3751004b05b Mon Sep 17 00:00:00 2001 From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com> Date: Wed, 8 Oct 2025 08:32:10 +0530 Subject: [PATCH 5/6] Fix Review comments - New --- .../hadoop/hbase/mob/MobFileCleanerChore.java | 24 +++++++++++++++---- .../mob/TestExpiredMobFileCleanerChore.java | 3 +-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index e594e5d9ada1..07259d2e6518 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -116,23 +116,37 @@ protected void chore() { LOG.error("MobFileCleanerChore failed", e); return; } - List futureList = new ArrayList<>(map.size()); + List> futureList = new ArrayList<>(map.size()); for (TableDescriptor htd : map.values()) { Future future = executor.submit(() -> handleOneTable(htd)); futureList.add(future); } - for (Future future : futureList) { + for (Future future : futureList) { try { future.get(cleanerFutureTimeout, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Exception during the execution of MobFileCleanerChore", e); - // Restore the interrupted status + } catch (InterruptedException e) { + LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e); Thread.currentThread().interrupt(); + cancelAllFutures(futureList); + break; + } catch (ExecutionException e) { + LOG.error("Exception during execution of MobFileCleanerChore task", e); + } catch (TimeoutException e) { + LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e); } } } + private void cancelAllFutures(List> futureList) { + for (Future f : futureList) { + if (!f.isDone()) { + f.cancel(true); // interrupt running tasks + } + } + LOG.info("Cancelled all pending mob file cleaner tasks"); + } + private void handleOneTable(TableDescriptor htd) { for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java index c2d39c40f275..9ec87fe94c13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -164,8 +164,7 @@ private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] valu * 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is * run. */ - - public static void testCleanerInternal() throws Exception { + private static void testCleanerInternal() throws Exception { init(); Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family); From 1c09b526efb398d3d71f0e5ec64bc08230d359fe Mon Sep 17 00:00:00 2001 From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com> Date: Fri, 10 Oct 2025 08:47:10 +0530 Subject: [PATCH 6/6] Fix Review comments - 2 --- .../org/apache/hadoop/hbase/mob/MobFileCleanerChore.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index 07259d2e6518..3f9413bfe4d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -73,7 +73,7 @@ public MobFileCleanerChore(HMaster master) { cleaner.setConf(master.getConfiguration()); threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); - if (threadCount <= 1) { + if (threadCount < 1) { threadCount = 1; } @@ -139,12 +139,14 @@ protected void chore() { } private void cancelAllFutures(List> futureList) { + long pendingTaskCounter = 0; for (Future f : futureList) { if (!f.isDone()) { f.cancel(true); // interrupt running tasks + pendingTaskCounter++; } } - LOG.info("Cancelled all pending mob file cleaner tasks"); + LOG.info("Cancelled {} pending mob file cleaner tasks", pendingTaskCounter); } private void handleOneTable(TableDescriptor htd) {