Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alipay.oceanbase.rpc.mutation.*;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation;
Expand Down Expand Up @@ -1829,14 +1830,20 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
ObServerRoute route) throws Exception {
ReplicaLocation replica = null;
long tabletId = getTabletIdByPartId(tableEntry, partId);
long partitionId = partId;
ObPartitionLocationInfo obPartitionLocationInfo = null;
if (ObGlobal.obVsnMajor() >= 4) {

obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);

replica = getPartitionLocation(obPartitionLocationInfo, route);
} else {
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
if (tableEntry.isPartitionTable()
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
partitionId = ObPartIdCalculator.getPartIdx(partId, tableEntry
.getPartitionInfo().getSubPartDesc().getPartNum());
}
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partitionId,
route);
replica = partitionReplica.getRight();
}
Expand All @@ -1861,7 +1868,7 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
replica = getPartitionLocation(obPartitionLocationInfo, route);
} else {
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
replica = getPartitionReplica(tableEntry, partId, route).getRight();
replica = getPartitionReplica(tableEntry, partitionId, route).getRight();
}

addr = replica.getAddr();
Expand All @@ -1872,9 +1879,8 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
throw new ObTableGetException("Cannot get table by addr: " + addr);
}
}
ObTableParam param = null;
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
if (ObGlobal.obVsnMajor() >= 4) {
param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
} else {
param.setPartId(partId);
param.setTableId(tableEntry.getTableId());
Expand All @@ -1890,7 +1896,9 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry()
.getPartitionInfo(tabletId);
if (!obPartitionLocationInfo.initialized.get()) {
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
if (ObGlobal.obVsnMajor() >= 4) {
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
}
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
obPartitionLocationInfo.initializationLatch.await();
}
Expand Down Expand Up @@ -1940,10 +1948,19 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
ObPartitionLevel partitionLevel = tableEntry.getPartitionInfo().getLevel();
List<Long> partIds = getPartitionTablePartitionIds(tableEntry, startRow, startIncluded, endRow, endIncluded, partitionLevel);

for (Long partId : partIds) {
long tabletId = getTabletIdByPartId(tableEntry, partId);
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
if (ObGlobal.obVsnMajor() >= 4) {
for (Long partId : partIds) {
long tabletId = getTabletIdByPartId(tableEntry, partId);
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
}
} else {
for (Long partId : partIds) {
long partitionId = ObPartIdCalculator.getPartIdx(partId, tableEntry
.getPartitionInfo().getSubPartDesc().getPartNum());
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
tableEntry, partitionId, route)));
}
}

return replicas;
Expand Down Expand Up @@ -3136,7 +3153,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
request.setTimeout(getOdpTable().getObTableOperationTimeout());
return getOdpTable().execute(request);
} else {
int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries
int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries
int tryTimes = 0;
long startExecute = System.currentTimeMillis();
boolean needRefreshTableEntry = false;
Expand All @@ -3158,7 +3175,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
try {
// Recalculate partIdMapObTable
// Clear the map before recalculating
partIdMapObTable.clear();
partIdMapObTable.clear();
for (ObNewRange rang : tableQuery.getKeyRanges()) {
ObRowKey startKey = rang.getStartKey();
int startKeySize = startKey.getObjs().size();
Expand All @@ -3167,11 +3184,21 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
start[i] = startKey.getObj(i).getValue();
ObObj curStart = startKey.getObj(i);
if (curStart.isMinObj()) {
start[i] = curStart;
} else {
start[i] = curStart.getValue();
}
}

for (int i = 0; i < endKeySize; i++) {
end[i] = endKey.getObj(i).getValue();
ObObj curEnd = endKey.getObj(i);
if (curEnd.isMaxObj()) {
end[i] = curEnd;
} else {
end[i] = curEnd.getValue();
}
}
ObBorderFlag borderFlag = rang.getBorderFlag();
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
Expand All @@ -3182,7 +3209,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
}
}

// Check if partIdMapObTable size is greater than 1
// Check if partIdMapObTable size is greater than 1
if (partIdMapObTable.size() > 1) {
throw new ObTablePartitionConsistentException(
"query and mutate must be a atomic operation");
Expand All @@ -3195,7 +3222,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
ObTable obTable = tableParam.getObTable();

// Attempt to execute the operation
// Attempt to execute the operation
return executeWithRetry(obTable, request, request.getTableName());
} catch (Exception ex) {
tryTimes++;
Expand All @@ -3212,7 +3239,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
tryTimes, ex);

if (ex instanceof ObTableNeedFetchAllException) {
// Refresh table info
// Refresh table info
getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
} else if (e instanceof ObTableException) {
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e)
.getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
&& ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery())
|| (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()))
&& client.getTableGroupInverted().get(indexTableName) != null) {
// table not exists && hbase mode && table group exists , three condition both
client.eraseTableGroupFromCache(tableName);
Expand Down