diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 94d977b30371..28b2d1ccf664 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1390,6 +1390,16 @@ public static enum Modify {
     "hbase.master.executor.logreplayops.threads";
   public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 34f322c9ba46..86bbe8943f0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -231,15 +231,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
 
-  /**
-   * Number of rows in a batch operation above which a warning will be logged.
-   */
-  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-  /**
-   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
-   */
-  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
   /*
    * Whether to reject rows with size > threshold defined by
    * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1128,7 +1119,8 @@ public RSRpcServices(HRegionServer rs) throws IOException {
   RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
     this.ld = ld;
     regionServer = rs;
-    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+    rowSizeWarnThreshold = rs.conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     RpcSchedulerFactory rpcSchedulerFactory;
     rejectRowsWithSizeOverThreshold = rs.conf
       .getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index d6f48b962e8d..03cd86bfd9a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -245,7 +245,7 @@ public void startReplicationService() throws IOException {
     } catch (ReplicationException e) {
       throw new IOException(e);
     }
-    this.replicationSink = new ReplicationSink(this.conf, this.server);
+    this.replicationSink = new ReplicationSink(this.conf);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 9143f3dd805f..34cb867f20bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -18,10 +18,13 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +43,6 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -90,16 +92,21 @@ public class ReplicationSink {
   private long hfilesReplicated = 0;
   private SourceFSConfigurationProvider provider;
 
+  /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
   /**
    * Create a sink for replication
-   *
-   * @param conf conf object
-   * @param stopper boolean to tell this thread to stop
+   * @param conf conf object
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)
       throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
 
@@ -215,7 +222,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
       if (!rowMap.isEmpty()) {
         LOG.debug("Started replicating mutations.");
         for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-          batch(entry.getKey(), entry.getValue().values());
+          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
         }
         LOG.debug("Finished replicating mutations.");
       }
@@ -380,9 +387,10 @@ public void stopReplicationSinkServices() {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
-   * @throws IOException
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+      throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
@@ -391,7 +399,15 @@ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws
       Connection connection = getConnection();
       table = connection.getTable(tableName);
       for (List<Row> rows : allRows) {
-        table.batch(rows);
+        List<List<Row>> batchRows;
+        if (rows.size() > batchRowSizeThreshold) {
+          batchRows = Lists.partition(rows, batchRowSizeThreshold);
+        } else {
+          batchRows = Collections.singletonList(rows);
+        }
+        for (List<Row> rowList : batchRows) {
+          table.batch(rowList);
+        }
       }
     } catch (RetriesExhaustedWithDetailsException rewde) {
       for (Throwable ex : rewde.getCauses()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index af38ea2bb791..c4ca78c62460 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -24,6 +24,7 @@
 import org.junit.Before;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,8 +76,8 @@ public void setupTest() throws Exception {
     final TableName tableName = TableName.valueOf("tableName");
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
-    THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-      RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+    THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
     TEST_UTIL.startMiniCluster();
     TEST_UTIL.createTable(tableName, TEST_FAM);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index d475e007a4b7..9a489385f024 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -37,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,7 +50,6 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -76,7 +75,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(MediumTests.class)
+@Category(LargeTests.class)
 public class TestReplicationSink {
   private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
   private static final int BATCH_SIZE = 10;
@@ -123,10 +122,8 @@ public static void setUpBeforeClass() throws Exception {
       HConstants.REPLICATION_ENABLE_DEFAULT);
     TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
-
     TEST_UTIL.startMiniCluster(3);
-    SINK =
-      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -199,6 +196,40 @@ public void testMixedPutDelete() throws Exception {
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
   }
 
+  @Test
+  public void testLargeEditsPutDelete() throws Exception {
+    List<WALEntry> entries = new ArrayList<>();
+    List<Cell> cells = new ArrayList<>();
+    for (int i = 0; i < 5510; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+
+    ResultScanner resultScanner = table1.getScanner(new Scan());
+    int totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5510, totalRows);
+
+    entries = new ArrayList<>();
+    cells = new ArrayList<>();
+    for (int i = 0; i < 11000; i++) {
+      entries.add(
+        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+          cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+    resultScanner = table1.getScanner(new Scan());
+    totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5500, totalRows);
+  }
+
   /**
    * Insert to 2 different tables
    * @throws Exception
@@ -217,7 +248,11 @@ public void testMixedPutTables() throws Exception {
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
-      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+    }
+    scanRes = table1.getScanner(scan);
+    for(Result res : scanRes) {
+      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
     }
   }
 
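
Note (not part of the patch): the behavioral core of this change is the sub-batching added to ReplicationSink#batch(). The standalone sketch below illustrates that logic under stated assumptions: plain Integers stand in for Row mutations, println stands in for Table#batch(), and the class name is hypothetical; only the Lists.partition / Collections.singletonList semantics are what the patch actually relies on. Guava's Lists.partition returns consecutive sublists of at most the given size, so an 11000-row batch with the default threshold of 5000 would be flushed as three calls of 5000 + 5000 + 1000 rows, matching the counts exercised by testLargeEditsPutDelete.

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone sketch of the partitioning added to ReplicationSink#batch().
// Requires Guava on the classpath; Integers stand in for Row mutations.
public class BatchPartitionSketch {
  public static void main(String[] args) {
    int batchRowSizeThreshold = 5000; // HConstants.BATCH_ROWS_THRESHOLD_DEFAULT
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      rows.add(i);
    }
    List<List<Integer>> batchRows;
    if (rows.size() > batchRowSizeThreshold) {
      // Consecutive sublists of at most batchRowSizeThreshold elements:
      // here 5000 + 5000 + 1000.
      batchRows = Lists.partition(rows, batchRowSizeThreshold);
    } else {
      batchRows = Collections.singletonList(rows);
    }
    for (List<Integer> rowList : batchRows) {
      // Each sub-batch now stays at or below the rows-warning threshold;
      // in the patch this is where table.batch(rowList) is invoked.
      System.out.println("sub-batch size: " + rowList.size());
    }
  }
}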