From 48e73262c83de68b1b3ae75cfa9127e7b13caef4 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Thu, 4 Dec 2025 22:05:52 +0800 Subject: [PATCH] fix weak read in batch get --- .../rpc/mutation/BatchOperation.java | 44 ++++-- .../oceanbase/rpc/ObTableWeakReadTest.java | 126 +++++++++++++++--- 2 files changed, 139 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index ca02e48e..ad9eb306 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -54,6 +54,7 @@ public class BatchOperation { boolean isSameType = true; protected ObTableEntityType entityType = ObTableEntityType.KV; protected OHOperationType hbaseOpType = OHOperationType.INVALID; + protected ObReadConsistency readConsistency = null; // BatchOperation 级别的弱读设置 /* * default constructor @@ -63,6 +64,7 @@ public BatchOperation() { client = null; withResult = false; operations = new ArrayList<>(); + readConsistency = null; } /* @@ -188,6 +190,25 @@ public void setEntityType(ObTableEntityType entityType) { this.entityType = entityType; } + /** + * Set read consistency level for batch operation. + * This setting will override the readConsistency settings on individual Get operations. + * @param readConsistency read consistency level + * @return this + */ + public BatchOperation setReadConsistency(ObReadConsistency readConsistency) { + this.readConsistency = readConsistency; + return this; + } + + /** + * Get read consistency level for batch operation. + * @return read consistency level + */ + public ObReadConsistency getReadConsistency() { + return readConsistency; + } + public void setServerCanRetry(boolean canRetry) { this.serverCanRetry = canRetry; } @@ -317,13 +338,6 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception { return new BatchOperationResult(batchOps.executeWithResult()); } - private boolean checkReadConsistency(ObTableClient obTableClient, ObReadConsistency readConsistency) throws IllegalArgumentException { - // 如果没有设置语句级别的 readConsistency(null),使用 TableRoute 上的 consistencyLevel - if (readConsistency == null) { - return obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK; - } - return readConsistency == ObReadConsistency.WEAK; - } private BatchOperationResult executeWithLSBatchOp() throws Exception { if (tableName == null || tableName.isEmpty()) { @@ -369,11 +383,23 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception { if (get.getRowKey() == null) { throw new IllegalArgumentException("RowKey is null in Get operation"); } - isWeakRead = checkReadConsistency(obTableClient, get.getReadConsistency()); + // BatchOperation 级别的 readConsistency 优先,忽略 Get 上的设置 + if (readConsistency != null) { + isWeakRead = (readConsistency == ObReadConsistency.WEAK); + } else { + // 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置 + isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK; + } batchOps.addOperation(get); } else if (operation instanceof TableQuery) { TableQuery query = (TableQuery) operation; - isWeakRead = checkReadConsistency(obTableClient, query.getReadConsistency()); + // BatchOperation 级别的 readConsistency 优先,忽略 TableQuery 上的设置 + if (readConsistency != null) { + isWeakRead = (readConsistency == ObReadConsistency.WEAK); + } else { + // 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置 + isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK; + } batchOps.addOperation(query); } else if (operation instanceof QueryAndMutate) { QueryAndMutate qm = (QueryAndMutate) operation; diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java index 881adbeb..1e386a38 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java @@ -48,16 +48,18 @@ class SqlAuditResult { public String svrIp; public int svrPort; public int tabletId; + public int consistency_level; - public SqlAuditResult(String svrIp, int svrPort, int tabletId) { + public SqlAuditResult(String svrIp, int svrPort, int tabletId, int consistency_level) { this.svrIp = svrIp; this.svrPort = svrPort; this.tabletId = tabletId; + this.consistency_level = consistency_level; } @Override public String toString() { - return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + '}'; + return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + ", consistency_level=" + consistency_level + '}'; } } @@ -413,6 +415,38 @@ private int extractTabletId(String querySql) { } } + private int extractConsistencyLevel(String querySql) { + // 查找 consistency_level: 的模式 + String pattern = "consistency_level:"; + int startIndex = querySql.indexOf(pattern); + if (startIndex == -1) { + return -1; // 如果找不到,返回 -1 + } + // 找到 : 后面的数字开始位置 + int levelStartIndex = startIndex + pattern.length(); + // 跳过可能的空格 + while (levelStartIndex < querySql.length() && Character.isWhitespace(querySql.charAt(levelStartIndex))) { + levelStartIndex++; + } + // 找到数字结束位置(遇到 , 或 } 或空格) + int levelEndIndex = levelStartIndex; + while (levelEndIndex < querySql.length()) { + char c = querySql.charAt(levelEndIndex); + if (c == ',' || c == '}' || c == ' ') { + break; + } + levelEndIndex++; + } + // 提取数字字符串 + String consistencyLevelStr = querySql.substring(levelStartIndex, levelEndIndex).trim(); + try { + return Integer.parseInt(consistencyLevelStr); + } catch (NumberFormatException e) { + debugPrint("Failed to parse consistency_level from: %s", consistencyLevelStr); + return -1; + } + } + private static int getTenantId(Connection connection) throws Exception { PreparedStatement statement = connection.prepareStatement(GET_TENANT_ID_SQL); statement.setString(1, TENANT_NAME); @@ -444,7 +478,8 @@ private SqlAuditResult getServerBySqlAudit(String rowkey, String stmtType) throw int svrPort = resultSet.getInt("svr_port"); String querySql = resultSet.getString("query_sql"); int tabletId = extractTabletId(querySql); - sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId); + int consistencyLevel = extractConsistencyLevel(querySql); + sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId, consistencyLevel); debugPrint("querySql: %s", querySql); debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); } @@ -1511,19 +1546,23 @@ public void testIdcBatchGet1() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 - BatchOperation batch = client.batchOperation(TABLE_NAME); - Get get = client.get(TABLE_NAME) + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 + Get get1 = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读 .select("c2"); - batch.addOperation(get); + Get get2 = client.get(TABLE_NAME) + .setRowKey(row(colVal("c1", rowkey + "2"))) + .select("c2"); + batch.addOperation(get1, get2); BatchOperationResult res = batch.execute(); Assert.assertNotNull(res); - Assert.assertEquals(1, res.getResults().size()); + Assert.assertEquals(2, res.getResults().size()); Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2")); // 4. 查询 sql audit,确定读请求发到哪个节点和分区上 SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE); debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + Assert.assertEquals(ObReadConsistency.WEAK.getValue(), sqlAuditResult.consistency_level); // 5. 查询分区的位置信息 PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); debugPrint("partitionLocation: %s", partitionLocation.toString()); @@ -1533,6 +1572,49 @@ public void testIdcBatchGet1() throws Exception { Assert.assertEquals(IDC2, readReplica.getIdc()); } + /* + * 测试场景:用户正常使用场景,使用batch get接口进行指定IDC读 + * 测试预期:发到对应的IDC上进行读取 + */ + @Test + public void testIdcBatchGet1_1() throws Exception { + ObTableClient client = newTestClient(); + client.setCurrentIDC(IDC2); // 设置当前 idc + client.init(); + // 1. 准备数据 + String rowkey = getRandomRowkString(); + insertData(client, rowkey); + Thread.sleep(1000); // 等待数据同步到所有节点 + // 2. 设置 idc + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); + // 3. 获取数据 + BatchOperation batch = client.batchOperation(TABLE_NAME); + Get get1 = client.get(TABLE_NAME) + .setRowKey(row(colVal("c1", rowkey))) + .select("c2"); + Get get2 = client.get(TABLE_NAME) + .setRowKey(row(colVal("c1", rowkey + "2"))) + .select("c2"); + batch.addOperation(get1, get2); + BatchOperationResult res = batch.execute(); + Assert.assertNotNull(res); + Assert.assertEquals(2, res.getResults().size()); + Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2")); + // 4. 查询 sql audit,确定读请求发到哪个节点和分区上 + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + Assert.assertEquals(ObReadConsistency.STRONG.getValue(), sqlAuditResult.consistency_level); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验 + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isLeader()); + } + /* * 测试场景:未设置当前IDC进行弱读 * 测试预期:发到任意follower上进行弱读 @@ -1551,10 +1633,10 @@ public void testIdcBatchGet2() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读 .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1631,10 +1713,10 @@ public void testIdcBatchGet4() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读 .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1672,10 +1754,10 @@ public void testIdcBatchGet5() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,使用strong consistency - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.STRONG); // 设置强一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.STRONG) // 设置强一致性读 .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1712,10 +1794,10 @@ public void testIdcBatchGet6() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读 .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1783,10 +1865,10 @@ public void testIdcBatchGet8() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1864,10 +1946,10 @@ public void testIdcBatchGet10() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用不同大小写的weak - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读 Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.WEAK) // 大写 .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute(); @@ -1946,10 +2028,10 @@ public void testIdcBatchGet12() throws Exception { setZoneRegionIdc(ZONE2, REGION2, IDC2); setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置 - BatchOperation batch = client.batchOperation(TABLE_NAME); + BatchOperation batch = client.batchOperation(TABLE_NAME) + .setReadConsistency(ObReadConsistency.STRONG); // 语句级别设置为strong,应该覆盖全局的weak Get get = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) - .setReadConsistency(ObReadConsistency.STRONG) // 语句级别设置为strong,应该覆盖全局的weak .select("c2"); batch.addOperation(get); BatchOperationResult res = batch.execute();