From be6e57079cccbc717919d302972c5bab26f248eb Mon Sep 17 00:00:00 2001 From: apoonia Date: Fri, 13 Sep 2024 13:09:12 +0530 Subject: [PATCH 1/4] HBASE-28836 Parallelize the file archival to improve the split times in object store (s3) --- .../java/org/apache/hadoop/hbase/backup/HFileArchiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index b2ea9cd33a0b..f2427c6fdbf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -439,7 +439,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, List failures = new ArrayList<>(); String startTime = Long.toString(start); - for (File file : toArchive) { + toArchive.parallelStream().forEach(file -> { // if its a file archive it try { LOG.trace("Archiving {}", file); @@ -463,7 +463,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, LOG.warn("Failed to archive {}", file, e); failures.add(file); } - } + }); return failures; } From 2f405e974b01fb862e838fe6865947020bebe6d6 Mon Sep 17 00:00:00 2001 From: apoonia Date: Mon, 16 Sep 2024 15:18:58 +0530 Subject: [PATCH 2/4] Revert "HBASE-28836 Parallelize the file archival to improve the split times in object store (s3)" This reverts commit be6e57079cccbc717919d302972c5bab26f248eb. 
--- .../java/org/apache/hadoop/hbase/backup/HFileArchiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index f2427c6fdbf3..b2ea9cd33a0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -439,7 +439,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, List failures = new ArrayList<>(); String startTime = Long.toString(start); - toArchive.parallelStream().forEach(file -> { + for (File file : toArchive) { // if its a file archive it try { LOG.trace("Archiving {}", file); @@ -463,7 +463,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, LOG.warn("Failed to archive {}", file, e); failures.add(file); } - }); + } return failures; } From b33878a4b86f104d787aabfd55d1ac0d7692a0ee Mon Sep 17 00:00:00 2001 From: apoonia Date: Tue, 17 Sep 2024 10:43:45 +0530 Subject: [PATCH 3/4] Use executor service for cleanup of files --- .../hadoop/hbase/backup/HFileArchiver.java | 54 ++++++++++++++++--- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index b2ea9cd33a0b..fcbc456bd2ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -23,8 +23,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; 
import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -439,17 +443,11 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, List failures = new ArrayList<>(); String startTime = Long.toString(start); + List filesOnly = new ArrayList<>(); for (File file : toArchive) { // if its a file archive it try { - LOG.trace("Archiving {}", file); - if (file.isFile()) { - // attempt to archive the file - if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) { - LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir); - failures.add(file); - } - } else { + if (!file.isFile()) { // otherwise its a directory and we need to archive all files LOG.trace("{} is a directory, archiving children files", file); // so we add the directory name to the one base archive @@ -458,12 +456,52 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, // archive those too Collection children = file.getChildren(); failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start)); + } else { + filesOnly.add(file); } } catch (IOException e) { LOG.warn("Failed to archive {}", file, e); failures.add(file); } } + ExecutorService executorService = Executors.newFixedThreadPool(25); + Map> futures = new HashMap<>(); + // In current baseDir all files will be process concurrently + for(File file : filesOnly) { + LOG.trace("Archiving {}", file); + Future archiveTask = + executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime)); + futures.put(file, archiveTask); + } + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdown(); + } + } catch (InterruptedException e) { + LOG.warn("HFileArchive Cleanup thread was interrupted while shutting down"); + } + + for (Map.Entry> fileFutureEntry : futures.entrySet()) { + try { + boolean fileCleaned = 
fileFutureEntry.getValue().get(); + if(!fileCleaned) { + LOG.warn( + "Couldn't archive %s into backup directory: %s".formatted(fileFutureEntry.getKey(), + baseArchiveDir)); + failures.add(fileFutureEntry.getKey()); + } + } catch (InterruptedException e) { + LOG.warn("HFileArchive Cleanup thread was interrupted"); + } catch (ExecutionException e) { + // this is IOException + LOG.warn("Failed to archive {}",fileFutureEntry.getKey() , e); + failures.add(fileFutureEntry.getKey()); + } + + } + return failures; } From 1e21b43849e10ea2cdffd45869080c15415e79aa Mon Sep 17 00:00:00 2001 From: apoonia Date: Tue, 17 Sep 2024 15:57:45 +0530 Subject: [PATCH 4/4] Use synchronized collection --- .../apache/hadoop/hbase/backup/HFileArchiver.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index fcbc456bd2ae..d0040e29e7e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -441,7 +441,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, LOG.trace("Created archive directory {}", baseArchiveDir); } - List failures = new ArrayList<>(); + List failures = Collections.synchronizedList(new ArrayList<>()); String startTime = Long.toString(start); List filesOnly = new ArrayList<>(); for (File file : toArchive) { @@ -467,7 +467,7 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, ExecutorService executorService = Executors.newFixedThreadPool(25); Map> futures = new HashMap<>(); // In current baseDir all files will be process concurrently - for(File file : filesOnly) { + for (File file : filesOnly) { LOG.trace("Archiving {}", file); Future archiveTask = executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, 
startTime)); @@ -486,17 +486,16 @@ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, for (Map.Entry> fileFutureEntry : futures.entrySet()) { try { boolean fileCleaned = fileFutureEntry.getValue().get(); - if(!fileCleaned) { - LOG.warn( - "Couldn't archive %s into backup directory: %s".formatted(fileFutureEntry.getKey(), - baseArchiveDir)); + if (!fileCleaned) { + LOG.warn("Couldn't archive %s into backup directory: %s" + .formatted(fileFutureEntry.getKey(), baseArchiveDir)); failures.add(fileFutureEntry.getKey()); } } catch (InterruptedException e) { LOG.warn("HFileArchive Cleanup thread was interrupted"); } catch (ExecutionException e) { // this is IOException - LOG.warn("Failed to archive {}",fileFutureEntry.getKey() , e); + LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e); failures.add(fileFutureEntry.getKey()); }