Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 25 additions & 41 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
tableParam = new ObTableParam(odpTable);
} else {
if (tryTimes > 1 && needRefreshPartitionLocation) {
needRefreshPartitionLocation = false;
// refresh partition location
TableEntry entry = tableRoute.getTableEntry(tableName);
long partId = tableRoute.getPartId(entry, rowKey);
Expand All @@ -485,8 +486,8 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
}
tableParam = getTableParamWithRoute(tableName, rowKey, route);
}
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}",
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}",
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId());
T t = callback.execute(tableParam);
resetExecuteContinuousFailureCount(tableName);
return t;
Expand Down Expand Up @@ -687,6 +688,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
} else {
if (null != callback.getRowKey()) {
if (tryTimes > 1 && needRefreshPartitionLocation) {
needRefreshPartitionLocation = false;
// refresh partition location
TableEntry entry = tableRoute.getTableEntry(tableName);
long partId = tableRoute.getPartId(entry, callback.getRowKey());
Expand All @@ -696,6 +698,11 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
// using row key
tableParam = tableRoute.getTableParamWithRoute(tableName, callback.getRowKey(), route);
} else if (null != callback.getQuery()) {
if (tryTimes > 1 && needRefreshPartitionLocation) {
needRefreshPartitionLocation = false;
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
}
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
// using scan range
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
Expand All @@ -704,8 +711,8 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
throw new ObTableException("RowKey or scan range is null");
}
}
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}",
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}",
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId());
T t = callback.execute(tableParam);
resetExecuteContinuousFailureCount(tableName);
return t;
Expand Down Expand Up @@ -2260,8 +2267,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
return getOdpTable().execute(request);
} else {
int tryTimes = 0;
boolean needRefreshTabletLocation = false;
long startExecute = System.currentTimeMillis();
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
while (true) {
long currentExecute = System.currentTimeMillis();
long costMillis = currentExecute - startExecute;
Expand All @@ -2278,40 +2285,14 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
}
try {
// Recalculate partIdMapObTable
// Clear the map before recalculating
partIdMapObTable.clear();
for (ObNewRange rang : tableQuery.getKeyRanges()) {
ObRowKey startKey = rang.getStartKey();
int startKeySize = startKey.getObjs().size();
ObRowKey endKey = rang.getEndKey();
int endKeySize = endKey.getObjs().size();
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
ObObj curStart = startKey.getObj(i);
if (curStart.isMinObj()) {
start[i] = curStart;
} else {
start[i] = curStart.getValue();
}
}

for (int i = 0; i < endKeySize; i++) {
ObObj curEnd = endKey.getObj(i);
if (curEnd.isMaxObj()) {
end[i] = curEnd;
} else {
end[i] = curEnd.getValue();
}
}
ObBorderFlag borderFlag = rang.getBorderFlag();
List<ObTableParam> params = getTableParams(request.getTableName(),
tableQuery, start, borderFlag.isInclusiveStart(), end,
borderFlag.isInclusiveEnd());
for (ObTableParam param : params) {
partIdMapObTable.put(param.getPartId(), param);
}
if (needRefreshTabletLocation) {
needRefreshTabletLocation = false;
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
}
Map<Long, ObTableParam> partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
request.getTableName(), tableQuery.getScanRangeColumns(),
tableQuery.getKeyRanges());

// Check if partIdMapObTable size is greater than 1
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
Expand Down Expand Up @@ -2344,9 +2325,12 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
tryTimes, ex);

if (ex instanceof ObTableNeedFetchMetaException) {
// Refresh table info
refreshMeta(request.getTableName());
if (((ObTableException) ex).isNeedRefreshTableEntry()) {
needRefreshTabletLocation = true;
if (ex instanceof ObTableNeedFetchMetaException) {
// Refresh table info
refreshMeta(request.getTableName());
}
}
} else {
calculateContinuousFailure(request.getTableName(), ex.getMessage());
Expand Down
149 changes: 72 additions & 77 deletions src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,33 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception
}
}

