From fe498bbb0b85f7e2476a4c9271be812ba53e2d85 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Fri, 10 Jan 2020 14:19:38 +0000 Subject: [PATCH 1/3] HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible Defines create methods for Connection and ReplicationSinkManager instances. --- .../HBaseInterClusterReplicationEndpoint.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index ccdcee13f3b3..138765ac0062 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -113,6 +113,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean dropOnDeletedTables; private boolean isSerial = false; + protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { + return ClusterConnectionFactory.createAsyncClusterConnection(conf, + null, User.getCurrent()); + } + + protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) { + return new ReplicationSinkManager(conn,this, this.conf); + } + @Override public void init(Context context) throws IOException { super.init(context); @@ -131,13 +140,12 @@ public void init(Context context) throws IOException { // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = - ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); + this.conn = (AsyncClusterConnection) createConnection(this.conf); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); // ReplicationQueueInfo parses the peerId out of the znode for us - this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf); + this.replicationSinkMgr = createReplicationSinkManager(conn); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); From 02498b7f182a93dd83134f2e4c07fad14ece3fb8 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 13 Jan 2020 12:47:19 +0000 Subject: [PATCH 2/3] fixed spacing Change-Id: I0fe5b951a58fb6fe08b899a6805b870f1b3b9777 --- .../regionserver/HBaseInterClusterReplicationEndpoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 138765ac0062..0dbb6513b1ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -119,7 +119,7 @@ protected AsyncClusterConnection createConnection(Configuration conf) throws IOE } protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) { - return new ReplicationSinkManager(conn,this, this.conf); + return new ReplicationSinkManager(conn, this, this.conf); } @Override From 94a9456847a06cf0ddbcade12285dbd34c6d143d Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 14 Jan 2020 13:20:00 +0000 Subject: [PATCH 3/3] Fixing latest round of nit reviews Change-Id: I156f5cc212666babcc644f24dd35d03634451963 --- .../HBaseInterClusterReplicationEndpoint.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 0dbb6513b1ae..6f1f8b379aef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -113,11 +113,21 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean dropOnDeletedTables; private boolean isSerial = false; + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different + * Connection implementations, or initialize it in a different way, so defining createConnection + * as protected for possible overridings. + */ protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); } + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different + * ReplicationSinkManager implementations, or initialize it in a different way, + * so defining createReplicationSinkManager as protected for possible overridings. + */ protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) { return new ReplicationSinkManager(conn, this, this.conf); } @@ -140,7 +150,7 @@ public void init(Context context) throws IOException { // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = (AsyncClusterConnection) createConnection(this.conf); + this.conn = createConnection(this.conf); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics();