6161import org .apache .hadoop .hbase .coprocessor .RegionCoprocessor ;
6262import org .apache .hadoop .hbase .coprocessor .RegionCoprocessorEnvironment ;
6363import org .apache .hadoop .hbase .coprocessor .RegionObserver ;
64+ import org .apache .hadoop .hbase .coprocessor .RegionServerCoprocessor ;
65+ import org .apache .hadoop .hbase .coprocessor .RegionServerCoprocessorEnvironment ;
66+ import org .apache .hadoop .hbase .coprocessor .RegionServerObserver ;
6467import org .apache .hadoop .hbase .io .hfile .HFile ;
6568import org .apache .hadoop .hbase .io .hfile .HFileContextBuilder ;
6669import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
@@ -113,6 +116,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
113116 private static AtomicInteger BULK_LOADS_COUNT ;
114117 private static CountDownLatch BULK_LOAD_LATCH ;
115118
119+ private static AtomicInteger REPLICATION_COUNT ;
120+ private static CountDownLatch REPLICATION_LATCH ;
121+
116122 protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility ();
117123 protected static final Configuration CONF3 = UTIL3 .getConfiguration ();
118124
@@ -124,7 +130,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
124130 @ ClassRule
125131 public static TemporaryFolder testFolder = new TemporaryFolder ();
126132
127- private static ReplicationQueueStorage queueStorage ;
133+ private static final List < ReplicationQueueStorage > queueStorages = new ArrayList <>() ;
128134
129135 private static boolean replicationPeersAdded = false ;
130136
@@ -136,8 +142,11 @@ public static void setUpBeforeClass() throws Exception {
136142 setupConfig (UTIL3 , "/3" );
137143 TestReplicationBase .setUpBeforeClass ();
138144 startThirdCluster ();
139- queueStorage = ReplicationStorageFactory .getReplicationQueueStorage (UTIL1 .getZooKeeperWatcher (),
140- UTIL1 .getConfiguration ());
145+ for (HBaseTestingUtility util : new HBaseTestingUtility [] { UTIL1 , UTIL2 , UTIL3 }) {
146+ ReplicationQueueStorage queueStorage = ReplicationStorageFactory
147+ .getReplicationQueueStorage (util .getZooKeeperWatcher (), util .getConfiguration ());
148+ queueStorages .add (queueStorage );
149+ }
141150 }
142151
143152 private static void startThirdCluster () throws Exception {
@@ -183,6 +192,7 @@ public void setUpBase() throws Exception {
183192 }
184193
185194 BULK_LOADS_COUNT = new AtomicInteger (0 );
195+ REPLICATION_COUNT = new AtomicInteger (0 );
186196 }
187197
188198 private ReplicationPeerConfig getPeerConfigForCluster (HBaseTestingUtility util ) {
@@ -191,21 +201,37 @@ private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util)
191201 }
192202
193203 private void setupCoprocessor (HBaseTestingUtility cluster ) {
204+ Configuration conf = cluster .getConfiguration ();
205+ String clusterKey = cluster .getClusterKey ();
206+ Class <TestBulkLoadReplication .BulkReplicationTestObserver > cpClass =
207+ TestBulkLoadReplication .BulkReplicationTestObserver .class ;
208+
194209 cluster .getHBaseCluster ().getRegions (tableName ).forEach (r -> {
195210 try {
196- TestBulkLoadReplication . BulkReplicationTestObserver cp = r .getCoprocessorHost ()
197- .findCoprocessor (TestBulkLoadReplication . BulkReplicationTestObserver . class );
211+ RegionCoprocessorHost cpHost = r .getCoprocessorHost ();
212+ TestBulkLoadReplication . BulkReplicationTestObserver cp = cpHost .findCoprocessor (cpClass );
198213 if (cp == null ) {
199- r .getCoprocessorHost ().load (TestBulkLoadReplication .BulkReplicationTestObserver .class , 0 ,
200- cluster .getConfiguration ());
201- cp = r .getCoprocessorHost ()
202- .findCoprocessor (TestBulkLoadReplication .BulkReplicationTestObserver .class );
203- cp .clusterName = cluster .getClusterKey ();
214+ cpHost .load (cpClass , 0 , conf );
215+ cp = cpHost .findCoprocessor (cpClass );
216+ cp .clusterName = clusterKey ;
204217 }
205218 } catch (Exception e ) {
206219 LOG .error (e .getMessage (), e );
207220 }
208221 });
222+
223+ try {
224+ RegionServerCoprocessorHost cpHost =
225+ cluster .getHBaseCluster ().getRegionServer (0 ).getRegionServerCoprocessorHost ();
226+ TestBulkLoadReplication .BulkReplicationTestObserver cp = cpHost .findCoprocessor (cpClass );
227+ if (cp == null ) {
228+ cpHost .load (cpClass , 0 , conf );
229+ cp = cpHost .findCoprocessor (cpClass );
230+ cp .clusterName = clusterKey ;
231+ }
232+ } catch (Exception e ) {
233+ LOG .error (e .getMessage (), e );
234+ }
209235 }
210236
211237 protected static void setupBulkLoadConfigsForCluster (Configuration config ,
@@ -241,13 +267,21 @@ public void testBulkLoadReplicationActiveActive() throws Exception {
241267 // Each event gets 3 counts (the originator cluster, plus the two peers),
242268 // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
243269 assertEquals (9 , BULK_LOADS_COUNT .get ());
270+ // Each bulk load event gets replicated twice (to two peers),
271+ // so REPLICATION_COUNT expected value is 3 * 2 = 6.
272+ assertEquals (6 , REPLICATION_COUNT .get ());
273+ for (ReplicationQueueStorage queueStorage : queueStorages ) {
274+ assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
275+ }
244276 }
245277
246278 protected void assertBulkLoadConditions (TableName tableName , byte [] row , byte [] value ,
247279 HBaseTestingUtility utility , Table ... tables ) throws Exception {
248280 BULK_LOAD_LATCH = new CountDownLatch (3 );
281+ REPLICATION_LATCH = new CountDownLatch (2 );
249282 bulkLoadOnCluster (tableName , row , value , utility );
250283 assertTrue (BULK_LOAD_LATCH .await (1 , TimeUnit .MINUTES ));
284+ assertTrue (REPLICATION_LATCH .await (1 , TimeUnit .MINUTES ));
251285 assertTableHasValue (tables [0 ], row , value );
252286 assertTableHasValue (tables [1 ], row , value );
253287 assertTableHasValue (tables [2 ], row , value );
@@ -305,10 +339,12 @@ private String createHFileForFamilies(byte[] row, byte[] value, Configuration cl
305339 return hFileLocation .getAbsoluteFile ().getAbsolutePath ();
306340 }
307341
308- public static class BulkReplicationTestObserver implements RegionCoprocessor {
342+ public static class BulkReplicationTestObserver
343+ implements RegionCoprocessor , RegionServerCoprocessor {
309344
310345 String clusterName ;
311346 AtomicInteger bulkLoadCounts = new AtomicInteger ();
347+ AtomicInteger replicationCount = new AtomicInteger ();
312348
313349 @ Override
314350 public Optional <RegionObserver > getRegionObserver () {
@@ -325,6 +361,22 @@ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
325361 }
326362 });
327363 }
364+
365+ @ Override
366+ public Optional <RegionServerObserver > getRegionServerObserver () {
367+ return Optional .of (new RegionServerObserver () {
368+
369+ // FIXME: this is deprecated, but we need it for validating bulk load replication
370+ @ Override
371+ public void
372+ postReplicateLogEntries (final ObserverContext <RegionServerCoprocessorEnvironment > ctx ) {
373+ REPLICATION_LATCH .countDown ();
374+ REPLICATION_COUNT .incrementAndGet ();
375+ LOG .info ("Replication succeeded. Total for {}: {}" , clusterName ,
376+ replicationCount .addAndGet (1 ));
377+ }
378+ });
379+ }
328380 }
329381
330382 @ Test
@@ -339,7 +391,11 @@ public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception
339391 // additional wait to make sure no extra bulk load happens
340392 Thread .sleep (400 );
341393 assertEquals (1 , BULK_LOADS_COUNT .get ());
342- assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
394+ // No replication should happen for no-replication family
395+ assertEquals (0 , REPLICATION_COUNT .get ());
396+ for (ReplicationQueueStorage queueStorage : queueStorages ) {
397+ assertEquals (0 , queueStorage .getAllHFileRefs ().size ());
398+ }
343399 }
344400
345401 private void assertBulkLoadConditionsForNoRepFamily (byte [] row , byte [] value ,
0 commit comments