public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery query, boolean isHKV) throws Exception {
Map<Long, ObTableParam> partIdParamMap = getPartIdParamMapForQuery(tableName, query.getScanRangeColumns(), query.getKeyRanges());
if (isHKV) {
// for HBase process, if distributed function is enabled, no need to do routing refresh
boolean isDistributedSupported = getServerCapacity().isSupportDistributedExecute();
if (partIdParamMap.size() > 1 && !isDistributedSupported) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
} else if (isDistributedSupported) {
return;
}
} else {
// for table process, distributed function is not supported yet, need to refresh routing
// for now only support to query single tablet
if (partIdParamMap.size() > 1) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
} else if (partIdParamMap.isEmpty()) {
throw new ObTableException("could not find part id of range");
}
}
Map.Entry<Long, ObTableParam> entry = partIdParamMap.entrySet().iterator().next();
TableEntry tableEntry = getTableEntry(tableName);
long tabletId = entry.getValue().getTabletId();
refreshPartitionLocation(tableName, tabletId, tableEntry);
}

private Long[] getTabletsFromTableEntry(TableEntry tableEntry) {
Long[] tablets = null;
if (tableEntry.isPartitionTable()) {
Expand Down Expand Up @@ -693,6 +720,44 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
return obPartitionLocationInfo;
}

public Map<Long, ObTableParam> getPartIdParamMapForQuery(String tableName, List<String> scanRangeColumns,
List<ObNewRange> keyRanges) throws Exception {
Map<Long, ObTableParam> parIdParamMapObTable = new HashMap<Long, ObTableParam>();
for (ObNewRange keyRange : keyRanges) {
ObRowKey startKey = keyRange.getStartKey();
int startKeySize = startKey.getObjs().size();
ObRowKey endKey = keyRange.getEndKey();
int endKeySize = endKey.getObjs().size();
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
ObObj curStart = startKey.getObj(i);
if (curStart.isMinObj()) {
start[i] = curStart;
} else {
start[i] = curStart.getValue();
}
}

for (int i = 0; i < endKeySize; i++) {
ObObj curEnd = endKey.getObj(i);
if (curEnd.isMaxObj()) {
end[i] = curEnd;
} else {
end[i] = curEnd.getValue();
}
}
ObBorderFlag borderFlag = keyRange.getBorderFlag();
List<ObTableParam> paramList = getTablesInternal(tableName, scanRangeColumns, start,
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
tableClient.getRoute(false));
for (ObTableParam param : paramList) {
parIdParamMapObTable.put(param.getPartId(), param);
}
}
return parIdParamMapObTable;
}

/**
* get addr by partId
* @param tableName table want to get
Expand Down Expand Up @@ -869,50 +934,18 @@ public ObTableParam getTableParam(String tableName, List<String> scanRangeColumn
*/
public ObTableParam getTableParam(String tableName, List<String> scanRangeColumns,
List<ObNewRange> keyRanges) throws Exception {
Map<Long, ObTableParam> tabletIdIdMapObTable = new HashMap<Long, ObTableParam>();
for (ObNewRange keyRange : keyRanges) {
ObRowKey startKey = keyRange.getStartKey();
int startKeySize = startKey.getObjs().size();
ObRowKey endKey = keyRange.getEndKey();
int endKeySize = endKey.getObjs().size();
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
ObObj curStart = startKey.getObj(i);
if (curStart.isMinObj()) {
start[i] = curStart;
} else {
start[i] = curStart.getValue();
}
}

for (int i = 0; i < endKeySize; i++) {
ObObj curEnd = endKey.getObj(i);
if (curEnd.isMaxObj()) {
end[i] = curEnd;
} else {
end[i] = curEnd.getValue();
}
}
ObBorderFlag borderFlag = keyRange.getBorderFlag();
List<ObTableParam> paramList = getTablesInternal(tableName, scanRangeColumns, start,
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
tableClient.getRoute(false));
for (ObTableParam param : paramList) {
tabletIdIdMapObTable.put(param.getTabletId(), param);
}
}
Map<Long, ObTableParam> partIdIdMapObTable = getPartIdParamMapForQuery(
tableName, scanRangeColumns, keyRanges);
// for now only support to query single tablet
if (tabletIdIdMapObTable.size() > 1) {
if (partIdIdMapObTable.size() > 1) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
} else if (tabletIdIdMapObTable.size() < 1) {
} else if (partIdIdMapObTable.isEmpty()) {
throw new ObTableException("could not find part id of range");
}
ObTableParam ans = null;
for (Long tabletId : tabletIdIdMapObTable.keySet()) {
ans = tabletIdIdMapObTable.get(tabletId);
}
Map.Entry<Long, ObTableParam> entry = partIdIdMapObTable.entrySet().iterator().next();
ans = entry.getValue();
return ans;
}

