Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,41 +79,6 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
return Collections.unmodifiableMap(newTableCFsMap);
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder} to create new ReplicationPeerConfig.
*/
@Deprecated
public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
this.configuration = new HashMap<>(0);
this.serial = false;
}

/**
* Set the clusterKey which is the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setClusterKey(String)} instead.
*/
@Deprecated
public ReplicationPeerConfig setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
return this;
}

/**
* Sets the ReplicationEndpoint plugin class for this peer.
* @param replicationEndpointImpl a class implementing ReplicationEndpoint
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setReplicationEndpointImpl(String)} instead.
*/
@Deprecated
public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
this.replicationEndpointImpl = replicationEndpointImpl;
return this;
}

public String getClusterKey() {
return clusterKey;
}
Expand All @@ -134,88 +99,26 @@ public Map<TableName, List<String>> getTableCFsMap() {
return (Map<TableName, List<String>>) tableCFsMap;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setTableCFsMap(Map)} instead.
*/
@Deprecated
public ReplicationPeerConfig setTableCFsMap(Map<TableName,
? extends Collection<String>> tableCFsMap) {
this.tableCFsMap = tableCFsMap;
return this;
}

public Set<String> getNamespaces() {
return this.namespaces;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setNamespaces(Set)} instead.
*/
@Deprecated
public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
this.namespaces = namespaces;
return this;
}

public long getBandwidth() {
return this.bandwidth;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setBandwidth(long)} instead.
*/
@Deprecated
public ReplicationPeerConfig setBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
return this;
}

public boolean replicateAllUserTables() {
return this.replicateAllUserTables;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setReplicateAllUserTables(boolean)} instead.
*/
@Deprecated
public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
this.replicateAllUserTables = replicateAllUserTables;
return this;
}

public Map<TableName, List<String>> getExcludeTableCFsMap() {
return (Map<TableName, List<String>>) excludeTableCFsMap;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setExcludeTableCFsMap(Map)} instead.
*/
@Deprecated
public ReplicationPeerConfig setExcludeTableCFsMap(Map<TableName,
? extends Collection<String>> tableCFsMap) {
this.excludeTableCFsMap = tableCFsMap;
return this;
}

public Set<String> getExcludeNamespaces() {
return this.excludeNamespaces;
}

/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setExcludeNamespaces(Set)} instead.
*/
@Deprecated
public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
this.excludeNamespaces = namespaces;
return this;
}

public String getRemoteWALDir() {
return this.remoteWALDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
Expand Down Expand Up @@ -108,10 +109,8 @@ public void clearPeerAndQueues() throws IOException, ReplicationException {

@Test
public void testAddRemovePeer() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_TWO);
ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build();
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
// try adding the same (fails)
Expand Down Expand Up @@ -142,10 +141,11 @@ public void testAddRemovePeer() throws Exception {

@Test
public void testPeerConfig() throws Exception {
ReplicationPeerConfig config = new ReplicationPeerConfig();
config.setClusterKey(KEY_ONE);
config.getConfiguration().put("key1", "value1");
config.getConfiguration().put("key2", "value2");
ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder()
.setClusterKey(KEY_ONE)
.putConfiguration("key1", "value1")
.putConfiguration("key2", "value2")
.build();
admin.addReplicationPeer(ID_ONE, config).join();

List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
Expand All @@ -160,8 +160,7 @@ public void testPeerConfig() throws Exception {

@Test
public void testEnableDisablePeer() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
admin.addReplicationPeer(ID_ONE, rpc1).join();
List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
assertEquals(1, peers.size());
Expand All @@ -176,8 +175,8 @@ public void testEnableDisablePeer() throws Exception {

@Test
public void testAppendPeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfigBuilder rpcBuilder =
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
Expand All @@ -186,9 +185,9 @@ public void testAppendPeerTableCFs() throws Exception {
final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");

// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
rpc1.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
rpcBuilder.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();

Map<TableName, List<String>> tableCFs = new HashMap<>();

Expand Down Expand Up @@ -280,16 +279,16 @@ public void testAppendPeerTableCFs() throws Exception {

@Test
public void testRemovePeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfigBuilder rpcBuilder =
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
rpc1.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
rpcBuilder.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();

Map<TableName, List<String>> tableCFs = new HashMap<>();
try {
Expand Down Expand Up @@ -369,30 +368,28 @@ public void testSetPeerNamespaces() throws Exception {
String ns1 = "ns1";
String ns2 = "ns2";

ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join();
rpc.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
ReplicationPeerConfigBuilder rpcBuilder =
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
rpcBuilder.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();

// add ns1 and ns2 to peer config
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
Set<String> namespaces = new HashSet<>();
namespaces.add(ns1);
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
rpcBuilder.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
assertEquals(2, namespaces.size());
assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2));

// update peer config only contains ns1
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
rpcBuilder.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
assertEquals(1, namespaces.size());
assertTrue(namespaces.contains(ns1));
Expand All @@ -407,40 +404,36 @@ public void testNamespacesAndTableCfsConfigConflict() throws Exception {
final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");

ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join();
rpc.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
ReplicationPeerConfigBuilder rpcBuilder =
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
rpcBuilder.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();

rpc = admin.getReplicationPeerConfig(ID_ONE).get();
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
rpcBuilder.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName1, new ArrayList<>());
rpc.setTableCFsMap(tableCfs);
rpcBuilder.setTableCFsMap(tableCfs);
try {
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
fail(
"Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
} catch (CompletionException e) {
// OK
}

rpc = admin.getReplicationPeerConfig(ID_ONE).get();
tableCfs.clear();
tableCfs.put(tableName2, new ArrayList<>());
rpc.setTableCFsMap(tableCfs);
admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
rpcBuilder.setTableCFsMap(tableCfs);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
namespaces.clear();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
rpcBuilder.setNamespaces(namespaces);
try {
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
fail(
"Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
} catch (CompletionException e) {
Expand All @@ -452,15 +445,14 @@ public void testNamespacesAndTableCfsConfigConflict() throws Exception {

@Test
public void testPeerBandwidth() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
ReplicationPeerConfigBuilder rpcBuilder =
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);

admin.addReplicationPeer(ID_ONE, rpc).join();
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
assertEquals(0, rpc.getBandwidth());
admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();;
assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth());

rpc.setBandwidth(2097152);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
rpcBuilder.setBandwidth(2097152);
admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());

admin.removeReplicationPeer(ID_ONE).join();
Expand Down
Loading