From 05900c8f3f969b21aebf575e5b671e502f4a2037 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 12 Jun 2025 16:30:08 +0800 Subject: [PATCH 1/2] add debug log --- src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java | 8 ++++---- .../oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index db7ca5f0..af733261 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -485,8 +485,8 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ } tableParam = getTableParamWithRoute(tableName, rowKey, route); } - logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}", - tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort()); + logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}", + tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId()); T t = callback.execute(tableParam); resetExecuteContinuousFailureCount(tableName); return t; @@ -704,8 +704,8 @@ private T execute(String tableName, OperationExecuteCallback callback, throw new ObTableException("RowKey or scan range is null"); } } - logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}", - tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort()); + logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}", + tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId()); T t = callback.execute(tableParam); resetExecuteContinuousFailureCount(tableName); return t; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index 4cd8899b..fd27e2ae 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -534,6 +534,7 @@ public void partitionExecute(ObTableSingleOpResult[] results, Map.Entry lsOperation) throws Exception { long lsId = lsOperation.getKey(); + logger.debug("ls batch lsId: {}", lsId); TabletOperationsMap tabletOperationsMap = lsOperation.getValue(); if (tabletOperationsMap.isEmpty()) { logger.warn("the size of tablet operations in ls operation is zero"); @@ -675,7 +676,7 @@ public void partitionExecute(ObTableSingleOpResult[] results, } } else if (ex instanceof ObTableException) { if (((ObTableException) ex).isNeedRefreshTableEntry()) { - logger.warn("meet need refresh exception, errCode: {}, ls id: {}, errMsg: {}", ((ObTableException) ex).getErrorCode(), lsId, ex.getMessage()); + logger.warn("meet need refresh exception, errCode: {}, ls id: {}, table_id: {}, errMsg: {}", ((ObTableException) ex).getErrorCode(), lsId, tableId, ex.getMessage()); if ((((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) ex).getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode) && obTableClient.isTableGroupName(tableName) From 1232cb711cbc43e026409f3a50cf4c3f488b2830 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Jun 2025 12:15:16 +0800 Subject: [PATCH 2/2] refresh tablet location for atomic query --- .../alipay/oceanbase/rpc/ObTableClient.java | 58 +++---- .../rpc/location/model/TableRoute.java | 149 +++++++++--------- 2 files changed, 93 insertions(+), 114 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index af733261..67dd3202 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -477,6 +477,7 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ tableParam = new ObTableParam(odpTable); } else { if (tryTimes > 1 && needRefreshPartitionLocation) { + needRefreshPartitionLocation = false; // refresh partition location TableEntry entry = tableRoute.getTableEntry(tableName); long partId = tableRoute.getPartId(entry, rowKey); @@ -687,6 +688,7 @@ private T execute(String tableName, OperationExecuteCallback callback, } else { if (null != callback.getRowKey()) { if (tryTimes > 1 && needRefreshPartitionLocation) { + needRefreshPartitionLocation = false; // refresh partition location TableEntry entry = tableRoute.getTableEntry(tableName); long partId = tableRoute.getPartId(entry, callback.getRowKey()); @@ -696,6 +698,11 @@ private T execute(String tableName, OperationExecuteCallback callback, // using row key tableParam = tableRoute.getTableParamWithRoute(tableName, callback.getRowKey(), route); } else if (null != callback.getQuery()) { + if (tryTimes > 1 && needRefreshPartitionLocation) { + needRefreshPartitionLocation = false; + boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV; + tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV); + } ObTableQuery tableQuery = callback.getQuery().getObTableQuery(); // using scan range tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(), @@ -2260,8 +2267,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E return getOdpTable().execute(request); } else { int tryTimes = 0; + boolean needRefreshTabletLocation = false; long startExecute = System.currentTimeMillis(); - Map partIdMapObTable = new HashMap(); while (true) { long currentExecute = System.currentTimeMillis(); long costMillis = currentExecute - startExecute; @@ -2278,40 +2285,14 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E } try { // Recalculate partIdMapObTable - // Clear the map before recalculating - partIdMapObTable.clear(); - for (ObNewRange rang : tableQuery.getKeyRanges()) { - ObRowKey startKey = rang.getStartKey(); - int startKeySize = startKey.getObjs().size(); - ObRowKey endKey = rang.getEndKey(); - int endKeySize = endKey.getObjs().size(); - Object[] start = new Object[startKeySize]; - Object[] end = new Object[endKeySize]; - for (int i = 0; i < startKeySize; i++) { - ObObj curStart = startKey.getObj(i); - if (curStart.isMinObj()) { - start[i] = curStart; - } else { - start[i] = curStart.getValue(); - } - } - - for (int i = 0; i < endKeySize; i++) { - ObObj curEnd = endKey.getObj(i); - if (curEnd.isMaxObj()) { - end[i] = curEnd; - } else { - end[i] = curEnd.getValue(); - } - } - ObBorderFlag borderFlag = rang.getBorderFlag(); - List params = getTableParams(request.getTableName(), - tableQuery, start, borderFlag.isInclusiveStart(), end, - borderFlag.isInclusiveEnd()); - for (ObTableParam param : params) { - partIdMapObTable.put(param.getPartId(), param); - } + if (needRefreshTabletLocation) { + needRefreshTabletLocation = false; + boolean isHKV = request.getEntityType() == ObTableEntityType.HKV; + tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV); } + Map partIdMapObTable = tableRoute.getPartIdParamMapForQuery( + request.getTableName(), tableQuery.getScanRangeColumns(), + tableQuery.getKeyRanges()); // Check if partIdMapObTable size is greater than 1 boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute(); @@ -2344,9 +2325,12 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), tryTimes, ex); - if (ex instanceof ObTableNeedFetchMetaException) { - // Refresh table info - refreshMeta(request.getTableName()); + if (((ObTableException) ex).isNeedRefreshTableEntry()) { + needRefreshTabletLocation = true; + if (ex instanceof ObTableNeedFetchMetaException) { + // Refresh table info + refreshMeta(request.getTableName()); + } } } else { calculateContinuousFailure(request.getTableName(), ex.getMessage()); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index 79873d32..e55f69ea 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -580,6 +580,33 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception } } + public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery query, boolean isHKV) throws Exception { + Map partIdParamMap = getPartIdParamMapForQuery(tableName, query.getScanRangeColumns(), query.getKeyRanges()); + if (isHKV) { + // for HBase process, if distributed function is enabled, no need to do routing refresh + boolean isDistributedSupported = getServerCapacity().isSupportDistributedExecute(); + if (partIdParamMap.size() > 1 && !isDistributedSupported) { + throw new ObTablePartitionConsistentException( + "query and mutate must be a atomic operation"); + } else if (isDistributedSupported) { + return; + } + } else { + // for table process, distributed function is not supported yet, need to refresh routing + // for now only support to query single tablet + if (partIdParamMap.size() > 1) { + throw new ObTablePartitionConsistentException( + "query and mutate must be a atomic operation"); + } else if (partIdParamMap.isEmpty()) { + throw new ObTableException("could not find part id of range"); + } + } + Map.Entry entry = partIdParamMap.entrySet().iterator().next(); + TableEntry tableEntry = getTableEntry(tableName); + long tabletId = entry.getValue().getTabletId(); + refreshPartitionLocation(tableName, tabletId, tableEntry); + } + private Long[] getTabletsFromTableEntry(TableEntry tableEntry) { Long[] tablets = null; if (tableEntry.isPartitionTable()) { @@ -693,6 +720,44 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, return obPartitionLocationInfo; } + public Map getPartIdParamMapForQuery(String tableName, List scanRangeColumns, + List keyRanges) throws Exception { + Map parIdParamMapObTable = new HashMap(); + for (ObNewRange keyRange : keyRanges) { + ObRowKey startKey = keyRange.getStartKey(); + int startKeySize = startKey.getObjs().size(); + ObRowKey endKey = keyRange.getEndKey(); + int endKeySize = endKey.getObjs().size(); + Object[] start = new Object[startKeySize]; + Object[] end = new Object[endKeySize]; + for (int i = 0; i < startKeySize; i++) { + ObObj curStart = startKey.getObj(i); + if (curStart.isMinObj()) { + start[i] = curStart; + } else { + start[i] = curStart.getValue(); + } + } + + for (int i = 0; i < endKeySize; i++) { + ObObj curEnd = endKey.getObj(i); + if (curEnd.isMaxObj()) { + end[i] = curEnd; + } else { + end[i] = curEnd.getValue(); + } + } + ObBorderFlag borderFlag = keyRange.getBorderFlag(); + List paramList = getTablesInternal(tableName, scanRangeColumns, start, + borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), + tableClient.getRoute(false)); + for (ObTableParam param : paramList) { + parIdParamMapObTable.put(param.getPartId(), param); + } + } + return parIdParamMapObTable; + } + /** * get addr by partId * @param tableName table want to get @@ -869,50 +934,18 @@ public ObTableParam getTableParam(String tableName, List scanRangeColumn */ public ObTableParam getTableParam(String tableName, List scanRangeColumns, List keyRanges) throws Exception { - Map tabletIdIdMapObTable = new HashMap(); - for (ObNewRange keyRange : keyRanges) { - ObRowKey startKey = keyRange.getStartKey(); - int startKeySize = startKey.getObjs().size(); - ObRowKey endKey = keyRange.getEndKey(); - int endKeySize = endKey.getObjs().size(); - Object[] start = new Object[startKeySize]; - Object[] end = new Object[endKeySize]; - for (int i = 0; i < startKeySize; i++) { - ObObj curStart = startKey.getObj(i); - if (curStart.isMinObj()) { - start[i] = curStart; - } else { - start[i] = curStart.getValue(); - } - } - - for (int i = 0; i < endKeySize; i++) { - ObObj curEnd = endKey.getObj(i); - if (curEnd.isMaxObj()) { - end[i] = curEnd; - } else { - end[i] = curEnd.getValue(); - } - } - ObBorderFlag borderFlag = keyRange.getBorderFlag(); - List paramList = getTablesInternal(tableName, scanRangeColumns, start, - borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), - tableClient.getRoute(false)); - for (ObTableParam param : paramList) { - tabletIdIdMapObTable.put(param.getTabletId(), param); - } - } + Map partIdIdMapObTable = getPartIdParamMapForQuery( + tableName, scanRangeColumns, keyRanges); // for now only support to query single tablet - if (tabletIdIdMapObTable.size() > 1) { + if (partIdIdMapObTable.size() > 1) { throw new ObTablePartitionConsistentException( "query and mutate must be a atomic operation"); - } else if (tabletIdIdMapObTable.size() < 1) { + } else if (partIdIdMapObTable.isEmpty()) { throw new ObTableException("could not find part id of range"); } ObTableParam ans = null; - for (Long tabletId : tabletIdIdMapObTable.keySet()) { - ans = tabletIdIdMapObTable.get(tabletId); - } + Map.Entry entry = partIdIdMapObTable.entrySet().iterator().next(); + ans = entry.getValue(); return ans; } @@ -979,45 +1012,7 @@ private List getTablesInternal(String tableName, List scan List params = new ArrayList<>(); for (ObPair partIdWithReplica : partIdWithReplicaList) { long partId = partIdWithReplica.getLeft(); - long tabletId = getTabletIdByPartId(tableEntry, partId); - ReplicaLocation replica = partIdWithReplica.getRight(); - ObServerAddr addr = replica.getAddr(); - ObTable obTable = tableRoster.getTable(addr); - int retryTimes = 0; - while (obTable == null && retryTimes < 2) { - ++retryTimes; - // need to refresh table roster to ensure the current roster is the latest - tableClient.syncRefreshMetadata(true); - // the addr is wrong, need to refresh location - if (logger.isInfoEnabled()) { - logger.info("Cannot get ObTable by addr {}, refreshing metadata.", addr); - } - // refresh tablet location based on the latest roster, in case that some of the observers hase been killed - // and used the old location - tableEntry = refreshPartitionLocation(tableName, tabletId, tableEntry); - ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId); - replica = getPartitionLocation(locationInfo, route); - - if (replica == null) { - RUNTIME.error("Cannot get replica by tableName: {}, tabletId: {}", tableName, tabletId); - throw new ObTableGetException("Cannot get replica by tableName: " + tableName + ", tabletId: " + tabletId); - } - addr = replica.getAddr(); - obTable = tableRoster.getTable(addr); - } - if (obTable == null) { - RUNTIME.error("cannot get table by addr: " + addr); - throw new ObTableGetException("obTable is null, addr is: " + addr.getIp() + ":" + addr.getSvrPort()); - } - - ObTableParam param = new ObTableParam(obTable); - param.setLsId(tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getTabletLsId()); - param.setTableId(tableEntry.getTableId()); - param.setPartId(partId); - // real tablet id - param.setPartitionId(tabletId); - - addr.recordAccess(); + ObTableParam param = getTableInternal(tableName, tableEntry, partId, route); params.add(param); } return params;