Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,21 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/region/Admin.proto";

message StartReplicationSourceRequest {
required ServerName server_name = 1;
required string queue_id = 2;
}

message StartReplicationSourceResponse {
}

service ReplicationServerService {
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
}

rpc StartReplicationSource(StartReplicationSourceRequest)
returns(StartReplicationSourceResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
* </pre>
*/
@InterfaceAudience.Private
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {

private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
Expand Down Expand Up @@ -123,7 +123,7 @@ public String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}

private String getQueueNode(ServerName serverName, String queueId) {
public String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.Sta
throw new RuntimeException(e);
}
}

public ZKWatcher getZookeeper() {
return this.zookeeper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3396,7 +3396,7 @@ public ListReplicationSinkServersResponse listReplicationSinkServers(
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
builder.addAllServerName(master.listReplicationSinkServers().stream()
builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
Expand All @@ -271,7 +270,7 @@
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
ConfigurationObserver, ReplicationServerService.BlockingInterface {
ConfigurationObserver {
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

/** RPC scheduler to use for the region server. */
Expand Down Expand Up @@ -1491,9 +1490,6 @@ protected List<BlockingServiceAndInterface> getServices() {
bssi.add(new BlockingServiceAndInterface(
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(
ReplicationServerService.newReflectiveBlockingService(this),
ReplicationServerService.BlockingInterface.class));
}
return new org.apache.hbase.thirdparty.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,20 +321,26 @@ protected void chooseSinks() {
if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
useZk = false;
slaveAddresses = fetchSlavesAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Try fetch sinks by using zk.");
useZk = true;
}
} else {
useZk = true;
}
} catch (Throwable t) {
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
useZk = true;
}

if (useZk) {
slaveAddresses = fetchSlavesAddressesByZK();
}

if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
LOG.warn("No sinks available at peer. Will not be able to replicate.");
}

Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
synchronized (this) {
Expand Down Expand Up @@ -368,10 +374,10 @@ protected SinkPeer getReplicationSink() throws IOException {
}

private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
} else {
if (fetchServersUseZk) {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
} else {
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
}
}

Expand Down
Loading