From 107b09b0ade3a0173011e3fba3744b41704a636c Mon Sep 17 00:00:00 2001 From: stuBirdFly <1065492934@qq.com> Date: Mon, 28 Oct 2024 20:59:11 +0800 Subject: [PATCH] support hbase scan renewLease --- .../query/AbstractQueryStreamResult.java | 7 +++++ .../syncquery/ObQueryOperationType.java | 2 +- .../ObTableClientQueryAsyncStreamResult.java | 26 +++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 5da49bbc..d2d4ead9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -271,6 +271,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, return result; } + /* + * RenewLease. + */ + public void renewLease() throws Exception { + throw new IllegalStateException("renew only support stream query"); + } + /* * Next. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java index 0f384754..b329e3ef 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java @@ -21,7 +21,7 @@ import java.util.Map; public enum ObQueryOperationType { - QUERY_START(0), QUERY_NEXT(1), QUERY_END(2); + QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3); private int value; private static Map map = new HashMap(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index cae7f39c..a526c47e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -181,6 +181,32 @@ protected Map> refreshPartition(ObTableQuery ta return buildPartitions(client, tableQuery, tableName); } + // This function is designed for HBase-type requests. + // It is used to extend the session duration of a scan + @Override + public void renewLease() throws Exception { + if (!isEnd() && !expectant.isEmpty()) { + Iterator>> it = expectant.entrySet() + .iterator(); + Map.Entry> lastEntry = it.next(); + ObPair partIdWithObTable = lastEntry.getValue(); + // try access new partition, async will not remove useless expectant + ObTableParam obTableParam = partIdWithObTable.getRight(); + ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); + + // refresh request info + queryRequest.setPartitionId(obTableParam.getPartitionId()); + queryRequest.setTableId(obTableParam.getTableId()); + + // refresh async query request + asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW); + asyncRequest.setQuerySessionId(sessionId); + executeAsync(partIdWithObTable, asyncRequest); + } else { + throw new ObTableException("query end or expectant is null"); + } + } + @Override public boolean next() throws Exception { checkStatus();