Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
* as protected for possible overridings.
*/
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add a quick comment why these exist as protected? Otherwise it can confuse the readers because the only callers in the source are in this class.

return ClusterConnectionFactory.createAsyncClusterConnection(conf,
null, User.getCurrent());
}

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* ReplicationSinkManager implementations, or initialize it in a different way,
* so defining createReplicationSinkManager as protected for possible overridings.
*/
protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
return new ReplicationSinkManager(conn, this, this.conf);
}

@Override
public void init(Context context) throws IOException {
super.init(context);
Expand All @@ -131,13 +150,12 @@ public void init(Context context) throws IOException {
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn =
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
this.conn = createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
Expand Down