diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index b1c7ec09..f289bcb0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -27,6 +27,7 @@ import com.alipay.oceanbase.rpc.mutation.*; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation; @@ -1829,6 +1830,7 @@ public ObPair getTableInternal(String tableName, TableEntry ObServerRoute route) throws Exception { ReplicaLocation replica = null; long tabletId = getTabletIdByPartId(tableEntry, partId); + long partitionId = partId; ObPartitionLocationInfo obPartitionLocationInfo = null; if (ObGlobal.obVsnMajor() >= 4) { @@ -1836,7 +1838,12 @@ public ObPair getTableInternal(String tableName, TableEntry replica = getPartitionLocation(obPartitionLocationInfo, route); } else { - ObPair partitionReplica = getPartitionReplica(tableEntry, partId, + if (tableEntry.isPartitionTable() + && null != tableEntry.getPartitionInfo().getSubPartDesc()) { + partitionId = ObPartIdCalculator.getPartIdx(partId, tableEntry + .getPartitionInfo().getSubPartDesc().getPartNum()); + } + ObPair partitionReplica = getPartitionReplica(tableEntry, partitionId, route); replica = partitionReplica.getRight(); } @@ -1861,7 +1868,7 @@ public ObPair getTableInternal(String tableName, TableEntry replica = getPartitionLocation(obPartitionLocationInfo, route); } else { tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false); - replica = getPartitionReplica(tableEntry, partId, route).getRight(); + replica = getPartitionReplica(tableEntry, partitionId, route).getRight(); } addr = replica.getAddr(); @@ -1872,9 +1879,8 @@ public ObPair getTableInternal(String tableName, TableEntry throw new ObTableGetException("Cannot get table by addr: " + addr); } } - ObTableParam param = null; + ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId); if (ObGlobal.obVsnMajor() >= 4) { - param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId); } else { param.setPartId(partId); param.setTableId(tableEntry.getTableId()); @@ -1890,7 +1896,9 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry() .getPartitionInfo(tabletId); if (!obPartitionLocationInfo.initialized.get()) { - tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId); + if (ObGlobal.obVsnMajor() >= 4) { + tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId); + } obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId); obPartitionLocationInfo.initializationLatch.await(); } @@ -1940,10 +1948,19 @@ private List> getPartitionReplica(TableEntry table ObPartitionLevel partitionLevel = tableEntry.getPartitionInfo().getLevel(); List partIds = getPartitionTablePartitionIds(tableEntry, startRow, startIncluded, endRow, endIncluded, partitionLevel); - for (Long partId : partIds) { - long tabletId = getTabletIdByPartId(tableEntry, partId); - ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId); - replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route))); + if (ObGlobal.obVsnMajor() >= 4) { + for (Long partId : partIds) { + long tabletId = getTabletIdByPartId(tableEntry, partId); + ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId); + replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route))); + } + } else { + for (Long partId : partIds) { + long partitionId = ObPartIdCalculator.getPartIdx(partId, tableEntry + .getPartitionInfo().getSubPartDesc().getPartNum()); + replicas.add(new ObPair(partId, getPartitionLocation( + tableEntry, partitionId, route))); + } } return replicas; @@ -3136,7 +3153,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.setTimeout(getOdpTable().getObTableOperationTimeout()); return getOdpTable().execute(request); } else { - int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries + int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries int tryTimes = 0; long startExecute = System.currentTimeMillis(); boolean needRefreshTableEntry = false; @@ -3158,7 +3175,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E try { // Recalculate partIdMapObTable // Clear the map before recalculating - partIdMapObTable.clear(); + partIdMapObTable.clear(); for (ObNewRange rang : tableQuery.getKeyRanges()) { ObRowKey startKey = rang.getStartKey(); int startKeySize = startKey.getObjs().size(); @@ -3167,11 +3184,21 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E Object[] start = new Object[startKeySize]; Object[] end = new Object[endKeySize]; for (int i = 0; i < startKeySize; i++) { - start[i] = startKey.getObj(i).getValue(); + ObObj curStart = startKey.getObj(i); + if (curStart.isMinObj()) { + start[i] = curStart; + } else { + start[i] = curStart.getValue(); + } } for (int i = 0; i < endKeySize; i++) { - end[i] = endKey.getObj(i).getValue(); + ObObj curEnd = endKey.getObj(i); + if (curEnd.isMaxObj()) { + end[i] = curEnd; + } else { + end[i] = curEnd.getValue(); + } } ObBorderFlag borderFlag = rang.getBorderFlag(); List> pairList = getTables(request.getTableName(), @@ -3182,7 +3209,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E } } - // Check if partIdMapObTable size is greater than 1 + // Check if partIdMapObTable size is greater than 1 if (partIdMapObTable.size() > 1) { throw new ObTablePartitionConsistentException( "query and mutate must be a atomic operation"); @@ -3195,7 +3222,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.setTimeout(tableParam.getObTable().getObTableOperationTimeout()); ObTable obTable = tableParam.getObTable(); - // Attempt to execute the operation + // Attempt to execute the operation return executeWithRetry(obTable, request, request.getTableName()); } catch (Exception ex) { tryTimes++; @@ -3212,7 +3239,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E tryTimes, ex); if (ex instanceof ObTableNeedFetchAllException) { - // Refresh table info + // Refresh table info getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true); } } else { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 7662aa69..0c8b691d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -232,7 +232,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, } else if (e instanceof ObTableException) { if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e) .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) - && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery() + && ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()) + || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery())) && client.getTableGroupInverted().get(indexTableName) != null) { // table not exists && hbase mode && table group exists , three condition both client.eraseTableGroupFromCache(tableName);