Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,49 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po

this.id = COUNTER.incrementAndGet();

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
ConnectionConfiguration connConf =
hc.getConfiguration() == conf
? hc.getConnectionConfiguration()
// Slow: parse conf in ConnectionConfiguration constructor
: new ConnectionConfiguration(conf);
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}

this.pause = connConf.getPause();
this.numTries = connConf.getRetriesNumber();
this.rpcTimeout = rpcTimeout;
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);

this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

this.operationTimeout = connConf.getOperationTimeout();

// Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
// Can be null when constructing hc's AsyncProcess or it's not reusable
AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;

this.primaryCallTimeoutMicroseconds =
globalAsyncProcess == null
? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
: globalAsyncProcess.primaryCallTimeoutMicroseconds;

this.maxTotalConcurrentTasks =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
: globalAsyncProcess.maxTotalConcurrentTasks;
this.maxConcurrentTasksPerServer =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerServer;
this.maxConcurrentTasksPerRegion =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerRegion;
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
globalAsyncProcess == null
? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
: globalAsyncProcess.startLogErrorsCnt;

if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
Expand Down Expand Up @@ -335,8 +360,10 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po
this.rpcFactory = rpcFactory;

this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
globalAsyncProcess == null
? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
: globalAsyncProcess.thresholdToLogUndoneTaskDetails;
}

public void setRpcTimeout(int rpcTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
Expand Down Expand Up @@ -90,16 +89,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.pool = params.getPool();
this.listener = params.getListener();

ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
ConnectionConfiguration connConf = conn.getConnectionConfiguration();
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
params.getWriteBufferSize() : connConf.getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
this.rpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();
this.rpcTimeout = connConf.getRpcTimeout();
this.operationTimeout = connConf.getOperationTimeout();
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, rpcTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ConnectionConfiguration {
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
private final int rpcTimeout;
private final long pause;

/**
* Constructor
Expand Down Expand Up @@ -71,9 +73,14 @@ public class ConnectionConfiguration {
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms

this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);

this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);

this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
}

/**
Expand All @@ -92,6 +99,8 @@ protected ConnectionConfiguration() {
this.replicaCallTimeoutMicroSecondScan = 1000000;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
}

public long getWriteBufferSize() {
Expand Down Expand Up @@ -129,4 +138,12 @@ public int getMaxKeyValueSize() {
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}

public int getRpcTimeout() {
return rpcTimeout;
}

public long getPause() {
return pause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -657,14 +657,11 @@ static class HConnectionImplementation implements ClusterConnection, Closeable {
this.managed = managed;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.pause = connectionConfig.getPause();
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.numTries = connectionConfig.getRetriesNumber();
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.rpcTimeout = connectionConfig.getRpcTimeout();
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
synchronized (nonceGeneratorCreateLock) {
if (ConnectionManager.nonceGenerator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,7 @@ private void finishSetup() throws IOException {
}
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.rpcTimeout = connConfiguration.getRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
Expand Down