Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class BatchOperation {
boolean isSameType = true;
protected ObTableEntityType entityType = ObTableEntityType.KV;
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
protected ObReadConsistency readConsistency = null; // BatchOperation 级别的弱读设置

/*
* default constructor
Expand All @@ -63,6 +64,7 @@ public BatchOperation() {
client = null;
withResult = false;
operations = new ArrayList<>();
readConsistency = null;
}

/*
Expand Down Expand Up @@ -188,6 +190,25 @@ public void setEntityType(ObTableEntityType entityType) {
this.entityType = entityType;
}

/**
* Set read consistency level for batch operation.
* This setting will override the readConsistency settings on individual Get operations.
* @param readConsistency read consistency level
* @return this
*/
public BatchOperation setReadConsistency(ObReadConsistency readConsistency) {
this.readConsistency = readConsistency;
return this;
}

/**
* Get read consistency level for batch operation.
* @return read consistency level
*/
public ObReadConsistency getReadConsistency() {
return readConsistency;
}

public void setServerCanRetry(boolean canRetry) {
this.serverCanRetry = canRetry;
}
Expand Down Expand Up @@ -317,13 +338,6 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
return new BatchOperationResult(batchOps.executeWithResult());
}

private boolean checkReadConsistency(ObTableClient obTableClient, ObReadConsistency readConsistency) throws IllegalArgumentException {
// 如果没有设置语句级别的 readConsistency(null),使用 TableRoute 上的 consistencyLevel
if (readConsistency == null) {
return obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
}
return readConsistency == ObReadConsistency.WEAK;
}

private BatchOperationResult executeWithLSBatchOp() throws Exception {
if (tableName == null || tableName.isEmpty()) {
Expand Down Expand Up @@ -369,11 +383,23 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
if (get.getRowKey() == null) {
throw new IllegalArgumentException("RowKey is null in Get operation");
}
isWeakRead = checkReadConsistency(obTableClient, get.getReadConsistency());
// BatchOperation 级别的 readConsistency 优先,忽略 Get 上的设置
if (readConsistency != null) {
isWeakRead = (readConsistency == ObReadConsistency.WEAK);
} else {
// 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置
isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
}
batchOps.addOperation(get);
} else if (operation instanceof TableQuery) {
TableQuery query = (TableQuery) operation;
isWeakRead = checkReadConsistency(obTableClient, query.getReadConsistency());
// BatchOperation 级别的 readConsistency 优先,忽略 TableQuery 上的设置
if (readConsistency != null) {
isWeakRead = (readConsistency == ObReadConsistency.WEAK);
} else {
// 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置
isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
}
batchOps.addOperation(query);
} else if (operation instanceof QueryAndMutate) {
QueryAndMutate qm = (QueryAndMutate) operation;
Expand Down
126 changes: 104 additions & 22 deletions src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ class SqlAuditResult {
public String svrIp;
public int svrPort;
public int tabletId;
public int consistency_level;

public SqlAuditResult(String svrIp, int svrPort, int tabletId) {
public SqlAuditResult(String svrIp, int svrPort, int tabletId, int consistency_level) {
this.svrIp = svrIp;
this.svrPort = svrPort;
this.tabletId = tabletId;
this.consistency_level = consistency_level;
}

@Override
public String toString() {
return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + '}';
return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + ", consistency_level=" + consistency_level + '}';
}
}

Expand Down Expand Up @@ -413,6 +415,38 @@ private int extractTabletId(String querySql) {
}
}

private int extractConsistencyLevel(String querySql) {
// 查找 consistency_level: 的模式
String pattern = "consistency_level:";
int startIndex = querySql.indexOf(pattern);
if (startIndex == -1) {
return -1; // 如果找不到,返回 -1
}
// 找到 : 后面的数字开始位置
int levelStartIndex = startIndex + pattern.length();
// 跳过可能的空格
while (levelStartIndex < querySql.length() && Character.isWhitespace(querySql.charAt(levelStartIndex))) {
levelStartIndex++;
}
// 找到数字结束位置(遇到 , 或 } 或空格)
int levelEndIndex = levelStartIndex;
while (levelEndIndex < querySql.length()) {
char c = querySql.charAt(levelEndIndex);
if (c == ',' || c == '}' || c == ' ') {
break;
}
levelEndIndex++;
}
// 提取数字字符串
String consistencyLevelStr = querySql.substring(levelStartIndex, levelEndIndex).trim();
try {
return Integer.parseInt(consistencyLevelStr);
} catch (NumberFormatException e) {
debugPrint("Failed to parse consistency_level from: %s", consistencyLevelStr);
return -1;
}
}

private static int getTenantId(Connection connection) throws Exception {
PreparedStatement statement = connection.prepareStatement(GET_TENANT_ID_SQL);
statement.setString(1, TENANT_NAME);
Expand Down Expand Up @@ -444,7 +478,8 @@ private SqlAuditResult getServerBySqlAudit(String rowkey, String stmtType) throw
int svrPort = resultSet.getInt("svr_port");
String querySql = resultSet.getString("query_sql");
int tabletId = extractTabletId(querySql);
sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId);
int consistencyLevel = extractConsistencyLevel(querySql);
sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId, consistencyLevel);
debugPrint("querySql: %s", querySql);
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
}
Expand Down Expand Up @@ -1511,19 +1546,23 @@ public void testIdcBatchGet1() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
Get get = client.get(TABLE_NAME)
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get1 = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
.select("c2");
batch.addOperation(get);
Get get2 = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey + "2")))
.select("c2");
batch.addOperation(get1, get2);
BatchOperationResult res = batch.execute();
Assert.assertNotNull(res);
Assert.assertEquals(1, res.getResults().size());
Assert.assertEquals(2, res.getResults().size());
Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2"));
// 4. 查询 sql audit,确定读请求发到哪个节点和分区上
SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE);
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
Assert.assertEquals(ObReadConsistency.WEAK.getValue(), sqlAuditResult.consistency_level);
// 5. 查询分区的位置信息
PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId);
debugPrint("partitionLocation: %s", partitionLocation.toString());
Expand All @@ -1533,6 +1572,49 @@ public void testIdcBatchGet1() throws Exception {
Assert.assertEquals(IDC2, readReplica.getIdc());
}

