From 804308fe2fbd36ea77a70cbd883d732d91453564 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 8 Jun 2019 20:40:23 +0800 Subject: [PATCH] HBASE-22553 NPE in RegionReplicaReplicationEndpoint --- .../RegionReplicaReplicationEndpoint.java | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index cc2650f803f2..2c3b19b6a45c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -32,12 +32,15 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -162,9 +165,9 @@ private void getRegionLocations(CompletableFuture future, return; } // check if the number of region replicas is correct, and also the primary region name - // matches, and also there is no null elements in the returned RegionLocations + // matches. if (locs.size() == tableDesc.getRegionReplication() && - locs.size() == locs.numNonNullElements() && + locs.getDefaultRegionLocation() != null && Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), encodedRegionName)) { future.complete(locs); @@ -182,8 +185,8 @@ private void replicate(CompletableFuture future, RegionLocations locs, future.complete(Long.valueOf(entries.size())); return; } - if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), - encodedRegionName)) { + RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion(); + if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) { // the region name is not equal, this usually means the region has been split or merged, so // give up replicating as the new region(s) should already have all the data of the parent // region(s). @@ -191,7 +194,7 @@ private void replicate(CompletableFuture future, RegionLocations locs, LOG.trace( "Skipping {} entries in table {} because located region {} is different than" + " the original region {} from WALEdit", - tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(), + tableDesc.getTableName(), defaultReplica.getEncodedName(), Bytes.toStringBinary(encodedRegionName)); } future.complete(Long.valueOf(entries.size())); @@ -202,24 +205,26 @@ private void replicate(CompletableFuture future, RegionLocations locs, AtomicLong skippedEdits = new AtomicLong(0); for (int i = 1, n = locs.size(); i < n; i++) { - final int replicaId = i; - FutureUtils.addListener(connection.replay(tableDesc.getTableName(), - locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries, - replicaId, numRetries, operationTimeoutNs), (r, e) -> { - if (e != null) { - LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e); - error.compareAndSet(null, e); - } else { - AtomicUtils.updateMax(skippedEdits, r.longValue()); - } - if (remainingTasks.decrementAndGet() == 0) { - if (error.get() != null) { - future.completeExceptionally(error.get()); + // Do not use the elements other than the default replica as they may be null. We will fail + // earlier if the location for default replica is null. + final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i); + FutureUtils + .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(), + row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> { + if (e != null) { + LOG.warn("Failed to replicate to {}", replica, e); + error.compareAndSet(null, e); } else { - future.complete(skippedEdits.get()); + AtomicUtils.updateMax(skippedEdits, r.longValue()); } - } - }); + if (remainingTasks.decrementAndGet() == 0) { + if (error.get() != null) { + future.completeExceptionally(error.get()); + } else { + future.complete(skippedEdits.get()); + } + } + }); } } @@ -245,6 +250,10 @@ private CompletableFuture replicate(TableDescriptor tableDesc, byte[] enco FutureUtils.addListener(locateFuture, (locs, error) -> { if (error != null) { future.completeExceptionally(error); + } else if (locs.getDefaultRegionLocation() == null) { + future.completeExceptionally( + new HBaseIOException("No location found for default replica of table=" + + tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'")); } else { replicate(future, locs, tableDesc, encodedRegionName, row, entries); }