Expand Down Expand Up @@ -979,45 +1012,7 @@ private List<ObTableParam> getTablesInternal(String tableName, List<String> scan
List<ObTableParam> params = new ArrayList<>();
for (ObPair<Long, ReplicaLocation> partIdWithReplica : partIdWithReplicaList) {
long partId = partIdWithReplica.getLeft();
long tabletId = getTabletIdByPartId(tableEntry, partId);
ReplicaLocation replica = partIdWithReplica.getRight();
ObServerAddr addr = replica.getAddr();
ObTable obTable = tableRoster.getTable(addr);
int retryTimes = 0;
while (obTable == null && retryTimes < 2) {
++retryTimes;
// need to refresh table roster to ensure the current roster is the latest
tableClient.syncRefreshMetadata(true);
// the addr is wrong, need to refresh location
if (logger.isInfoEnabled()) {
logger.info("Cannot get ObTable by addr {}, refreshing metadata.", addr);
}
// refresh tablet location based on the latest roster, in case that some of the observers hase been killed
// and used the old location
tableEntry = refreshPartitionLocation(tableName, tabletId, tableEntry);
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replica = getPartitionLocation(locationInfo, route);

if (replica == null) {
RUNTIME.error("Cannot get replica by tableName: {}, tabletId: {}", tableName, tabletId);
throw new ObTableGetException("Cannot get replica by tableName: " + tableName + ", tabletId: " + tabletId);
}
addr = replica.getAddr();
obTable = tableRoster.getTable(addr);
}
if (obTable == null) {
RUNTIME.error("cannot get table by addr: " + addr);
throw new ObTableGetException("obTable is null, addr is: " + addr.getIp() + ":" + addr.getSvrPort());
}

ObTableParam param = new ObTableParam(obTable);
param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getTabletLsId());
param.setTableId(tableEntry.getTableId());
param.setPartId(partId);
// real tablet id
param.setPartitionId(tabletId);

addr.recordAccess();
ObTableParam param = getTableInternal(tableName, tableEntry, partId, route);
params.add(param);
}
return params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
Map.Entry<Long, TabletOperationsMap> lsOperation)
throws Exception {
long lsId = lsOperation.getKey();
logger.debug("ls batch lsId: {}", lsId);
TabletOperationsMap tabletOperationsMap = lsOperation.getValue();
if (tabletOperationsMap.isEmpty()) {
logger.warn("the size of tablet operations in ls operation is zero");
Expand Down Expand Up @@ -675,7 +676,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
}
} else if (ex instanceof ObTableException) {
if (((ObTableException) ex).isNeedRefreshTableEntry()) {
logger.warn("meet need refresh exception, errCode: {}, ls id: {}, errMsg: {}", ((ObTableException) ex).getErrorCode(), lsId, ex.getMessage());
logger.warn("meet need refresh exception, errCode: {}, ls id: {}, table_id: {}, errMsg: {}", ((ObTableException) ex).getErrorCode(), lsId, tableId, ex.getMessage());
if ((((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode ||
((ObTableException) ex).getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode)
&& obTableClient.isTableGroupName(tableName)
Expand Down