diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 740531e5..3f71128d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -2754,7 +2754,7 @@ public ObTableClientType getClientType(RunningMode runningMode) { } } - public void setReadConsistency(ObReadConsistency readConsistency) throws IllegalArgumentException { + public void setReadConsistency(ObReadConsistency readConsistency) { tableRoute.setReadConsistency(readConsistency); } @@ -2762,7 +2762,7 @@ public String getReadConsistency() { return tableRoute.getReadConsistency().name(); } - public void setRoutePolicy(ObRoutePolicy policy) throws IllegalArgumentException { + public void setRoutePolicy(ObRoutePolicy policy) { tableRoute.setRoutePolicy(policy); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index a9d754fa..8e2b1068 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -158,7 +158,7 @@ public ObReadConsistency getReadConsistency() { * Set read consistency level. * @param readConsistency read consistency level */ - public void setReadConsistency(ObReadConsistency readConsistency) throws IllegalArgumentException { + public void setReadConsistency(ObReadConsistency readConsistency) { this.consistencyLevel = readConsistency; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocation.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocation.java index 0efe70c7..af337160 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocation.java @@ -104,26 +104,38 @@ private ReplicaLocation getReadReplicaNoLdc(ObRoutePolicy routePolicy) { private ReplicaLocation getReadReplicaByRoutePolicy(ObRoutePolicy routePolicy) throws IllegalArgumentException { + // 路由策略优先:FOLLOWER_FIRST 优先选择 follower,FOLLOWER_ONLY 只能选择 follower + // 在满足路由策略的前提下,按就近原则选择(同机房 -> 同 region -> 其他 region) + + // 优先在同机房找 follower for (ReplicaLocation r : sameIdc) { - if (r.isValid()) { + if (r.isValid() && !r.isLeader()) { return r; } } + + // 如果同机房没有 follower,在同 region 找 follower for (ReplicaLocation r : sameRegion) { - if (r.isValid()) { + if (r.isValid() && !r.isLeader()) { return r; } } + + // 如果同 region 没有 follower,在其他 region 找 follower for (ReplicaLocation r : otherRegion) { - if (r.isValid()) { + if (r.isValid() && !r.isLeader()) { return r; } } - + + // 如果都没有找到 follower if (routePolicy == ObRoutePolicy.FOLLOWER_ONLY) { + // FOLLOWER_ONLY 必须选择 follower,没有就抛出异常 throw new IllegalArgumentException("No follower replica found for route policy: " - + routePolicy); + + routePolicy); } + + // 如果都没有找到,返回 leader(兜底) return leader; } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java index 56e870a1..881adbeb 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java @@ -26,6 +26,7 @@ import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.get.Get; +import com.alipay.oceanbase.rpc.location.model.ObRoutePolicy; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObReadConsistency; @@ -162,13 +163,16 @@ public class ObTableWeakReadTest { private static String PROXY_SYS_USER_NAME = "root"; private static String PROXY_SYS_USER_PASSWORD = ""; private static boolean USE_ODP = false; - private static String ODP_IP = "ip-addr"; - private static int ODP_PORT = 0; - private static String ODP_DATABASE = "database-name"; + private static String ODP_IP = ""; + private static int ODP_SQL_PORT = 2883; + private static int ODP_RPC_PORT = 2885; + private static String ODP_DATABASE = "test"; + private static String JDBC_DATABASE = "test"; private static String JDBC_IP = ""; private static String JDBC_PORT = ""; - private static String JDBC_URL = "jdbc:mysql://"+JDBC_IP+":"+JDBC_PORT+"/test?rewriteBatchedStatements=TRUE&allowMultiQueries=TRUE&useLocalSessionState=TRUE&useUnicode=TRUE&characterEncoding=utf-8&socketTimeout=30000000&connectTimeout=600000&sessionVariables=ob_query_timeout=60000000000"; - + private static String JDBC_URL = "jdbc:mysql://"+JDBC_IP+":"+JDBC_PORT+"/"+JDBC_DATABASE+"?rewriteBatchedStatements=TRUE&allowMultiQueries=TRUE&useLocalSessionState=TRUE&useUnicode=TRUE&characterEncoding=utf-8&socketTimeout=30000000&connectTimeout=600000&sessionVariables=ob_query_timeout=60000000000"; + private static String JDBC_PROXY_URL = "jdbc:mysql://"+ODP_IP+":"+ODP_SQL_PORT+"/"+JDBC_DATABASE+"?rewriteBatchedStatements=TRUE&allowMultiQueries=TRUE&useLocalSessionState=TRUE&useUnicode=TRUE&characterEncoding=utf-8&socketTimeout=30000000&connectTimeout=600000&sessionVariables=ob_query_timeout=60000000000"; + private static boolean printDebug = true; private static int SQL_AUDIT_PERSENT = 20; private static String TENANT_NAME = "mysql"; @@ -180,6 +184,9 @@ public class ObTableWeakReadTest { private static String IDC1 = "idc1"; private static String IDC2 = "idc2"; private static String IDC3 = "idc3"; + private static String REGION1 = "region1"; + private static String REGION2 = "region2"; + private static String REGION3 = "region3"; private static String GET_STMT_TYPE = "KV_GET"; private static String SCAN_STMT_TYPE = "KV_QUERY"; private static String BATCH_GET_STMT_TYPE = "KV_MULTI_GET"; @@ -203,8 +210,12 @@ public class ObTableWeakReadTest { + "WHERE t.table_name = ? AND t.tenant_id = ? AND t.tablet_id = ?;"; private static String SET_SQL_AUDIT_PERSENT_SQL = "SET GLOBAL ob_sql_audit_percentage =?;"; private static String GET_TENANT_ID_SQL = "SELECT tenant_id FROM oceanbase.__all_tenant WHERE tenant_name = ?"; + private static String SET_IDC_SQL = "ALTER PROXYCONFIG SET proxy_idc_name = ?;"; + private static String SET_ROUTE_POLICY_SQL = "ALTER PROXYCONFIG SET proxy_route_policy = ?;"; private Connection tenantConnection = null; private Connection sysConnection = null; + private Connection proxyConnection = null; + private static Connection staticProxyConnection = null; private static Connection staticTenantConnection = null; private static Connection staticSysConnection = null; private static int staticTenantId = 0; @@ -225,7 +236,7 @@ private static ObTableClient newTestClient() throws Exception { obTableClient.setOdpMode(true); obTableClient.setFullUserName(FULL_USER_NAME); obTableClient.setOdpAddr(ODP_IP); - obTableClient.setOdpPort(ODP_PORT); + obTableClient.setOdpPort(ODP_RPC_PORT); obTableClient.setDatabase(ODP_DATABASE); obTableClient.setPassword(PASSWORD); } @@ -247,11 +258,32 @@ private static Connection getSysConnection() throws SQLException { return DriverManager.getConnection(JDBC_URL, "root@sys", PASSWORD); } + /** + * 获取代理连接 + */ + private static Connection getProxyConnection() throws SQLException { + if (USE_ODP) { + // ODP 连接需要包含集群信息的用户名 + // FULL_USER_NAME 格式: user@tenant#cluster + // 对于代理连接,使用 root@sys#cluster 格式 + String[] parts = FULL_USER_NAME.split("#"); + String clusterName = parts.length > 1 ? parts[1] : ""; + String proxyUserName = PROXY_SYS_USER_NAME + "@sys"; + if (!clusterName.isEmpty()) { + proxyUserName += "#" + clusterName; + } + return DriverManager.getConnection(JDBC_PROXY_URL, proxyUserName, PROXY_SYS_USER_PASSWORD); + } else { + return null; + } + } + @org.junit.BeforeClass public static void beforeClass() throws Exception { // 所有测试用例执行前创建表和连接(只执行一次) staticTenantConnection = getConnection(); staticSysConnection = getSysConnection(); + staticProxyConnection = getProxyConnection(); staticTenantId = getTenantId(staticSysConnection); createTable(staticTenantConnection); setSqlAuditPersent(staticTenantConnection, SQL_AUDIT_PERSENT); @@ -268,6 +300,7 @@ public static void afterClass() throws Exception { public void setup() throws Exception { tenantConnection = staticTenantConnection; sysConnection = staticSysConnection; + proxyConnection = staticProxyConnection; tenant_id = staticTenantId; } @@ -301,6 +334,7 @@ private static void cleanupAllData(Connection connection) throws Exception { private static void setMinimalImage(Connection connection) throws Exception { Statement statement = connection.createStatement(); statement.execute("SET GLOBAL binlog_row_image=MINIMAL"); + Thread.sleep(5000); } private static void setFullImage(Connection connection) throws Exception { @@ -328,6 +362,19 @@ private void setZoneIdc(String zone, String idc) throws Exception { statement.execute(); } + private void setZoneRegion(String zone, String region) throws Exception { + PreparedStatement statement = sysConnection.prepareStatement("ALTER SYSTEM MODIFY ZONE ? SET REGION = ?;"); + statement.setString(1, zone); + statement.setString(2, region); + debugPrint("setZoneRegion SQL: %s", statement.toString()); + statement.execute(); + } + + private void setZoneRegionIdc(String zone, String region, String idc) throws Exception { + setZoneRegion(zone, region); + setZoneIdc(zone, idc); + } + // 通过当前纳秒时间戳生成随机字符串 private String getRandomRowkString() { return System.nanoTime() + ""; @@ -469,6 +516,26 @@ private void debugPrint(String format, Object... args) { } } + private void setIdc(ObTableClient client, String idc) throws Exception { + if (USE_ODP) { + PreparedStatement statement = proxyConnection.prepareStatement(SET_IDC_SQL); + statement.setString(1, idc); + statement.execute(); + } else { + client.setCurrentIDC(idc); + } + } + + private void setRoutePolicy(ObTableClient client, String routePolicy) throws Exception { + if (USE_ODP) { + PreparedStatement statement = proxyConnection.prepareStatement(SET_ROUTE_POLICY_SQL); + statement.setString(1, routePolicy); + statement.execute(); + } else { + client.setRoutePolicy(ObRoutePolicy.getByName(routePolicy)); + } + } + /* * 测试场景:用户正常使用场景,使用get接口进行指定IDC读 * 测试预期:发到对应的IDC上进行读取 @@ -476,16 +543,17 @@ private void debugPrint(String format, Object... args) { @Test public void testIdcGet1() throws Exception { ObTableClient client = newTestClient(); - client.setCurrentIDC(IDC2); // 设置当前 idc + setIdc(client, IDC2); // 设置当前 idc + setRoutePolicy(client, "follower_first"); // 设置路由策略 client.init(); // 1. 准备数据 String rowkey = getRandomRowkString(); insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -506,6 +574,45 @@ public void testIdcGet1() throws Exception { Assert.assertEquals(IDC2, readReplica.getIdc()); } + /* + * 测试场景:同城弱读,读到follower + * 测试预期:读到follower + */ + @Test + public void testIdcGet1_1() throws Exception { + ObTableClient client = newTestClient(); + setIdc(client, IDC1); // 设置当前 idc + setRoutePolicy(client, "follower_first"); // 设置路由策略 + client.init(); + // 1. 准备数据 + String rowkey = getRandomRowkString(); + insertData(client, rowkey); + Thread.sleep(1000); // 等待数据同步到所有节点 + // 2. 设置 idc + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION1, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); + // 3. 获取数据 + Map result = client.get(TABLE_NAME) + .setRowKey(row(colVal("c1", rowkey))) + .setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读 + .select("c2") + .execute(); + debugPrint("c2_val: %s", result.get("c2")); + Assert.assertEquals("c2_val", result.get("c2")); + // 4. 查询 sql audit,确定读请求发到哪个节点和分区上 + SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, GET_STMT_TYPE); + debugPrint("sqlAuditResult: %s", sqlAuditResult.toString()); + // 5. 查询分区的位置信息 + PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId); + debugPrint("partitionLocation: %s", partitionLocation.toString()); + // 6. 校验 + ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, sqlAuditResult.svrPort); + debugPrint("readReplica: %s", readReplica.toString()); + Assert.assertTrue(readReplica.isFollower()); + Assert.assertEquals(IDC2, readReplica.getIdc()); + } + /* * 测试场景:未设置当前IDC进行弱读 * 测试预期:发到任意follower上进行弱读 @@ -520,9 +627,9 @@ public void testIdcGet2() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -557,9 +664,9 @@ public void testIdcGet3() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -594,9 +701,9 @@ public void testIdcGet4() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -632,9 +739,9 @@ public void testIdcGet5() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,使用strong consistency Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -669,9 +776,9 @@ public void testIdcGet6() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -706,9 +813,9 @@ public void testIdcGet7() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用非法的ReadConsistency值,应该抛出异常 try { ObReadConsistency.getByName("invalid_consistency"); // 非法的consistency值 @@ -737,9 +844,9 @@ public void testIdcGet8() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc(只设置IDC1, IDC2, IDC3,不设置IDC4) - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -775,9 +882,9 @@ public void testIdcGet9() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 不设置ReadConsistency,使用默认值(应该是strong) Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -812,9 +919,9 @@ public void testIdcGet10() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用不同大小写的weak Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -850,9 +957,9 @@ public void testIdcGet11() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -888,9 +995,9 @@ public void testIdcGet12() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置 Map result = client.get(TABLE_NAME) .setRowKey(row(colVal("c1", rowkey))) @@ -925,9 +1032,9 @@ public void testIdcScan1() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -965,9 +1072,9 @@ public void testIdcScan2() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1005,9 +1112,9 @@ public void testIdcScan3() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1045,9 +1152,9 @@ public void testIdcScan4() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1086,9 +1193,9 @@ public void testIdcScan5() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,使用strong consistency QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1126,9 +1233,9 @@ public void testIdcScan6() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1166,9 +1273,9 @@ public void testIdcScan7() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用非法的ReadConsistency值,应该抛出异常 try { ObReadConsistency.getByName("invalid_consistency"); // 非法的consistency值 @@ -1197,9 +1304,9 @@ public void testIdcScan8() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc(只设置IDC1, IDC2, IDC3,不设置IDC4) - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1238,9 +1345,9 @@ public void testIdcScan9() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 不设置ReadConsistency,使用默认值(应该是strong) QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1278,9 +1385,9 @@ public void testIdcScan10() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用不同大小写的weak QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1319,9 +1426,9 @@ public void testIdcScan11() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1360,9 +1467,9 @@ public void testIdcScan12() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置 QueryResultSet res = client.query(TABLE_NAME) .addScanRange(new Object[] { rowkey }, new Object[] { rowkey }) @@ -1400,9 +1507,9 @@ public void testIdcBatchGet1() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1440,9 +1547,9 @@ public void testIdcBatchGet2() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1480,9 +1587,9 @@ public void testIdcBatchGet3() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1520,9 +1627,9 @@ public void testIdcBatchGet4() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1561,9 +1668,9 @@ public void testIdcBatchGet5() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,使用strong consistency BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1601,9 +1708,9 @@ public void testIdcBatchGet6() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1641,9 +1748,9 @@ public void testIdcBatchGet7() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用非法的ReadConsistency值,应该抛出异常 try { ObReadConsistency.getByName("invalid_consistency"); // 非法的consistency值 @@ -1672,9 +1779,9 @@ public void testIdcBatchGet8() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc(只设置IDC1, IDC2, IDC3,不设置IDC4) - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1713,9 +1820,9 @@ public void testIdcBatchGet9() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 不设置ReadConsistency,使用默认值(应该是strong) BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1753,9 +1860,9 @@ public void testIdcBatchGet10() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 使用不同大小写的weak BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1794,9 +1901,9 @@ public void testIdcBatchGet11() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1835,9 +1942,9 @@ public void testIdcBatchGet12() throws Exception { insertData(client, rowkey); Thread.sleep(1000); // 等待数据同步到所有节点 // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置 BatchOperation batch = client.batchOperation(TABLE_NAME); Get get = client.get(TABLE_NAME) @@ -1872,9 +1979,9 @@ public void testDmlInsertWithGlobalWeak() throws Exception { client.setCurrentIDC(IDC2); // 设置当前 idc client.init(); // 1. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 2. 执行INSERT操作 String rowkey = getRandomRowkString(); client.insert(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) @@ -1906,9 +2013,9 @@ public void testDmlUpdateWithGlobalWeak() throws Exception { insertData(client, rowkey); Thread.sleep(500); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 执行UPDATE操作 client.update(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) .addMutateRow(row(colVal("c2", "c2_val_updated"))).execute(); @@ -1938,9 +2045,9 @@ public void testDmlDeleteWithGlobalWeak() throws Exception { String rowkey = getRandomRowkString(); insertData(client, rowkey); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 执行DELETE操作 client.delete(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))).execute(); // 4. 查询 sql audit,确定写请求发到哪个节点和分区上 @@ -1967,9 +2074,9 @@ public void testDmlReplaceWithGlobalWeak() throws Exception { client.init(); // 1. 设置 idc String rowkey = getRandomRowkString(); - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 2. 执行REPLACE操作 client.replace(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) .addMutateRow(row(colVal("c2", "c2_val_replaced"))).execute(); @@ -1997,9 +2104,9 @@ public void testDmlInsertOrUpdateWithGlobalWeak() throws Exception { client.init(); // 1. 设置 idc String rowkey = getRandomRowkString(); - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 2. 执行INSERT_OR_UPDATE操作 client.insertOrUpdate(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) .addMutateRow(row(colVal("c2", "c2_val"))).execute(); @@ -2029,9 +2136,9 @@ public void testDmlPutWithGlobalWeak() throws Exception { client.init(); // 1. 设置 idc String rowkey = getRandomRowkString(); - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 2. 执行PUT操作 client.put(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) .addMutateRow(row(colVal("c2", "c2_val_put"))).execute(); @@ -2064,9 +2171,9 @@ public void testDmlIncrementWithGlobalWeak() throws Exception { client.init(); // 1. 设置 idc String rowkey = getRandomRowkString(); - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 2. 执行INCREMENT操作(注意:INCREMENT通常用于数值类型,这里仅测试路由逻辑) // 由于表结构是varchar,这里可能失败,但我们可以先测试路由 try { @@ -2103,9 +2210,9 @@ public void testDmlAppendWithGlobalWeak() throws Exception { String rowkey = getRandomRowkString(); insertData(client, rowkey); // 2. 设置 idc - setZoneIdc(ZONE1, IDC1); - setZoneIdc(ZONE2, IDC2); - setZoneIdc(ZONE3, IDC3); + setZoneRegionIdc(ZONE1, REGION1, IDC1); + setZoneRegionIdc(ZONE2, REGION2, IDC2); + setZoneRegionIdc(ZONE3, REGION3, IDC3); // 3. 执行APPEND操作 client.append(TABLE_NAME).setRowKey(row(colVal("c1", rowkey))) .addMutateRow(row(colVal("c2", "_appended"))).execute();