diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 203e1679..b1c7ec09 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -677,6 +677,7 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ tryTimes); if (ex instanceof ObTableNeedFetchAllException) { needFetchAllRouteInfo = true; + getOrRefreshTableEntry(tableName, true, true, true); // reset failure count while fetch all route info this.resetExecuteContinuousFailureCount(tableName); } @@ -767,7 +768,6 @@ private T executeMutation(String tableName, MutationExecuteCallback callb throw new IllegalArgumentException("table name is null"); } boolean needRefreshTableEntry = false; - boolean needFetchAllRouteInfo = false; int tryTimes = 0; long startExecute = System.currentTimeMillis(); while (true) { @@ -787,10 +787,14 @@ private T executeMutation(String tableName, MutationExecuteCallback callb obPair = new ObPair(0L, new ObTableParam(odpTable)); } else { if (null != callback.getRowKey()) { + // in the case of retry, the location always needs to be refreshed here + if (tryTimes > 1) { + TableEntry entry = getOrRefreshTableEntry(tableName, false, false, false); + Long partId = getPartition(entry, callback.getRowKey()); + refreshTableLocationByTabletId(entry, tableName, getTabletIdByPartId(entry, partId)); + } // using row key - obPair = getTable(tableName, callback.getRowKey(), - needRefreshTableEntry, tableEntryRefreshIntervalWait, - needFetchAllRouteInfo, route); + obPair = getTable(tableName, callback.getRowKey(), needRefreshTableEntry, tableEntryRefreshIntervalWait, false, route); } else if (null != callback.getKeyRanges()) { // using scan range obPair = getTable(tableName, new ObTableQuery(), @@ -852,7 +856,7 @@ private T executeMutation(String tableName, MutationExecuteCallback callb ((ObTableException) ex).getErrorCode(), ex.getMessage(), tryTimes); if (ex instanceof ObTableNeedFetchAllException) { - needFetchAllRouteInfo = true; + getOrRefreshTableEntry(tableName, needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true); // reset failure count while fetch all route info this.resetExecuteContinuousFailureCount(tableName); } @@ -1336,33 +1340,14 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t } long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime(); long currentTime = System.currentTimeMillis(); - if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) { + if (currentTime - lastRefreshTime < tableEntryRefreshIntervalCeiling) { return tableEntry; } - - Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock()); - - if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) { - String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.", - dataSourceName, tableName, tableEntryRefreshLockTimeout); - RUNTIME.error(errMsg); - throw new ObTableEntryRefreshException(errMsg); - } - - try { - lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime(); - currentTime = System.currentTimeMillis(); - if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) { - return tableEntry; - } - tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId, - tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, - serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA); + tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId, + tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, + serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA); - tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation()); - } finally { - lock.unlock(); - } + tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation()); } catch (ObTableNotExistException | ObTableServerCacheExpiredException e) { RUNTIME.error("RefreshTableEntry encountered an exception", e); @@ -1676,7 +1661,9 @@ private ObPair getTable(String tableName, Object[] rowKey, } long partId = getPartition(tableEntry, row); // partition id in 3.x, origin partId in 4.x, logicId - + if (refresh) { + refreshTableLocationByTabletId(tableEntry, tableName, getTabletIdByPartId(tableEntry, partId)); + } return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route); } @@ -3149,41 +3136,95 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.setTimeout(getOdpTable().getObTableOperationTimeout()); return getOdpTable().execute(request); } else { + int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries + int tryTimes = 0; + long startExecute = System.currentTimeMillis(); + boolean needRefreshTableEntry = false; Map partIdMapObTable = new HashMap(); - for (ObNewRange rang : tableQuery.getKeyRanges()) { - ObRowKey startKey = rang.getStartKey(); - int startKeySize = startKey.getObjs().size(); - ObRowKey endKey = rang.getEndKey(); - int endKeySize = endKey.getObjs().size(); - Object[] start = new Object[startKeySize]; - Object[] end = new Object[endKeySize]; - for (int i = 0; i < startKeySize; i++) { - start[i] = startKey.getObj(i).getValue(); + while (true) { + long currentExecute = System.currentTimeMillis(); + long costMillis = currentExecute - startExecute; + if (costMillis > getRuntimeMaxWait()) { + logger.error( + "tablename:{} it has tried " + tryTimes + + " times and it has waited " + costMillis + + "/ms which exceeds response timeout " + + getRuntimeMaxWait() + "/ms", request.getTableName()); + throw new ObTableTimeoutExcetion("it has tried " + tryTimes + + " times and it has waited " + costMillis + + "/ms which exceeds response timeout " + + getRuntimeMaxWait() + "/ms"); } + try { + // Recalculate partIdMapObTable + // Clear the map before recalculating + partIdMapObTable.clear(); + for (ObNewRange rang : tableQuery.getKeyRanges()) { + ObRowKey startKey = rang.getStartKey(); + int startKeySize = startKey.getObjs().size(); + ObRowKey endKey = rang.getEndKey(); + int endKeySize = endKey.getObjs().size(); + Object[] start = new Object[startKeySize]; + Object[] end = new Object[endKeySize]; + for (int i = 0; i < startKeySize; i++) { + start[i] = startKey.getObj(i).getValue(); + } - for (int i = 0; i < endKeySize; i++) { - end[i] = endKey.getObj(i).getValue(); - } - ObBorderFlag borderFlag = rang.getBorderFlag(); - List> pairList = getTables(request.getTableName(), - tableQuery, start, borderFlag.isInclusiveStart(), end, - borderFlag.isInclusiveEnd(), false, false); - for (ObPair pair : pairList) { - partIdMapObTable.put(pair.getLeft(), pair.getRight()); - } - } - if (partIdMapObTable.size() > 1) { - throw new ObTablePartitionConsistentException( - "query and mutate must be a atomic operation"); - } + for (int i = 0; i < endKeySize; i++) { + end[i] = endKey.getObj(i).getValue(); + } + ObBorderFlag borderFlag = rang.getBorderFlag(); + List> pairList = getTables(request.getTableName(), + tableQuery, start, borderFlag.isInclusiveStart(), end, + borderFlag.isInclusiveEnd(), needRefreshTableEntry, isTableEntryRefreshIntervalWait()); + for (ObPair pair : pairList) { + partIdMapObTable.put(pair.getLeft(), pair.getRight()); + } + } - for (Long partId : partIdMapObTable.keySet()) { - ObTableParam tableParam = partIdMapObTable.get(partId); - request.setTableId(tableParam.getTableId()); - request.setPartitionId(tableParam.getPartitionId()); - request.setTimeout(tableParam.getObTable().getObTableOperationTimeout()); - ObTable obTable = tableParam.getObTable(); - return executeWithRetry(obTable, request, request.getTableName()); + // Check if partIdMapObTable size is greater than 1 + if (partIdMapObTable.size() > 1) { + throw new ObTablePartitionConsistentException( + "query and mutate must be a atomic operation"); + } + // Proceed with the operation + Map.Entry entry = partIdMapObTable.entrySet().iterator().next(); + ObTableParam tableParam = entry.getValue(); + request.setTableId(tableParam.getTableId()); + request.setPartitionId(tableParam.getPartitionId()); + request.setTimeout(tableParam.getObTable().getObTableOperationTimeout()); + ObTable obTable = tableParam.getObTable(); + + // Attempt to execute the operation + return executeWithRetry(obTable, request, request.getTableName()); + } catch (Exception ex) { + tryTimes++; + if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) { + needRefreshTableEntry = true; + logger.warn( + "tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}", + request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), ex); + + if (isRetryOnChangeMasterTimes() && tryTimes <= maxRetries) { + logger.warn( + "tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}", + request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), + tryTimes, ex); + + if (ex instanceof ObTableNeedFetchAllException) { + // Refresh table info + getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true); + } + } else { + calculateContinuousFailure(request.getTableName(), ex.getMessage()); + throw ex; + } + } else { + calculateContinuousFailure(request.getTableName(), ex.getMessage()); + // Handle other exceptions or rethrow + throw ex; + } + } } } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index ecc37f25..9e1fde4d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -56,156 +56,164 @@ public class LocationUtil { - private static final Logger logger = TableClientLoggerFactory - .getLogger(LocationUtil.class); + private static final Logger logger = TableClientLoggerFactory + .getLogger(LocationUtil.class); static { ParserConfig.getGlobalInstance().setSafeMode(true); } - private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;"; + private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;"; - private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table " - + "where table_name = ?"; + private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table " + + "where table_name = ?"; - private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema " - + "where tenant_name = ? and database_name = ? and table_name = ? limit 1"; + private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema " + + "where tenant_name = ? and database_name = ? and table_name = ? limit 1"; - private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;"; + private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;"; @Deprecated @SuppressWarnings("unused") - private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT = "SELECT /*+READ_CONSISTENCY(WEAK)*/ partition_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 " - + "FROM oceanbase.__all_virtual_proxy_schema " - + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND partition_id in ({0}) AND sql_port > 0 " - + "ORDER BY role ASC LIMIT ?"; - - private static final String PROXY_PART_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, " - + "part_range_type, part_interval_bin, interval_start_bin, " - + "sub_part_num, sub_part_type, sub_part_space, " - + "sub_part_range_type, def_sub_part_interval_bin, def_sub_interval_start_bin, sub_part_expr, " - + "part_key_name, part_key_type, part_key_idx, part_key_extra, spare1 " - + "FROM oceanbase.__all_virtual_proxy_partition_info " - + "WHERE table_id = ? group by part_key_name order by part_key_name LIMIT ?;"; + private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT = "SELECT /*+READ_CONSISTENCY(WEAK)*/ partition_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 " + + "FROM oceanbase.__all_virtual_proxy_schema " + + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND partition_id in ({0}) AND sql_port > 0 " + + "ORDER BY role ASC LIMIT ?"; + + private static final String PROXY_PART_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, " + + "part_range_type, part_interval_bin, interval_start_bin, " + + "sub_part_num, sub_part_type, sub_part_space, " + + "sub_part_range_type, def_sub_part_interval_bin, def_sub_interval_start_bin, sub_part_expr, " + + "part_key_name, part_key_type, part_key_idx, part_key_extra, spare1 " + + "FROM oceanbase.__all_virtual_proxy_partition_info " + + "WHERE table_id = ? group by part_key_name order by part_key_name LIMIT ?;"; @Deprecated @SuppressWarnings("unused") - private static final String PROXY_TENANT_SCHEMA_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 " - + "FROM oceanbase.__all_virtual_proxy_schema " - + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 " - + "ORDER BY partition_id ASC, role ASC LIMIT ?"; - - private static final String PROXY_DUMMY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " - + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " - + ", A.spare1 as replica_type " - + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " - + "WHERE tenant_name = ? and database_name=? and table_name = ?"; - - private static final String PROXY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " - + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " - + ", A.spare1 as replica_type " - + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " - + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id = 0"; - - private static final String PROXY_LOCATION_SQL_PARTITION = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " - + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " - + ", A.spare1 as replica_type " - + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " - + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id in ({0})"; - - private static final String PROXY_FIRST_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, high_bound_val " - + "FROM oceanbase.__all_virtual_proxy_partition " - + "WHERE table_id = ? LIMIT ?;"; - - private static final String PROXY_SUB_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, high_bound_val " - + "FROM oceanbase.__all_virtual_proxy_sub_partition " - + "WHERE table_id = ? LIMIT ?;"; - - private static final String PROXY_SERVER_STATUS_INFO = "SELECT ss.svr_ip, ss.zone, zs.region, zs.spare4 as idc " - + "FROM oceanbase.__all_virtual_proxy_server_stat ss, oceanbase.__all_virtual_zone_stat zs " - + "WHERE zs.zone = ss.zone ;"; + private static final String PROXY_TENANT_SCHEMA_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 " + + "FROM oceanbase.__all_virtual_proxy_schema " + + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 " + + "ORDER BY partition_id ASC, role ASC LIMIT ?"; + + private static final String PROXY_DUMMY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " + + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " + + ", A.spare1 as replica_type " + + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " + + "WHERE tenant_name = ? and database_name=? and table_name = ?"; + + private static final String PROXY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " + + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " + + ", A.spare1 as replica_type " + + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " + + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id = 0"; + + private static final String PROXY_LOCATION_SQL_PARTITION = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " + + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " + + ", A.spare1 as replica_type " + + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " + + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id in ({0})"; + + private static final String PROXY_FIRST_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, high_bound_val " + + "FROM oceanbase.__all_virtual_proxy_partition " + + "WHERE table_id = ? LIMIT ?;"; + + private static final String PROXY_SUB_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, high_bound_val " + + "FROM oceanbase.__all_virtual_proxy_sub_partition " + + "WHERE table_id = ? LIMIT ?;"; + + private static final String PROXY_SERVER_STATUS_INFO = "SELECT ss.svr_ip, ss.zone, zs.region, zs.spare4 as idc " + + "FROM oceanbase.__all_virtual_proxy_server_stat ss, oceanbase.__all_virtual_zone_stat zs " + + "WHERE zs.zone = ss.zone ;"; @Deprecated @SuppressWarnings("unused") - private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tablet_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 " - + "FROM oceanbase.__all_virtual_proxy_schema " - + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND tablet_id in ({0}) AND sql_port > 0 " - + "ORDER BY role ASC LIMIT ?"; - - private static final String PROXY_PART_INFO_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, " - + "part_range_type, sub_part_num, sub_part_type, sub_part_space, sub_part_range_type, sub_part_expr, " - + "part_key_name, part_key_type, part_key_idx, part_key_extra, part_key_collation_type " - + "FROM oceanbase.__all_virtual_proxy_partition_info " - + "WHERE tenant_name = ? and table_id = ? group by part_key_name order by part_key_name LIMIT ?;"; + private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tablet_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 " + + "FROM oceanbase.__all_virtual_proxy_schema " + + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND tablet_id in ({0}) AND sql_port > 0 " + + "ORDER BY role ASC LIMIT ?"; + + private static final String PROXY_PART_INFO_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, " + + "part_range_type, sub_part_num, sub_part_type, sub_part_space, sub_part_range_type, sub_part_expr, " + + "part_key_name, part_key_type, part_key_idx, part_key_extra, part_key_collation_type " + + "FROM oceanbase.__all_virtual_proxy_partition_info " + + "WHERE tenant_name = ? and table_id = ? group by part_key_name order by part_key_name LIMIT ?;"; @Deprecated @SuppressWarnings("unused") - private static final String PROXY_TENANT_SCHEMA_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 " - + "FROM oceanbase.__all_virtual_proxy_schema " - + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 " - + "ORDER BY tablet_id ASC, role ASC LIMIT ?"; - - private static final String PROXY_DUMMY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " - + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " - + ", A.spare1 as replica_type " - + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " - + "WHERE tenant_name = ? and database_name=? and table_name = ?"; - - private static final String PROXY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " - + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " - + ", A.spare1 as replica_type " - + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " - + "WHERE tenant_name = ? and database_name=? and table_name = ? and tablet_id = 0"; - - private static final String PROXY_LOCATION_SQL_PARTITION_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ * FROM ( " - + " SELECT A.tablet_id as tablet__id, A.svr_ip as svr_ip, A.sql_port as sql_port, A.table_id as table_id, " - + " A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, " - + " B.stop_time as stop_time, A.spare1 as replica_type " - + " FROM oceanbase.__all_virtual_proxy_schema A " - + " INNER JOIN oceanbase.__all_server B ON A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port " - + " WHERE A.tablet_id IN ({0}) AND A.tenant_name = ? AND A.database_name = ? AND A.table_name = ?) AS left_table " - + "LEFT JOIN (" - + " SELECT D.ls_id, D.tablet_id " - + " FROM oceanbase.__all_virtual_tablet_to_ls D " - + " INNER JOIN oceanbase.DBA_OB_TENANTS C ON D.tenant_id = C.tenant_id " - + " WHERE C.tenant_name = ? " - + ") AS right_table ON left_table.tablet__id = right_table.tablet_id;"; - - private static final String PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ * FROM ( " - + " SELECT A.tablet_id as tablet__id, A.svr_ip as svr_ip, A.sql_port as sql_port, A.table_id as table_id, " - + " A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, " - + " B.stop_time as stop_time, A.spare1 as replica_type " - + " FROM oceanbase.__all_virtual_proxy_schema A " - + " INNER JOIN oceanbase.__all_server B ON A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port " - + " WHERE A.tablet_id = ? AND A.tenant_name = ? AND A.database_name = ? AND A.table_name = ?) AS left_table " - + "LEFT JOIN (" - + " SELECT D.ls_id, D.tablet_id " - + " FROM oceanbase.__all_virtual_tablet_to_ls D " - + " INNER JOIN oceanbase.DBA_OB_TENANTS C ON D.tenant_id = C.tenant_id " - + " WHERE C.tenant_name = ? " - + ") AS right_table ON left_table.tablet__id = right_table.tablet_id;"; - - private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num " - + "FROM oceanbase.__all_virtual_proxy_partition " - + "WHERE tenant_name = ? and table_id = ? LIMIT ?;"; - - private static final String PROXY_SUB_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, tablet_id, high_bound_val " - + "FROM oceanbase.__all_virtual_proxy_sub_partition " - + "WHERE tenant_name = ? and table_id = ? LIMIT ?;"; - - private static final String PROXY_SERVER_STATUS_INFO_V4 = "SELECT ss.svr_ip, ss.zone, zs.region, zs.idc as idc " - + "FROM DBA_OB_SERVERS ss, DBA_OB_ZONES zs " - + "WHERE zs.zone = ss.zone ;"; - - private static final String home = System.getProperty( - "user.home", - "/home/admin"); - - private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name " - + "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES " - + "WHERE tablegroup_name = ? and tenant_id = ? limit 1;"; - - private static final int TEMPLATE_PART_ID = -1; + private static final String PROXY_TENANT_SCHEMA_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 " + + "FROM oceanbase.__all_virtual_proxy_schema " + + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 " + + "ORDER BY tablet_id ASC, role ASC LIMIT ?"; + + private static final String PROXY_DUMMY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " + + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " + + ", A.spare1 as replica_type " + + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " + + "WHERE tenant_name = ? and database_name=? and table_name = ?"; + + private static final String PROXY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, " + + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time " + + ", A.spare1 as replica_type " + + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port " + + "WHERE tenant_name = ? and database_name=? and table_name = ? and tablet_id = 0"; + + private static final String PROXY_LOCATION_SQL_PARTITION_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ * FROM ( " + + " SELECT A.tablet_id as tablet__id, A.svr_ip as svr_ip, A.sql_port as sql_port, A.table_id as table_id, " + + " A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, " + + " B.stop_time as stop_time, A.spare1 as replica_type " + + " FROM oceanbase.__all_virtual_proxy_schema A " + + " INNER JOIN oceanbase.__all_server B ON A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port " + + " WHERE A.tablet_id IN ({0}) AND A.tenant_name = ? AND A.database_name = ? AND A.table_name = ?) AS left_table " + + "LEFT JOIN (" + + " SELECT D.ls_id, D.tablet_id " + + " FROM oceanbase.__all_virtual_tablet_to_ls D " + + " INNER JOIN oceanbase.DBA_OB_TENANTS C ON D.tenant_id = C.tenant_id " + + " WHERE C.tenant_name = ? " + + ") AS right_table ON left_table.tablet__id = right_table.tablet_id;"; + + private static final String PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ " + + " A.tablet_id as tablet_id, " + + " A.svr_ip as svr_ip, " + + " A.sql_port as sql_port, " + + " A.table_id as table_id, " + + " A.role as role, " + + " A.replica_num as replica_num, " + + " A.part_num as part_num, " + + " (SELECT B.svr_port FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as svr_port, " + + " (SELECT B.status FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as status, " + + " (SELECT B.stop_time FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as stop_time, " + + " A.spare1 as replica_type, " + + " (SELECT D.ls_id FROM oceanbase.__all_virtual_tablet_to_ls D WHERE A.tablet_id = D.tablet_id AND D.tenant_id = " + + " (SELECT C.tenant_id FROM oceanbase.DBA_OB_TENANTS C WHERE C.tenant_name = ?)) as ls_id " + + "FROM " + + " oceanbase.__all_virtual_proxy_schema A " + + "WHERE " + + " A.tablet_id = ? " + + " AND A.tenant_name = ? " + + " AND A.database_name = ? " + + " AND A.table_name = ?;"; + + private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num " + + "FROM oceanbase.__all_virtual_proxy_partition " + + "WHERE tenant_name = ? and table_id = ? LIMIT ?;"; + + private static final String PROXY_SUB_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, tablet_id, high_bound_val " + + "FROM oceanbase.__all_virtual_proxy_sub_partition " + + "WHERE tenant_name = ? and table_id = ? LIMIT ?;"; + + private static final String PROXY_SERVER_STATUS_INFO_V4 = "SELECT ss.svr_ip, ss.zone, zs.region, zs.idc as idc " + + "FROM DBA_OB_SERVERS ss, DBA_OB_ZONES zs " + + "WHERE zs.zone = ss.zone ;"; + + private static final String home = System.getProperty( + "user.home", + "/home/admin"); + + private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name " + + "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES " + + "WHERE tablegroup_name = ? and tenant_id = ? limit 1;"; + + private static final int TEMPLATE_PART_ID = -1; // limit the size of get tableEntry location from remote each time - private static final int MAX_TABLET_NUMS_EPOCH = 300; + private static final int MAX_TABLET_NUMS_EPOCH = 300; private abstract static class TableEntryRefreshWithPriorityCallback { abstract T execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException; @@ -735,7 +743,7 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn } } } - + if (ObGlobal.obVsnMajor() >= 4) { // only set empty partitionEntry ObPartitionEntry partitionEntry = new ObPartitionEntry(); @@ -857,11 +865,11 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection, String sql = genLocationSQLByTabletId(); try { ps = connection.prepareStatement(sql); - ps.setLong(1, tabletId); - ps.setString(2, key.getTenantName()); - ps.setString(3, key.getDatabaseName()); - ps.setString(4, key.getTableName()); - ps.setString(5, key.getTenantName()); + ps.setString(1, key.getTenantName()); + ps.setLong(2, tabletId); + ps.setString(3, key.getTenantName()); + ps.setString(4, key.getDatabaseName()); + ps.setString(5, key.getTableName()); rs = ps.executeQuery(); getPartitionLocationFromResultSetByTablet(tableEntry, rs, partitionEntry, tabletId); } catch (Exception e) { @@ -912,8 +920,8 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection, } catch (Exception e) { RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e); throw new ObTablePartitionLocationRefreshException(format( - "fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s " - + "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e); + "fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s " + + "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e); } finally { try { if (null != rs) { @@ -1205,46 +1213,47 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE ObPartitionLocationInfo partitionLocationInfo = partitionEntry.getPartitionInfo(tabletId); - partitionLocationInfo.rwLock.writeLock().lock(); - try { - while (rs.next()) { - ReplicaLocation replica = buildReplicaLocation(rs); - - long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs - .getLong("partition_id"); - long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID; - if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) { - lsId = INVALID_LS_ID; // For non-partitioned table - } - partitionLocationInfo.setTabletLsId(lsId); - - if (ObGlobal.obVsnMajor() < 4 && tableEntry.isPartitionTable() - && tableEntry.getPartitionInfo().getSubPartDesc() != null) { - partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry - .getPartitionInfo().getSubPartDesc().getPartNum()); - } + while (rs.next()) { + ReplicaLocation replica = buildReplicaLocation(rs); + long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs + .getLong("partition_id"); + long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID; + if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) { + lsId = INVALID_LS_ID; // For non-partitioned table + } - if (!replica.isValid()) { - RUNTIME - .warn(format( - "Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d", - replica, partitionId, tableEntry.getTableId())); - continue; - } - ObPartitionLocation location = partitionLocationInfo.getPartitionLocation(); - if (location == null) { - location = new ObPartitionLocation(); - partitionLocationInfo.updateLocation(location); + if (ObGlobal.obVsnMajor() < 4 && tableEntry.isPartitionTable() + && tableEntry.getPartitionInfo().getSubPartDesc() != null) { + partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry + .getPartitionInfo().getSubPartDesc().getPartNum()); + } + if (!replica.isValid()) { + RUNTIME + .warn(format( + "Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d", + replica, partitionId, tableEntry.getTableId())); + continue; + } + ObPartitionLocation location = partitionLocationInfo.getPartitionLocation(); + if (location == null) { + partitionLocationInfo.rwLock.writeLock().lock(); + try { + location = partitionLocationInfo.getPartitionLocation(); + if (location == null) { + location = new ObPartitionLocation(); + partitionLocationInfo.updateLocation(location, lsId); + } + } finally { + partitionLocationInfo.rwLock.writeLock().unlock(); } - location.addReplicaLocation(replica); + } + location.addReplicaLocation(replica); - if (partitionLocationInfo.initialized.compareAndSet(false, true)) { - partitionLocationInfo.initializationLatch.countDown(); - } + if (partitionLocationInfo.initialized.compareAndSet(false, true)) { + partitionLocationInfo.initializationLatch.countDown(); } - } finally { - partitionLocationInfo.rwLock.writeLock().unlock(); } + return partitionEntry; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java index 8b9181b9..b12b7ba0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java @@ -40,17 +40,24 @@ public ObPartitionLocation getPartitionLocation() { } } - public void updateLocation(ObPartitionLocation newLocation) { - this.partitionLocation = newLocation; - this.lastUpdateTime = System.currentTimeMillis(); + public void updateLocation(ObPartitionLocation newLocation, Long tabletLsId) { + rwLock.writeLock().lock(); + try { + this.partitionLocation = newLocation; + this.tabletLsId = tabletLsId; + this.lastUpdateTime = System.currentTimeMillis(); + } finally { + rwLock.writeLock().unlock(); + } } public Long getTabletLsId() { - return tabletLsId; - } - - public void setTabletLsId(Long tabletLsId) { - this.tabletLsId = tabletLsId; + rwLock.readLock().lock(); + try { + return tabletLsId; + } finally { + rwLock.readLock().unlock(); + } } public Long getLastUpdateTime() { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 5da49bbc..7662aa69 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -35,6 +35,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult; import com.alipay.oceanbase.rpc.table.ObTable; import com.alipay.oceanbase.rpc.table.ObTableParam; @@ -45,6 +46,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; + public abstract class AbstractQueryStreamResult extends AbstractPayload implements QueryStreamResult { @@ -61,14 +64,15 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen // global index: key is index table name (be like: __idx__) protected String indexTableName; protected ObTableEntityType entityType; - protected Map> expectant; // Map> + protected Map> expectant; protected List cacheProperties = new LinkedList(); protected LinkedList> cacheRows = new LinkedList>(); private LinkedList, ObTableQueryResult>> partitionLastResult = new LinkedList, ObTableQueryResult>>(); private ObReadConsistency readConsistency = ObReadConsistency.STRONG; // ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT] public List currentStartKey; - + protected ObTableClient client; + /* * Get pcode. */ @@ -228,7 +232,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, } else if (e instanceof ObTableException) { if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e) .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) - && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() + && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery() && client.getTableGroupInverted().get(indexTableName) != null) { // table not exists && hbase mode && table group exists , three condition both client.eraseTableGroupFromCache(tableName); @@ -549,9 +553,32 @@ public void init() throws Exception { return; } if (tableQuery.getBatchSize() == -1) { - for (Map.Entry> entry : expectant.entrySet()) { - // mark the refer partition - referToNewPartition(entry.getValue()); + if (!expectant.isEmpty()) { + Iterator>> it = expectant.entrySet() + .iterator(); + int retryTimes = 0; + while (it.hasNext()) { + Map.Entry> entry = it.next(); + try { + // try access new partition, async will not remove useless expectant + referToNewPartition(entry.getValue()); + } catch (Exception e) { + if (e instanceof ObTableNeedFetchAllException) { + setExpectant(refreshPartition(tableQuery, tableName)); + it = expectant.entrySet().iterator(); + retryTimes++; + if (retryTimes > client.getRuntimeRetryTimes()) { + RUNTIME.error("Fail to get refresh table entry response after {}", + retryTimes); + throw new ObTableRetryExhaustedException( + "Fail to get refresh table entry response after " + retryTimes); + + } + } else { + throw e; + } + } + } } expectant.clear(); } else { @@ -692,4 +719,19 @@ public ObReadConsistency getReadConsistency() { public void setReadConsistency(ObReadConsistency readConsistency) { this.readConsistency = readConsistency; } + + /** + * Get client. + * @return client + */ + public ObTableClient getClient() { + return client; + } + + /* + * Set client. + */ + public void setClient(ObTableClient client) { + this.client = client; + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index cae7f39c..8a933d64 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -44,7 +44,6 @@ public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult { private static final Logger logger = LoggerFactory .getLogger(ObTableClientQueryStreamResult.class); - protected ObTableClient client; private boolean isEnd = true; private long sessionId = Constants.OB_INVALID_ID; private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest(); @@ -338,19 +337,7 @@ public void close() throws Exception { closeLastStreamResult(lastEntry.getValue()); } } - - public ObTableClient getClient() { - return client; - } - - /** - * Set client. - * @param client client want to set - */ - public void setClient(ObTableClient client) { - this.client = client; - } - + public boolean isEnd() { return isEnd; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java index 2c0bf482..b091b4b7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java @@ -34,10 +34,8 @@ import java.util.concurrent.atomic.AtomicReference; public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult { - private static final Logger logger = TableClientLoggerFactory .getLogger(ObTableClientQueryStreamResult.class); - protected ObTableClient client; protected ObTableQueryResult referToNewPartition(ObPair partIdWithObTable) throws Exception { @@ -84,19 +82,4 @@ protected Map> refreshPartition(ObTableQuery ta throws Exception { return buildPartitions(client, tableQuery, tableName); } - - /** - * Get client. - * @return client - */ - public ObTableClient getClient() { - return client; - } - - /* - * Set client. - */ - public void setClient(ObTableClient client) { - this.client = client; - } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index 6459398c..885cc96f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -631,7 +631,7 @@ private void executeWithRetries( Map>>>> currentPartitions = new HashMap<>(); currentPartitions.put(entry.getKey(), entry.getValue()); - while (retryCount < maxRetries && !success) { + while (retryCount <= maxRetries && !success) { boolean allPartitionsSuccess = true; for (Map.Entry>>>> currentEntry : currentPartitions.entrySet()) {