Skip to content
163 changes: 102 additions & 61 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
tryTimes);
if (ex instanceof ObTableNeedFetchAllException) {
needFetchAllRouteInfo = true;
getOrRefreshTableEntry(tableName, true, true, true);
// reset failure count while fetch all route info
this.resetExecuteContinuousFailureCount(tableName);
}
Expand Down Expand Up @@ -767,7 +768,6 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
throw new IllegalArgumentException("table name is null");
}
boolean needRefreshTableEntry = false;
boolean needFetchAllRouteInfo = false;
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
while (true) {
Expand All @@ -787,10 +787,14 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
obPair = new ObPair<Long, ObTableParam>(0L, new ObTableParam(odpTable));
} else {
if (null != callback.getRowKey()) {
// in the case of retry, the location always needs to be refreshed here
if (tryTimes > 1) {
TableEntry entry = getOrRefreshTableEntry(tableName, false, false, false);
Long partId = getPartition(entry, callback.getRowKey());
refreshTableLocationByTabletId(entry, tableName, getTabletIdByPartId(entry, partId));
}
// using row key
obPair = getTable(tableName, callback.getRowKey(),
needRefreshTableEntry, tableEntryRefreshIntervalWait,
needFetchAllRouteInfo, route);
obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry, tableEntryRefreshIntervalWait, false, route);
} else if (null != callback.getKeyRanges()) {
// using scan range
obPair = getTable(tableName, new ObTableQuery(),
Expand Down Expand Up @@ -852,7 +856,7 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
((ObTableException) ex).getErrorCode(), ex.getMessage(),
tryTimes);
if (ex instanceof ObTableNeedFetchAllException) {
needFetchAllRouteInfo = true;
getOrRefreshTableEntry(tableName, needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
// reset failure count while fetch all route info
this.resetExecuteContinuousFailureCount(tableName);
}
Expand Down Expand Up @@ -1336,33 +1340,14 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
}
long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
long currentTime = System.currentTimeMillis();
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) {
return tableEntry;
}

Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());

if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
dataSourceName, tableName, tableEntryRefreshLockTimeout);
RUNTIME.error(errMsg);
throw new ObTableEntryRefreshException(errMsg);
}

try {
lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
currentTime = System.currentTimeMillis();
if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
return tableEntry;
}
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);

tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
} finally {
lock.unlock();
}
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());

} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
RUNTIME.error("RefreshTableEntry encountered an exception", e);
Expand Down Expand Up @@ -1676,7 +1661,9 @@ private ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey,
}

long partId = getPartition(tableEntry, row); // partition id in 3.x, origin partId in 4.x, logicId

if (refresh) {
refreshTableLocationByTabletId(tableEntry, tableName, getTabletIdByPartId(tableEntry, partId));
}
return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route);
}

Expand Down Expand Up @@ -3149,41 +3136,95 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
request.setTimeout(getOdpTable().getObTableOperationTimeout());
return getOdpTable().execute(request);
} else {
int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
boolean needRefreshTableEntry = false;
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
for (ObNewRange rang : tableQuery.getKeyRanges()) {
ObRowKey startKey = rang.getStartKey();
int startKeySize = startKey.getObjs().size();
ObRowKey endKey = rang.getEndKey();
int endKeySize = endKey.getObjs().size();
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
start[i] = startKey.getObj(i).getValue();
while (true) {
long currentExecute = System.currentTimeMillis();
long costMillis = currentExecute - startExecute;
if (costMillis > getRuntimeMaxWait()) {
logger.error(
"tablename:{} it has tried " + tryTimes
+ " times and it has waited " + costMillis
+ "/ms which exceeds response timeout "
+ getRuntimeMaxWait() + "/ms", request.getTableName());
throw new ObTableTimeoutExcetion("it has tried " + tryTimes
+ " times and it has waited " + costMillis
+ "/ms which exceeds response timeout "
+ getRuntimeMaxWait() + "/ms");
}
try {
// Recalculate partIdMapObTable
// Clear the map before recalculating
partIdMapObTable.clear();
for (ObNewRange rang : tableQuery.getKeyRanges()) {
ObRowKey startKey = rang.getStartKey();
int startKeySize = startKey.getObjs().size();
ObRowKey endKey = rang.getEndKey();
int endKeySize = endKey.getObjs().size();
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
start[i] = startKey.getObj(i).getValue();
}

for (int i = 0; i < endKeySize; i++) {
end[i] = endKey.getObj(i).getValue();
}
ObBorderFlag borderFlag = rang.getBorderFlag();
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
tableQuery, start, borderFlag.isInclusiveStart(), end,
borderFlag.isInclusiveEnd(), false, false);
for (ObPair<Long, ObTableParam> pair : pairList) {
partIdMapObTable.put(pair.getLeft(), pair.getRight());
}
}
if (partIdMapObTable.size() > 1) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
}
for (int i = 0; i < endKeySize; i++) {
end[i] = endKey.getObj(i).getValue();
}
ObBorderFlag borderFlag = rang.getBorderFlag();
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
tableQuery, start, borderFlag.isInclusiveStart(), end,
borderFlag.isInclusiveEnd(), needRefreshTableEntry, isTableEntryRefreshIntervalWait());
for (ObPair<Long, ObTableParam> pair : pairList) {
partIdMapObTable.put(pair.getLeft(), pair.getRight());
}
}

for (Long partId : partIdMapObTable.keySet()) {
ObTableParam tableParam = partIdMapObTable.get(partId);
request.setTableId(tableParam.getTableId());
request.setPartitionId(tableParam.getPartitionId());
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
ObTable obTable = tableParam.getObTable();
return executeWithRetry(obTable, request, request.getTableName());
// Check if partIdMapObTable size is greater than 1
if (partIdMapObTable.size() > 1) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
}
// Proceed with the operation
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
ObTableParam tableParam = entry.getValue();
request.setTableId(tableParam.getTableId());
request.setPartitionId(tableParam.getPartitionId());
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
ObTable obTable = tableParam.getObTable();

// Attempt to execute the operation
return executeWithRetry(obTable, request, request.getTableName());
} catch (Exception ex) {
tryTimes++;
if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) {
needRefreshTableEntry = true;
logger.warn(
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), ex);

if (isRetryOnChangeMasterTimes() && tryTimes <= maxRetries) {
logger.warn(
"tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
tryTimes, ex);

if (ex instanceof ObTableNeedFetchAllException) {
// Refresh table info
getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
}
} else {
calculateContinuousFailure(request.getTableName(), ex.getMessage());
throw ex;
}
} else {
calculateContinuousFailure(request.getTableName(), ex.getMessage());
// Handle other exceptions or rethrow
throw ex;
}
}
}
}
}
Expand Down
Loading