From f005f4b6749feef7e9df663957947e881f88ab4e Mon Sep 17 00:00:00 2001 From: maochongxin Date: Thu, 7 Nov 2024 00:33:20 +0800 Subject: [PATCH 1/3] add threshold for refresh table entry with location --- .../alipay/oceanbase/rpc/ObTableClient.java | 1 + .../oceanbase/rpc/location/LocationUtil.java | 71 ++++++++++++++----- .../oceanbase/rpc/property/Property.java | 2 + .../rpc/table/AbstractObTableClient.java | 2 + 4 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index f289bcb0..98f8eaa1 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -401,6 +401,7 @@ private void initProperties() { routeMap.put(TABLE_ENTRY_ACQUIRE_CONNECT_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireConnectTimeout)); routeMap.put(TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireSocketTimeout)); routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_BASE.getKey(), String.valueOf(tableEntryRefreshIntervalBase)); + routeMap.put(TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getKey(), String.valueOf(tableEntryLocationRefreshThreshold)); routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_CEILING.getKey(), String.valueOf(tableEntryRefreshIntervalCeiling)); routeMap.put(TABLE_ENTRY_REFRESH_TRY_TIMES.getKey(), String.valueOf(tableEntryRefreshTryTimes)); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index 9e1fde4d..eda6cbe7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -47,6 +47,7 @@ import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MAX_PARTITION_ELEMENT; import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MIN_PARTITION_ELEMENT; +import static com.alipay.oceanbase.rpc.property.Property.TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD; import static com.alipay.oceanbase.rpc.util.RandomUtil.getRandomNum; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*; import static java.lang.String.format; @@ -745,10 +746,14 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn } if (ObGlobal.obVsnMajor() >= 4) { - // only set empty partitionEntry - ObPartitionEntry partitionEntry = new ObPartitionEntry(); - tableEntry.setPartitionEntry(partitionEntry); - tableEntry.setRefreshTimeMills(System.currentTimeMillis()); + // only set empty partitionEntry + if (tableEntry.getPartitionNum() <= TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong()) { + getTableEntryLocationFromRemote(connection, key, tableEntry); + } else { + ObPartitionEntry partitionEntry = new ObPartitionEntry(); + tableEntry.setPartitionEntry(partitionEntry); + tableEntry.setRefreshTimeMills(System.currentTimeMillis()); + } } else { // get location info getTableEntryLocationFromRemote(connection, key, tableEntry); @@ -915,6 +920,7 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection, ps.setString(1, key.getTenantName()); ps.setString(2, key.getDatabaseName()); ps.setString(3, key.getTableName()); + ps.setString(4, key.getTenantName()); rs = ps.executeQuery(); partitionEntry = getPartitionLocationFromResultSet(tableEntry, rs, partitionEntry); } catch (Exception e) { @@ -1279,28 +1285,55 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab } else { tabletLsIdMap.put(partitionId, INVALID_LS_ID); // non-partitioned table } + ObPartitionLocationInfo partitionLocationInfo = partitionEntry + .getPartitionInfo(partitionId); + ObPartitionLocation location = partitionLocationInfo.getPartitionLocation(); + if (location == null) { + partitionLocationInfo.rwLock.writeLock().lock(); + try { + location = partitionLocationInfo.getPartitionLocation(); + if (location == null) { + location = new ObPartitionLocation(); + partitionLocationInfo.updateLocation(location, lsId); + } + } finally { + partitionLocationInfo.rwLock.writeLock().unlock(); + } + } + if (!replica.isValid()) { + RUNTIME + .warn(format( + "Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d", + replica, partitionId, tableEntry.getTableId())); + continue; + } + location.addReplicaLocation(replica); + + if (partitionLocationInfo.initialized.compareAndSet(false, true)) { + partitionLocationInfo.initializationLatch.countDown(); + } } else { partitionId = rs.getLong("partition_id"); if (tableEntry.isPartitionTable() - && null != tableEntry.getPartitionInfo().getSubPartDesc()) { + && null != tableEntry.getPartitionInfo().getSubPartDesc()) { partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry - .getPartitionInfo().getSubPartDesc().getPartNum()); + .getPartitionInfo().getSubPartDesc().getPartNum()); } - } - if (!replica.isValid()) { - RUNTIME - .warn(format( - "replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d", - replica, partitionId, tableEntry.getTableId())); - continue; - } - ObPartitionLocation location = partitionLocation.get(partitionId); + if (!replica.isValid()) { + RUNTIME + .warn(format( + "replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d", + replica, partitionId, tableEntry.getTableId())); + continue; + } + ObPartitionLocation location = partitionLocation.get(partitionId); - if (location == null) { - location = new ObPartitionLocation(); - partitionLocation.put(partitionId, location); + if (location == null) { + location = new ObPartitionLocation(); + partitionLocation.put(partitionId, location); + } + location.addReplicaLocation(replica); } - location.addReplicaLocation(replica); } if (ObGlobal.obVsnMajor() < 4) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java index 8ee42ebb..3b39839d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java +++ b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java @@ -59,6 +59,8 @@ public enum Property { TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT("table.entry.acquire.socket.timeout", 3000L, "刷新TABLE地址的SOCKET超时时间"), + TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD("table.entry.location.refresh.threshold", 100L, "刷新TABLE ENTRY同时刷新LOCATION的阈值"), + TABLE_ENTRY_REFRESH_INTERVAL_BASE("table.entry.refresh.interval.base", 100L, "刷新TABLE地址的基础时间间隔"), @Deprecated diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java index 49168ebe..e5e73818 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java @@ -50,6 +50,8 @@ public abstract class AbstractObTableClient extends AbstractTable { protected long tableEntryRefreshIntervalBase = TABLE_ENTRY_REFRESH_INTERVAL_BASE .getDefaultLong(); + protected long tableEntryLocationRefreshThreshold = TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong(); + protected long tableEntryRefreshIntervalCeiling = TABLE_ENTRY_REFRESH_INTERVAL_CEILING .getDefaultLong(); From f040f1610546515a6ca126b16919dc29b9a2830d Mon Sep 17 00:00:00 2001 From: "shenyunlong.syl" Date: Thu, 7 Nov 2024 17:08:40 +0800 Subject: [PATCH 2/3] [Fix] remove TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD in Property --- src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java | 1 - .../com/alipay/oceanbase/rpc/location/LocationUtil.java | 6 +++--- .../java/com/alipay/oceanbase/rpc/property/Property.java | 2 -- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 98f8eaa1..f289bcb0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -401,7 +401,6 @@ private void initProperties() { routeMap.put(TABLE_ENTRY_ACQUIRE_CONNECT_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireConnectTimeout)); routeMap.put(TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireSocketTimeout)); routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_BASE.getKey(), String.valueOf(tableEntryRefreshIntervalBase)); - routeMap.put(TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getKey(), String.valueOf(tableEntryLocationRefreshThreshold)); routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_CEILING.getKey(), String.valueOf(tableEntryRefreshIntervalCeiling)); routeMap.put(TABLE_ENTRY_REFRESH_TRY_TIMES.getKey(), String.valueOf(tableEntryRefreshTryTimes)); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index eda6cbe7..a78ea8c9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -47,7 +47,6 @@ import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MAX_PARTITION_ELEMENT; import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MIN_PARTITION_ELEMENT; -import static com.alipay.oceanbase.rpc.property.Property.TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD; import static com.alipay.oceanbase.rpc.util.RandomUtil.getRandomNum; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*; import static java.lang.String.format; @@ -214,7 +213,8 @@ public class LocationUtil { private static final int TEMPLATE_PART_ID = -1; // limit the size of get tableEntry location from remote each time - private static final int MAX_TABLET_NUMS_EPOCH = 300; + private static final int MAX_TABLET_NUMS_EPOCH = 300; + private static final int TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD = 100; private abstract static class TableEntryRefreshWithPriorityCallback { abstract T execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException; @@ -747,7 +747,7 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn if (ObGlobal.obVsnMajor() >= 4) { // only set empty partitionEntry - if (tableEntry.getPartitionNum() <= TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong()) { + if (tableEntry.getPartitionNum() <= TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD) { getTableEntryLocationFromRemote(connection, key, tableEntry); } else { ObPartitionEntry partitionEntry = new ObPartitionEntry(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java index 3b39839d..8ee42ebb 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java +++ b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java @@ -59,8 +59,6 @@ public enum Property { TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT("table.entry.acquire.socket.timeout", 3000L, "刷新TABLE地址的SOCKET超时时间"), - TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD("table.entry.location.refresh.threshold", 100L, "刷新TABLE ENTRY同时刷新LOCATION的阈值"), - TABLE_ENTRY_REFRESH_INTERVAL_BASE("table.entry.refresh.interval.base", 100L, "刷新TABLE地址的基础时间间隔"), @Deprecated From 71508a992b54e707038a05158ef6f463aa6a3f09 Mon Sep 17 00:00:00 2001 From: "shenyunlong.syl" Date: Thu, 7 Nov 2024 17:10:38 +0800 Subject: [PATCH 3/3] [Fix] fix compile error --- .../com/alipay/oceanbase/rpc/table/AbstractObTableClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java index e5e73818..49168ebe 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java @@ -50,8 +50,6 @@ public abstract class AbstractObTableClient extends AbstractTable { protected long tableEntryRefreshIntervalBase = TABLE_ENTRY_REFRESH_INTERVAL_BASE .getDefaultLong(); - protected long tableEntryLocationRefreshThreshold = TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong(); - protected long tableEntryRefreshIntervalCeiling = TABLE_ENTRY_REFRESH_INTERVAL_CEILING .getDefaultLong();