Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
Expand Down Expand Up @@ -162,9 +165,9 @@ private void getRegionLocations(CompletableFuture<RegionLocations> future,
return;
}
// check if the number of region replicas is correct, and also the primary region name
// matches, and also there is no null elements in the returned RegionLocations
// matches.
if (locs.size() == tableDesc.getRegionReplication() &&
locs.size() == locs.numNonNullElements() &&
locs.getDefaultRegionLocation() != null &&
Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
encodedRegionName)) {
future.complete(locs);
Expand All @@ -182,16 +185,16 @@ private void replicate(CompletableFuture<Long> future, RegionLocations locs,
future.complete(Long.valueOf(entries.size()));
return;
}
if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
encodedRegionName)) {
RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
// the region name is not equal, this usually means the region has been split or merged, so
// give up replicating as the new region(s) should already have all the data of the parent
// region(s).
if (LOG.isTraceEnabled()) {
LOG.trace(
"Skipping {} entries in table {} because located region {} is different than" +
" the original region {} from WALEdit",
tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(),
tableDesc.getTableName(), defaultReplica.getEncodedName(),
Bytes.toStringBinary(encodedRegionName));
}
future.complete(Long.valueOf(entries.size()));
Expand All @@ -202,24 +205,26 @@ private void replicate(CompletableFuture<Long> future, RegionLocations locs,
AtomicLong skippedEdits = new AtomicLong(0);

for (int i = 1, n = locs.size(); i < n; i++) {
final int replicaId = i;
FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
replicaId, numRetries, operationTimeoutNs), (r, e) -> {
if (e != null) {
LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
error.compareAndSet(null, e);
} else {
AtomicUtils.updateMax(skippedEdits, r.longValue());
}
if (remainingTasks.decrementAndGet() == 0) {
if (error.get() != null) {
future.completeExceptionally(error.get());
// Do not use the elements other than the default replica as they may be null. We will fail
// earlier if the location for default replica is null.
final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
FutureUtils
.addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
if (e != null) {
LOG.warn("Failed to replicate to {}", replica, e);
error.compareAndSet(null, e);
} else {
future.complete(skippedEdits.get());
AtomicUtils.updateMax(skippedEdits, r.longValue());
}
}
});
if (remainingTasks.decrementAndGet() == 0) {
if (error.get() != null) {
future.completeExceptionally(error.get());
} else {
future.complete(skippedEdits.get());
}
}
});
}
}

Expand All @@ -245,6 +250,10 @@ private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] enco
FutureUtils.addListener(locateFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (locs.getDefaultRegionLocation() == null) {
future.completeExceptionally(
new HBaseIOException("No location found for default replica of table=" +
tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
} else {
replicate(future, locs, tableDesc, encodedRegionName, row, entries);
}
Expand Down