/*
* 测试场景:用户正常使用场景,使用batch get接口进行指定IDC读
* 测试预期:发到对应的IDC上进行读取
*/
@Test
public void testIdcBatchGet1_1() throws Exception {
ObTableClient client = newTestClient();
client.setCurrentIDC(IDC2); // 设置当前 idc
client.init();
// 1. 准备数据
String rowkey = getRandomRowkString();
insertData(client, rowkey);
Thread.sleep(1000); // 等待数据同步到所有节点
// 2. 设置 idc
setZoneRegionIdc(ZONE1, REGION1, IDC1);
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
Get get1 = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.select("c2");
Get get2 = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey + "2")))
.select("c2");
batch.addOperation(get1, get2);
BatchOperationResult res = batch.execute();
Assert.assertNotNull(res);
Assert.assertEquals(2, res.getResults().size());
Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2"));
// 4. 查询 sql audit,确定读请求发到哪个节点和分区上
SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE);
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
Assert.assertEquals(ObReadConsistency.STRONG.getValue(), sqlAuditResult.consistency_level);
// 5. 查询分区的位置信息
PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId);
debugPrint("partitionLocation: %s", partitionLocation.toString());
// 6. 校验
ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, sqlAuditResult.svrPort);
debugPrint("readReplica: %s", readReplica.toString());
Assert.assertTrue(readReplica.isLeader());
}

/*
* 测试场景:未设置当前IDC进行弱读
* 测试预期:发到任意follower上进行弱读
Expand All @@ -1551,10 +1633,10 @@ public void testIdcBatchGet2() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1631,10 +1713,10 @@ public void testIdcBatchGet4() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1672,10 +1754,10 @@ public void testIdcBatchGet5() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据,使用strong consistency
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.STRONG); // 设置强一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.STRONG) // 设置强一致性读
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1712,10 +1794,10 @@ public void testIdcBatchGet6() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1783,10 +1865,10 @@ public void testIdcBatchGet8() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK)
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1864,10 +1946,10 @@ public void testIdcBatchGet10() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 使用不同大小写的weak
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.WEAK) // 大写
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down Expand Up @@ -1946,10 +2028,10 @@ public void testIdcBatchGet12() throws Exception {
setZoneRegionIdc(ZONE2, REGION2, IDC2);
setZoneRegionIdc(ZONE3, REGION3, IDC3);
// 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置
BatchOperation batch = client.batchOperation(TABLE_NAME);
BatchOperation batch = client.batchOperation(TABLE_NAME)
.setReadConsistency(ObReadConsistency.STRONG); // 语句级别设置为strong,应该覆盖全局的weak
Get get = client.get(TABLE_NAME)
.setRowKey(row(colVal("c1", rowkey)))
.setReadConsistency(ObReadConsistency.STRONG) // 语句级别设置为strong,应该覆盖全局的weak
.select("c2");
batch.addOperation(get);
BatchOperationResult res = batch.execute();
Expand Down