From 7767c5cc8a420ff387850e9561d09002e7a60bdc Mon Sep 17 00:00:00 2001 From: niuyulin Date: Wed, 24 Feb 2021 20:07:49 +0800 Subject: [PATCH] HBASE-25603 Add switch for compaction after bulkload --- .../src/main/resources/hbase-default.xml | 7 +++ .../hadoop/hbase/regionserver/HRegion.java | 32 ++++++++----- .../TestCompactionAfterBulkLoad.java | 46 +++++++++++-------- .../asciidoc/_chapters/hbase-default.adoc | 11 +++++ 4 files changed, 64 insertions(+), 32 deletions(-) diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 20f3881edb2c..ae6e18e68791 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -593,6 +593,13 @@ possible configurations would overwhelm and obscure the important. to atomic bulk loads are attempted in the face of splitting operations 0 means never give up. + + hbase.compaction.after.bulkload.enable + true + Request Compaction after bulkload immediately. + If bulkload is continuous, the triggered compactions may increase load, + bring about performance side effect. + hbase.master.balancer.maxRitPercent 1.0 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index db5dd7a34655..9cbb4e9f0f28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -244,6 +244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync"; public static final boolean DEFAULT_WAL_HSYNC = false; + /** Parameter name for compaction after bulkload */ + public static final String COMPACTION_AFTER_BULKLOAD_ENABLE = + "hbase.compaction.after.bulkload.enable"; + /** * This is for for using HRegion as a local storage, where we may put the recovered edits in a * special place. Once this is set, we will only replay the recovered edits under this directory @@ -7025,19 +7029,23 @@ public Map> bulkLoadHFiles(Collection> f } isSuccessful = true; - //request compaction - familyWithFinalPath.keySet().forEach(family -> { - HStore store = getStore(family); - try { - if (this.rsServices != null && store.needsCompaction()) { - this.rsServices.getCompactionRequestor().requestCompaction(this, store, - "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, - CompactionLifeCycleTracker.DUMMY, null); + if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true)) { + // request compaction + familyWithFinalPath.keySet().forEach(family -> { + HStore store = getStore(family); + try { + if (this.rsServices != null && store.needsCompaction()) { + this.rsServices.getCompactionRequestor().requestCompaction(this, store, + "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, + CompactionLifeCycleTracker.DUMMY, null); + LOG.debug("bulkload hfiles request compaction region : {}, family : {}", + this.getRegionInfo(), family); + } + } catch (IOException e) { + LOG.error("bulkload hfiles request compaction error ", e); } - } catch (IOException e) { - LOG.error("bulkload hfiles request compaction error ", e); - } - }); + }); + } } finally { if (wal != null && !storeFiles.isEmpty()) { // Write a bulk load event for hfiles that are loaded diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java index 423659ecaa82..c736513e97ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -84,27 +85,32 @@ public void shouldRequestCompactionAfterBulkLoad() throws IOException { for (int i = 0; i < 5; i++) { familyPaths.addAll(withFamilyPathsFor(family1, family2, family3)); } - when(regionServerServices.getConfiguration()).thenReturn(conf); - when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); - when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) - .thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - WALKeyImpl walKey = invocation.getArgument(1); - MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); - if (mvcc != null) { - MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); - walKey.setWriteEntry(we); + try { + conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); + when(regionServerServices.getConfiguration()).thenReturn(conf); + when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; } - return 01L; - } - }); + }); - Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), - any(), any()); - testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); - // invoke three times for 3 families - verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), - isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); + Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), + any(), any()); + testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); + // invoke three times for 3 families + verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), + isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); + } finally { + conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); + } } } diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc index 32dfb1650916..449d65fa95e8 100644 --- a/src/main/asciidoc/_chapters/hbase-default.adoc +++ b/src/main/asciidoc/_chapters/hbase-default.adoc @@ -756,6 +756,17 @@ Maximum retries. This is a maximum number of iterations `10` +[[hbase.compaction.after.bulkload.enable]] +*`hbase.compaction.after.bulkload.enable`*:: ++ +.Description +Request Compaction after bulkload immediately. +If bulkload is continuous, the triggered compactions may increase load, +bring about performance side effect. ++ +.Default +`true` + [[hbase.balancer.period ]] *`hbase.balancer.period