diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java index 1192c873..8afb323b 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java @@ -218,6 +218,12 @@ public TableQuery setMaxResultSize(long maxResultSize) { return tableClientQuery.setMaxResultSize(maxResultSize); } + @Override + public TableQuery setOperationTimeout(long operationTimeout) { + tableClientQuery.setOperationTimeout(operationTimeout); + return this; + } + /** * Clear. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index e9e17c65..a1032ee9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -26,6 +26,7 @@ import com.alipay.oceanbase.rpc.location.model.partition.*; import com.alipay.oceanbase.rpc.mutation.*; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation; @@ -45,6 +46,7 @@ import com.alipay.remoting.util.StringUtils; import org.slf4j.Logger; +import javax.sound.midi.SysexMessage; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -155,6 +157,9 @@ public class ObTableClient extends AbstractObTableClient implements Lifecycle { private ConcurrentHashMap TableGroupCache = new ConcurrentHashMap(); // tableGroup -> Table private ConcurrentHashMap TableGroupInverted = new ConcurrentHashMap(); // Table -> tableGroup + private RouteTableRefresher routeTableRefresher; + + private Thread backgroundRefreshTableTask; private Long clientId; private Map TableConfigs = new HashMap<>(); /* @@ -177,6 +182,10 @@ public void init() throws Exception { initProperties(); // 4. init metadata initMetadata(); + // 5. run fresh table task + routeTableRefresher = new RouteTableRefresher(this); + backgroundRefreshTableTask = new Thread(routeTableRefresher); + backgroundRefreshTableTask.start(); initialized = true; } catch (Throwable t) { BOOT.warn("failed to init ObTableClient", t); @@ -202,6 +211,7 @@ public void close() throws Exception { return; } closed = true; + routeTableRefresher.finish(); if (tableRoster != null) { Exception throwException = null; List exceptionObServers = new ArrayList(); @@ -238,6 +248,14 @@ public void close() throws Exception { } } + public RouteTableRefresher getRouteTableRefresher() { + return routeTableRefresher; + } + + public Map getTableLocations() { + return tableLocations; + } + /* * Check status. */ @@ -260,7 +278,7 @@ public Long getClientId() { public Map getTableConfigs() { return TableConfigs; } - + private void initTableConfigs() { TableConfigs.put("client_id", clientId); TableConfigs.put("runtime", new HashMap()); @@ -354,7 +372,7 @@ private void initProperties() { // add configs value to TableConfigs - + // runtime Object value = TableConfigs.get("runtime"); if (value instanceof Map) { @@ -372,7 +390,7 @@ private void initProperties() { Map logMap = (Map) value; logMap.put(SLOW_QUERY_MONITOR_THRESHOLD.getKey(), String.valueOf(slowQueryMonitorThreshold)); } - + value = TableConfigs.get("route"); if (value instanceof Map) { Map routeMap = (Map) value; @@ -390,7 +408,7 @@ private void initProperties() { if (runtimeBatchExecutor != null) { useExecutor = true; } - + value = TableConfigs.get("thread_pool"); if (value instanceof Map) { Map threadPoolMap = (Map) value; @@ -1124,7 +1142,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r final boolean waitForRefresh) throws Exception { return getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false); } - + /** * Get or refresh table entry. * @param tableName table name @@ -1619,6 +1637,45 @@ public ObPair getTableWithPartId(String tableName, long part return getTableInternal(tableName, tableEntry, partId, waitForRefresh, route); } + /** + * + * @param moveResponse reRoute response + * @return + */ + public ObTable getTable(ObTableApiMove moveResponse) throws Exception { + ObServerAddr addr = new ObServerAddr(); + addr.setIp(moveResponse.getReplica().getServer().ipToString()); + addr.setSvrPort(moveResponse.getReplica().getServer().getPort()); + logger.info("get new server ip {}, port {} from move response ", addr.getIp(), addr.getSvrPort()); + + for (Map.Entry entry: tableRoster.entrySet()){ + if (Objects.equals(entry.getKey().getIp(), addr.getIp()) && Objects.equals(entry.getKey().getSvrPort(), addr.getSvrPort())){ + return entry.getValue(); + } + } + // If the node address does not exist, a new table is created + return addTable(addr); + } + + public ObTable addTable(ObServerAddr addr){ + + try { + logger.info("server from response not exist in route cache, server ip {}, port {} , execute add Table.", addr.getIp(), addr.getSvrPort()); + ObTable obTable = new ObTable.Builder(addr.getIp(), addr.getSvrPort()) // + .setLoginInfo(tenantName, userName, password, database) // + .setProperties(getProperties()).build(); + tableRoster.put(addr, obTable); + return obTable; + } catch (Exception e) { + BOOT.warn( + "The addr{}:{} failed to put into table roster, the node status may be wrong, Ignore", + addr.getIp(), addr.getSvrPort()); + RUNTIME.warn("Get table from API_MOVE response ip and port meet exception", e); + e.printStackTrace(); + } + return null; + } + /** * get addr from table entry by pardId * @param tableName table want to get @@ -1949,7 +2006,7 @@ public Map execute(ObPair obPair) throws Exc // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); request.setConsistencyLevel(obReadConsistency.toObTableConsistencyLevel()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result); String endpoint = obTable.getIp() + ":" + obTable.getPort(); @@ -1992,7 +2049,7 @@ public Long execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "UPDATE", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2033,7 +2090,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "UPDATE", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2075,7 +2132,7 @@ public Long execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "DELETE", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2113,7 +2170,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "DELETE", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2155,7 +2212,7 @@ public Long execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INSERT", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2196,7 +2253,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INSERT", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2237,7 +2294,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2279,7 +2336,7 @@ public Long execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "REPLACE", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2320,7 +2377,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "REPLACE", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2362,7 +2419,7 @@ public Long execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INERT_OR_UPDATE", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2408,7 +2465,7 @@ public ObPayload execute(ObPair obPair) throws Exception { if (usePut) { request.setOptionFlag(ObTableOptionFlag.USE_PUT); } - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INERT_OR_UPDATE", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2468,7 +2525,7 @@ public Map execute(ObPair obPair) throws Exc request.setReturningAffectedEntity(withResult); request.setTableId(tableParam.getTableId()); request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INCREMENT", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2515,7 +2572,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INCREMENT", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2553,7 +2610,7 @@ public Map execute(ObPair obPair) throws Exc request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INCREMENT", endpoint, rowKey, (ObTableOperationResult) result, getTableTime - start, @@ -2594,7 +2651,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableName); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableName, "INCREMENT", endpoint, rowKey, (ObTableOperationResult) result, TableTime - start, @@ -2671,7 +2728,7 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setPartitionId(tableParam.getPartitionId()); request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute); request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists); - ObPayload result = obTable.execute(request); + ObPayload result = executeWithRetry(obTable, request, tableQuery.getTableName()); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE", operation.getOperationType().toString(), endpoint, @@ -2684,6 +2741,24 @@ public ObPayload execute(ObPair obPair) throws Exception { }); } + public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tableName) throws Exception { + ObPayload result = obTable.execute(request); + if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) { + ObTableApiMove moveResponse = (ObTableApiMove) result; + getRouteTableRefresher().addTableIfAbsent(tableName, true); + getRouteTableRefresher().triggerRefreshTable(); + obTable = getTable(moveResponse); + result = obTable.execute(request); + if (result instanceof ObTableApiMove) { + ObTableApiMove move = (ObTableApiMove) result; + logger.warn("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + + "Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString()); + throw new ObTableRoutingWrongException(); + } + } + return result; + } + /** * * @param tableQuery table query @@ -2885,7 +2960,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.setTableId(tableParam.getTableId()); request.setPartitionId(tableParam.getPartitionId()); request.setTimeout(tableParam.getObTable().getObTableOperationTimeout()); - return tableParam.getObTable().execute(request); + ObTable obTable = tableParam.getObTable(); + return executeWithRetry(obTable, request, request.getTableName()); } } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java index 32228332..a588758d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java @@ -22,6 +22,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult; @@ -111,9 +112,7 @@ public ObPayload newPayload(ObRpcPacketHeader header) { */ @Override public ObPayload newPayload(ObRpcPacketHeader header) { - throw new ObTableRoutingWrongException( - "Receive rerouting response packet. " - + "Java client is not supported and need to Refresh table router entry"); + return new ObTableApiMove(); } }, // OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) { @@ -161,9 +160,7 @@ public static ObTablePacketCode valueOf(short value) { case Pcodes.OB_TABLE_API_LS_EXECUTE: return OB_TABLE_API_LS_EXECUTE; case Pcodes.OB_TABLE_API_MOVE: - throw new ObTableRoutingWrongException( - "Receive rerouting response packet. " - + "Java client is not supported and need to Refresh table router entry"); + return OB_TABLE_API_MOVE; case Pcodes.OB_ERROR_PACKET: return OB_ERROR_PACKET; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index 2055a615..89ee4bcb 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques ObRpcResultCode resultCode = new ObRpcResultCode(); resultCode.decode(buf); // If response indicates the request is routed to wrong server, we should refresh the routing meta. - if (response.getHeader().isRoutingWrong()) { + if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) { String errMessage = TraceUtil.formatTraceMessage(conn, request, "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); @@ -139,7 +139,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques throw new ObTableNeedFetchAllException(errMessage); } } - if (resultCode.getRcode() != 0) { + if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) { String errMessage = TraceUtil.formatTraceMessage(conn, request, "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); @@ -186,7 +186,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque InvokeCallback invokeCallback) { return new ObClientFuture(request.getId()); } - // schema changed private boolean needFetchAll(int errorCode, int pcode) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/ObReplicaType.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/ObReplicaType.java index 72c8d8b6..e928e61e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/ObReplicaType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/ObReplicaType.java @@ -17,6 +17,8 @@ package com.alipay.oceanbase.rpc.location.model; +import java.util.Objects; + /** * ObReplicaType(副本类型) * @@ -72,6 +74,10 @@ static public ObReplicaType getReplicaType(int idx) { } } + public int getIndex() { + return this.index; + } + /* * whether the replica is readable. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java new file mode 100644 index 00000000..d8a76f4b --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java @@ -0,0 +1,93 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ +package com.alipay.oceanbase.rpc.location.model; + +import com.alipay.oceanbase.rpc.ObTableClient; + + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.alipay.oceanbase.rpc.location.model.partition.ObPair; +import org.slf4j.Logger; + +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.getLogger; + +public class RouteTableRefresher extends Thread{ + + private static final Logger logger = getLogger(RouteTableRefresher.class); + + private volatile AtomicBoolean isFinished = new AtomicBoolean(false); // Thread end flag + + private final Semaphore semaphore = new Semaphore(0); + + private volatile ConcurrentLinkedQueue> refreshTableTasks; // Task refresh queue + + ObTableClient client; + + private final Lock lock = new ReentrantLock(); // Ensure the atomicity of the AddIfAbsent operation. + + public RouteTableRefresher(ObTableClient client){ + this.client = client; + } + + public void finish() { + isFinished.set(true); + } + + @Override + public void run() { + refreshTableTasks = new ConcurrentLinkedQueue<>(); + while (!isFinished.get()) { + try { + semaphore.acquire(); // A semaphore is associated with a task; it ensures that only one task is processed at a time. + logger.info("Thread name {}, id{} acquire semaphore, begin execute route refresher", currentThread().getName(), currentThread().getId()); + } catch (InterruptedException e) { + logger.info("Thread name {}, id {} is interrupted", currentThread().getName(), currentThread().getId()); + } + ObPair refreshTableTask = refreshTableTasks.peek(); + if (refreshTableTask != null && refreshTableTask.getRight()) { + String tableName = refreshTableTask.getLeft(); + try { + logger.info("backgroundRefreshTableTask run refresh, table name {}", tableName); + TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, true, false, false); + client.getTableLocations().put(refreshTableTask.getLeft(), tableEntry); + } catch (Exception e) { + String message = "RefreshTableBackground run meet exception" + e.getMessage(); + logger.warn(message); + } + } + refreshTableTasks.poll(); + } + } + + public void addTableIfAbsent(String tableName, Boolean isRefreshing){ + lock.lock(); + if (!refreshTableTasks.contains(new ObPair<>(tableName, isRefreshing))) { + logger.info("add table {}, is refreshing {} to refresh task.", tableName, isRefreshing); + refreshTableTasks.add(new ObPair<>(tableName,isRefreshing)); + } + lock.unlock(); + } + + public void triggerRefreshTable() { + semaphore.release(); + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/AbstractPropertyAware.java b/src/main/java/com/alipay/oceanbase/rpc/property/AbstractPropertyAware.java index 5ebe84cf..d294bcd7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/property/AbstractPropertyAware.java +++ b/src/main/java/com/alipay/oceanbase/rpc/property/AbstractPropertyAware.java @@ -60,7 +60,10 @@ public long parseToLong(String key, long defaultV) { } } - public boolean parseToBoolean(String key) { + public boolean parseToBoolean(String key) throws Exception { + if (System.getProperty(OB_TABLE_CLIENT_PREFIX + key) == null && getProperty(key) == null){ + throw new Exception(); + } return Boolean.parseBoolean(System.getProperty(OB_TABLE_CLIENT_PREFIX + key, getProperty(key))); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java index c3de7fbf..abdfbfde 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java +++ b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java @@ -102,7 +102,7 @@ public enum Property { RUNTIME_BATCH_MAX_WAIT("runtime.batch.max.wait", 3000L, "批量执行请求的超时时间"), // [ObTableClient][LOG] - SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", 10L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"), + SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", -1L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"), /* * property in [`ObTable`] @@ -134,7 +134,7 @@ public enum Property { NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"), // [ObTable][OTHERS] - SERVER_ENABLE_REROUTING("server.enable.rerouting", "false", "开启server端的重定向回复功能"), + SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"), /* * other config diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddr.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddr.java new file mode 100644 index 00000000..928667dc --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddr.java @@ -0,0 +1,118 @@ +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +public class ObAddr extends AbstractPayload { + + private ObAddrVersion obAddrVersion; + + private byte[] ip; + private int port; + + public ObAddr(){ + this.obAddrVersion = ObAddrVersion.ObAddrIPV4; + this.ip = new byte[16]; + this.port = 0; + } + + public String ipToString() { + if (isIPv4()){ + return getIPv4().getHostAddress(); + } + return getIPv6().getHostAddress(); + } + + public InetAddress getIPv4(){ + if (isIPv6()){ + return null; + } + try { + return InetAddress.getByAddress(Arrays.copyOf(ip, 4)); + } catch (UnknownHostException e){ + // 需要查看对应的错误吗进行处理 + } + return null; + } + + public InetAddress getIPv6() { + if (isIPv6()){ + return null; + } + try { + return InetAddress.getByAddress(ip); + } catch (UnknownHostException e){ + // 需要查看对应的错误吗进行处理 + } + return null; + } + + public boolean isIPv4() { + return obAddrVersion == ObAddrVersion.ObAddrIPV4; + } + + public boolean isIPv6() { + return obAddrVersion == ObAddrVersion.ObAddrIPV6; + } + + public int getPort(){ + return port; + } + + @Override + public long getPayloadContentSize() { + return 0; + } + + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + System.arraycopy(Serialization.encodeI8(obAddrVersion.getValue()), 0, bytes, idx, Serialization.getNeedBytes(obAddrVersion.getValue())); + + ByteBuffer buffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN); + int ip1 = buffer.getInt(0); + int ip2 = buffer.getInt(4); + int ip3 = buffer.getInt(8); + int ip4 = buffer.getInt(12); + System.arraycopy(Serialization.encodeVi32(ip1), 0, bytes, idx, Serialization.getNeedBytes(ip1)); + System.arraycopy(Serialization.encodeVi32(ip2), 0, bytes, idx, Serialization.getNeedBytes(ip2)); + System.arraycopy(Serialization.encodeVi32(ip3), 0, bytes, idx, Serialization.getNeedBytes(ip3)); + System.arraycopy(Serialization.encodeVi32(ip4), 0, bytes, idx, Serialization.getNeedBytes(ip4)); + + System.arraycopy(Serialization.encodeVi32(port), 0, bytes, idx, Serialization.getNeedBytes(port)); + return bytes; + } + + @Override + public ObAddr decode(ByteBuf buf) { + super.decode(buf); + this.obAddrVersion = ObAddrVersion.fromValue(Serialization.decodeI8(buf)); + //decode ip addr + + int ip1, ip2, ip3, ip4; + ip1 = Serialization.decodeVi32(buf); + ip2 = Serialization.decodeVi32(buf); + ip3 = Serialization.decodeVi32(buf); + ip4 = Serialization.decodeVi32(buf); + ByteBuffer ipBuffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN); + ipBuffer.putInt(0, ip1); + ipBuffer.putInt(4, ip2); + ipBuffer.putInt(8, ip3); + ipBuffer.putInt(12, ip4); + + this.port = Serialization.decodeVi32(buf); + + return this; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddrVersion.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddrVersion.java new file mode 100644 index 00000000..6f23c162 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObAddrVersion.java @@ -0,0 +1,25 @@ +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +public enum ObAddrVersion { + ObAddrIPV4((byte) 4), + ObAddrIPV6((byte) 6); + + private final byte value; + + ObAddrVersion(byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } + + public static ObAddrVersion fromValue(int value){ + for (ObAddrVersion obAddrVersion : values()) { + if (obAddrVersion.value == value) { + return obAddrVersion; + } + } + return ObAddrIPV4; //默认使用IPV4, 或者抛异常。 + } +} \ No newline at end of file diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableApiMove.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableApiMove.java new file mode 100644 index 00000000..eb6d8268 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableApiMove.java @@ -0,0 +1,59 @@ +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +public class ObTableApiMove extends AbstractPayload { + + private ObTableMoveReplicaInfo replica; + private long reserved; + + public ObTableApiMove() { + replica = new ObTableMoveReplicaInfo(); + reserved = 0L; + } + + @Override + public int getPcode() { + return Pcodes.OB_TABLE_API_MOVE; + } + + @Override + protected int encodeHeader(byte[] bytes, int idx) { + return super.encodeHeader(bytes, idx); + } + + public ObTableMoveReplicaInfo getReplica() { + return replica; + } + + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + System.arraycopy(replica.encode(), 0, bytes, idx, Serialization.getNeedBytes(replica.encode())); + System.arraycopy(Serialization.encodeVi64(reserved), 0, bytes, idx, Serialization.getNeedBytes(reserved)); + + return bytes; + } + + @Override + public ObTableApiMove decode(ByteBuf buf) { + super.decode(buf); + + replica.decode(buf); + + reserved = Serialization.decodeVi64(buf); + return this; + } + + @Override + public long getPayloadContentSize() { + return 0; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableMoveReplicaInfo.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableMoveReplicaInfo.java new file mode 100644 index 00000000..19fb99e9 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableMoveReplicaInfo.java @@ -0,0 +1,73 @@ +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.location.model.ObReplicaType; +import com.alipay.oceanbase.rpc.location.model.ObServerRole; +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.ObGlobal.OB_VERSION; + +public class ObTableMoveReplicaInfo extends AbstractPayload { + + private long tableId; + private long schemaVersion; + private long partitionId; + private ObAddr server; + private ObServerRole obServerRole; + private ObReplicaType obReplicaType; + private long partRenewTime; + private long reserved; + + public ObTableMoveReplicaInfo() { + server = new ObAddr(); + } + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + System.arraycopy(Serialization.encodeVi64(tableId), 0, bytes, idx, Serialization.getNeedBytes(tableId)); + System.arraycopy(Serialization.encodeVi64(schemaVersion), 0, bytes, idx, Serialization.getNeedBytes(schemaVersion)); + if (OB_VERSION >= 4){ + System.arraycopy(Serialization.encodeI64(partitionId), 0, bytes, idx, Serialization.getNeedBytes(partitionId)); + } else { + System.arraycopy(Serialization.encodeVi64(partitionId), 0, bytes, idx, Serialization.getNeedBytes(partitionId)); + } + System.arraycopy(server.encode(), 0, bytes, idx, Serialization.getNeedBytes(server.encode())); + System.arraycopy(Serialization.encodeVi32(obServerRole.getIndex()), 0, bytes, idx, Serialization.getNeedBytes(obServerRole.getIndex())); + System.arraycopy(Serialization.encodeVi32(obReplicaType.getIndex()), 0, bytes, idx, Serialization.getNeedBytes(obReplicaType.getIndex())); + System.arraycopy(Serialization.encodeVi64(partRenewTime), 0, bytes, idx, Serialization.getNeedBytes(partRenewTime)); + System.arraycopy(Serialization.encodeVi64(reserved), 0, bytes, idx, Serialization.getNeedBytes(reserved)); + return bytes; + } + + @Override + public long getPayloadContentSize() { + return 0; + } + + public ObAddr getServer(){ + return server; + } + + @Override + public ObTableMoveReplicaInfo decode(ByteBuf buf) { + super.decode(buf); + tableId = Serialization.decodeVi64(buf); + schemaVersion = Serialization.decodeVi64(buf); + if (OB_VERSION >= 4){ + partitionId = Serialization.decodeI64(buf); + } else { + partitionId = Serialization.decodeVi64(buf); + } + server.decode(buf); + obServerRole = ObServerRole.getRole(Serialization.decodeI8(buf)); + obReplicaType = ObReplicaType.getReplicaType(Serialization.decodeVi32(buf)); + partRenewTime = Serialization.decodeVi64(buf); + reserved = Serialization.decodeVi64(buf); + return this; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 6a82d50a..1a378051 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -28,6 +28,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult; @@ -103,7 +104,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, ObPayload request, AtomicReference connectionRef) throws Exception { - Object result; + ObPayload result; ObTable subObTable = partIdWithIndex.getRight().getObTable(); boolean needRefreshTableEntry = false; int tryTimes = 0; @@ -146,6 +147,20 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, result = subObTable.executeWithConnection(request, connectionRef); } else { result = subObTable.execute(request); + + if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) { + ObTableApiMove moveResponse = (ObTableApiMove) result; + client.getRouteTableRefresher().addTableIfAbsent(indexTableName, true); + client.getRouteTableRefresher().triggerRefreshTable(); + subObTable = client.getTable(moveResponse); + result = subObTable.execute(request); + if (result instanceof ObTableApiMove) { + ObTableApiMove move = (ObTableApiMove) result; + logger.warn("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + + "Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString()); + throw new ObTableRoutingWrongException(); + } + } } client.resetExecuteContinuousFailureCount(indexTableName); break; @@ -244,7 +259,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, } Thread.sleep(client.getRuntimeRetryInterval()); } - return (ObPayload) result; + return result; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index 02882ad8..62c7126e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -66,7 +66,7 @@ public class ObTable extends AbstractObTable implements Lifecycle { private volatile boolean initialized = false; private volatile boolean closed = false; - private boolean reRouting = false; // only used for init packet factory + private boolean reRouting = true; // only used for init packet factory private ReentrantLock statusLock = new ReentrantLock(); @@ -174,6 +174,10 @@ private void initProperties() { } } + public boolean getReRouting(){ + return reRouting; + } + /* * Query. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index 5437a5a1..a186a06d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -22,6 +22,8 @@ import com.alipay.oceanbase.rpc.location.model.ObServerRoute; import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.mutation.result.*; +import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; @@ -35,7 +37,6 @@ import java.util.concurrent.TimeUnit; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*; -import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage; public class ObTableClientBatchOpsImpl extends AbstractTableBatchOps { @@ -341,8 +342,21 @@ public void partitionExecute(ObTableOperationResult[] results, subRequest.setPartitionId(newParam.getPartitionId()); } } - subObTableBatchOperationResult = (ObTableBatchOperationResult) subObTable - .execute(subRequest); + ObPayload result = subObTable.execute(subRequest); + if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) { + ObTableApiMove moveResponse = (ObTableApiMove) result; + obTableClient.getRouteTableRefresher().addTableIfAbsent(tableName, true); + obTableClient.getRouteTableRefresher().triggerRefreshTable(); + subObTable = obTableClient.getTable(moveResponse); + result = subObTable.execute(subRequest); + if (result instanceof ObTableApiMove) { + ObTableApiMove move = (ObTableApiMove) result; + logger.warn("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + + "Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString()); + throw new ObTableRoutingWrongException(); + } + } + subObTableBatchOperationResult = (ObTableBatchOperationResult) result; obTableClient.resetExecuteContinuousFailureCount(tableName); break; } catch (Exception ex) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index db7b152f..8e6e5347 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -24,6 +24,8 @@ import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.mutation.*; import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; @@ -464,7 +466,21 @@ public void partitionExecute(ObTableSingleOpResult[] results, getRight().getObTable(); } } - subLSOpResult = (ObTableLSOpResult) subObTable.execute(tableLsOpRequest); + ObPayload result = subObTable.execute(tableLsOpRequest); + if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) { + ObTableApiMove moveResponse = (ObTableApiMove) result; + obTableClient.getRouteTableRefresher().addTableIfAbsent(tableName, true); + obTableClient.getRouteTableRefresher().triggerRefreshTable(); + subObTable = obTableClient.getTable(moveResponse); + result = subObTable.execute(tableLsOpRequest); + if (result instanceof ObTableApiMove) { + ObTableApiMove move = (ObTableApiMove) result; + logger.warn("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + + "Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString()); + throw new ObTableRoutingWrongException(); + } + } + subLSOpResult = (ObTableLSOpResult) result; obTableClient.resetExecuteContinuousFailureCount(tableName); break; } catch (Exception ex) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java index ce824d07..4553df23 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java @@ -91,7 +91,7 @@ public static void info(final ObPayload payload, String database, String tableNa String methodName, String type, String endpoint, ObTableQueryAndMutateResult result, ObTableQuery tableQuery, long routeTableTime, long executeTime, long slowQueryMonitorThreshold) { - if (routeTableTime + executeTime >= slowQueryMonitorThreshold) { + if (slowQueryMonitorThreshold >= 0 && routeTableTime + executeTime >= slowQueryMonitorThreshold) { List params = new ArrayList<>(); for (ObNewRange rang : tableQuery.getKeyRanges()) { ObRowKey startKey = rang.getStartKey(); @@ -156,7 +156,7 @@ public static void info(final ObPayload payload, String database, String tableNa String methodName, String endpoint, Object[] rowKeys, ObTableOperationResult result, long routeTableTime, long executeTime, long slowQueryMonitorThreshold) { - if (routeTableTime + executeTime >= slowQueryMonitorThreshold) { + if ( slowQueryMonitorThreshold > 0 && (routeTableTime + executeTime >= slowQueryMonitorThreshold)) { MONITOR.info(logMessage(formatTraceMessage(payload), database, tableName, methodName, endpoint, rowKeys, result, routeTableTime, executeTime)); } @@ -186,7 +186,7 @@ private static String logMessage(String traceId, String database, String tableNa // for each sub batch opreation private static void logMessage0(final ObPayload payload, String database, String tableName, String methodName, String endpoint, ObTableBatchOperation subOperations, long partId, int resultSize, long executeTime, long slowQueryMonitorThreshold) { - if (executeTime < slowQueryMonitorThreshold) { + if (slowQueryMonitorThreshold >= 0 && executeTime < slowQueryMonitorThreshold) { return; } String traceId = formatTraceMessage(payload); @@ -238,7 +238,7 @@ public static void info(final ObPayload payload, String database, String tableNa public static void info(final ObPayload payload, String database, String tableName, String methodName, String endpoint, int resultSize, long routeTableTime, long executeTime, long slowQueryMonitorThreshold) { - if (routeTableTime + executeTime >= slowQueryMonitorThreshold) { + if (slowQueryMonitorThreshold >= 0 && routeTableTime + executeTime >= slowQueryMonitorThreshold) { MONITOR.info(logMessage(formatTraceMessage(payload), database, tableName, methodName, endpoint, resultSize, routeTableTime, executeTime)); } @@ -271,7 +271,7 @@ public static void info(final ObPayload payload, String database, String tableNa String methodName, String endpoint, ObTableQuery tableQuery, AbstractQueryStreamResult result, long routeTableTime, long executeTime, long slowQueryMonitorThreshold) { - if (routeTableTime + executeTime >= slowQueryMonitorThreshold) { + if (slowQueryMonitorThreshold >= 0 && routeTableTime + executeTime >= slowQueryMonitorThreshold) { List params = new ArrayList<>(); for (ObNewRange rang : tableQuery.getKeyRanges()) { ObRowKey startKey = rang.getStartKey(); @@ -299,7 +299,7 @@ private static void logLsOpMessage(final ObPayload payload, String database, Str String methodName, String endpoint, ObTableLSOperation lsOperation, int resultSize, long executeTime, long slowQueryMonitorThreshold) { - if (executeTime < slowQueryMonitorThreshold) { + if (slowQueryMonitorThreshold < 0 || executeTime < slowQueryMonitorThreshold) { return; } String traceId = formatTraceMessage(payload); diff --git a/src/test/java/com/alipay/oceanbase/rpc/location/reroute/RerouteTest.java b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/RerouteTest.java new file mode 100644 index 00000000..19b45a52 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/RerouteTest.java @@ -0,0 +1,393 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ +package com.alipay.oceanbase.rpc.location.reroute; + +import com.alipay.oceanbase.rpc.ObGlobal; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.location.reroute.util.ReplicaOperation; +import com.alipay.oceanbase.rpc.property.Property; +import com.alipay.oceanbase.rpc.stream.QueryResultSet; +import com.alipay.oceanbase.rpc.table.api.TableBatchOps; +import com.alipay.oceanbase.rpc.table.api.TableQuery; +import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.alipay.oceanbase.rpc.util.ObTableClientTestUtil.*; + +public class RerouteTest { + + public ObTableClient moveClient; + public ObTableClient clientNotReroute; + private static final boolean passReroutingTest = false; + private static final String tenantName = ""; + private static final String databaseName = ""; + private static final int partNum = 2; + private static final String testInt32RerouteTableName = "test_int32_reroute"; + private static final String testInt32RerouteCreateStatement = "create table if not exists `test_int32_reroute`(`c1` int(12) not null,`c2` int(12) default null,primary key (`c1`)) partition by hash(c1) partitions 2;"; + + private ReplicaOperation replicaOperation; + @Before + public void setup() throws Exception { + + moveClient = ObTableClientTestUtil.newTestClient(); + moveClient.addProperty(Property.SERVER_ENABLE_REROUTING.getKey(), "true"); + moveClient.init(); + + clientNotReroute = ObTableClientTestUtil.newTestClient(); + clientNotReroute.addProperty(Property.SERVER_ENABLE_REROUTING.getKey(), "false"); + clientNotReroute.init(); + Connection connection = getConnection(); + PreparedStatement statement = connection.prepareStatement(testInt32RerouteCreateStatement); + statement.execute(); + replicaOperation = new ReplicaOperation(); + } + + private Connection getConnection() throws SQLException { + String[] userNames = FULL_USER_NAME.split("#"); + return DriverManager.getConnection("jdbc:mysql://" + JDBC_IP + ":" + JDBC_PORT + + "/" + databaseName + "?" + + "rewriteBatchedStatements=TRUE&" + + "allowMultiQueries=TRUE&" + + "useLocalSessionState=TRUE&" + + "useUnicode=TRUE&" + + "characterEncoding=utf-8&" + + "socketTimeout=3000000&" + + "connectTimeout=60000", userNames[0], PASSWORD); + } + + public static void setReroutingEnable(boolean enable) throws SQLException { + String sql = "alter system set _obkv_feature_mode='rerouting=%s'"; + String targetSql; + if (enable) { + targetSql = String.format(sql, "on"); + } else { + targetSql = String.format(sql, "off"); + } + PreparedStatement ps = getSysConnection().prepareStatement(targetSql); + ps.execute(); + ps.close(); + } + @Test + public void testMoveReplicaSingleOp() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(1, affectRow); + + Map result = moveClient.get(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}); + + Assert.assertEquals(0, result.get("c2")); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + + Thread.sleep(5000); + + result = moveClient.get(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}); + + Assert.assertEquals(0, result.get("c2")); + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + @Test + public void testMoveReplicaSingleInsertUp() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + affectRow = moveClient.insertOrUpdate(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + @Test + public void testMoveReplicaSingleBatch() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + TableBatchOps tableBatchOps = moveClient.batch(testInt32RerouteTableName); + tableBatchOps.get(new Object[] {0}, new String[] {"c1", "c2"}); + List batchResult = tableBatchOps.execute(); + Assert.assertEquals(batchResult.size(), 1); + Assert.assertEquals(((HashMap) batchResult.get(0)).get("c1").intValue(), 0); + Assert.assertEquals(((HashMap) batchResult.get(0)).get("c2").intValue(), 0); + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + @Test + public void testMoveReplicaSingleBatchInsertUp() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + TableBatchOps tableBatchOps = moveClient.batch(testInt32RerouteTableName); + tableBatchOps.insertOrUpdate(new Object[] {0}, new String[] {"c2"}, new Object[] {5}); + List result = tableBatchOps.execute(); + + Assert.assertEquals(1, result.size()); + Assert.assertEquals((long)1, result.get(0)); + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + @Test + public void testMoveReplicaQuery() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + QueryResultSet result = moveClient.query(testInt32RerouteTableName).select("c1", "c2").addScanRange(new Object[] {0}, new Object[] {0}).execute(); + + while (result.next()) { + Assert.assertEquals(0, result.getRow().get("c1")); + Assert.assertEquals(0, result.getRow().get("c2")); + } + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + @Test + public void testMoveReplicaQueryAndMutate() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + affectRow = moveClient.update(testInt32RerouteTableName, new Object[]{0}, new String[] {"c2"}, new Object[] {5}); + Assert.assertEquals(1, affectRow); + } finally { + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + // 在服务端设置不允许重路由 + @Test + public void testMoveReplicaServerReroutingOff() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(false); + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = moveClient.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + // single get + Map rs= moveClient.get(testInt32RerouteTableName, new Object[] {0}, new String[] {"c2"}); + Assert.assertEquals(0, rs.get("c2")); + // multi get + TableBatchOps tableBatchOps = moveClient.batch(testInt32RerouteTableName); + tableBatchOps.get(new Object[] {0}, new String[] {"c2"}); + List res = tableBatchOps.execute(); + Assert.assertEquals(1, res.size()); + Assert.assertEquals(0, ((Map)res.get(0)).get("c2").intValue()); + + // 所有操作前都进行切主,为了防止前边刷新路由表之后,后边对切主无感。 + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + // query + TableQuery tableQuery = moveClient.query(testInt32RerouteTableName).select("c1", "c2").addScanRange(new Object[] {0}, new Object[] {0}); + QueryResultSet queryResultSet = tableQuery.execute(); + while (queryResultSet.next()) { + Assert.assertEquals(0, queryResultSet.getRow().get("c1")); + Assert.assertEquals(0, queryResultSet.getRow().get("c2")); + } + + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + // update + affectRow = moveClient.update(testInt32RerouteTableName, new Object[]{0}, new String[] {"c2"}, new Object[] {5}); + Assert.assertEquals(1, affectRow); + } finally { + setReroutingEnable(true); + Thread.sleep(1000); + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } + + // 在客户端设置不允许重路由 + // 即使不允许重路由,现阶段的修改还是保留了原有的同步等待刷新表的流程,所以下边的操作不会抛异常,只是时间会很久 + @Test + public void testMoveReplicaClientReroutingOff() throws Exception { + try { + if (passReroutingTest) { + System.out.println("Please run Rerouting tests manually!!!"); + System.out.println("Change passReroutingTest to false in src/test/java/com/alipay/oceanbase/rpc/location/model/RerouteTest.java to run rerouting tests."); + Assert.assertFalse(passReroutingTest); + } + setReroutingEnable(true); + clientNotReroute.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + + long affectRow = clientNotReroute.insert(testInt32RerouteTableName, new Object[]{0}, new String[]{"c2"}, new Object[]{0}); + Assert.assertEquals(affectRow, 1); + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + + // single get + Map rs= clientNotReroute.get(testInt32RerouteTableName, new Object[] {0}, new String[] {"c2"}); + Assert.assertEquals(0, rs.get("c2")); + // multi get + TableBatchOps tableBatchOps = clientNotReroute.batch(testInt32RerouteTableName); + tableBatchOps.get(new Object[] {0}, new String[] {"c2"}); + List res = tableBatchOps.execute(); + Assert.assertEquals(1, res.size()); + Assert.assertEquals(0, ((Map)res.get(0)).get("c2").intValue()); + + // 所有操作前都进行切主,为了防止前边刷新路由表之后,后边对切主无感。 + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + // query + TableQuery tableQuery = clientNotReroute.query(testInt32RerouteTableName).select("c1", "c2").addScanRange(new Object[] {0}, new Object[] {0}); + QueryResultSet queryResultSet = tableQuery.execute(); + while (queryResultSet.next()) { + Assert.assertEquals(0, queryResultSet.getRow().get("c1")); + Assert.assertEquals(0, queryResultSet.getRow().get("c2")); + } + + if (ObGlobal.OB_VERSION < 4) { + replicaOperation.switchReplicaLeaderRandomly(tenantName, databaseName, testInt32RerouteTableName, partNum); + } else { + replicaOperation.switchReplicaLeaderRandomly4x(tenantName, databaseName, testInt32RerouteTableName); + } + Thread.sleep(5000); + // update + affectRow = clientNotReroute.update(testInt32RerouteTableName, new Object[]{0}, new String[] {"c2"}, new Object[] {5}); + Assert.assertEquals(1, affectRow); + } finally { + moveClient.addRowKeyElement(testInt32RerouteTableName, new String[]{"c1"}); + moveClient.delete(testInt32RerouteTableName, new Object[] {0}); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Partition.java b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Partition.java new file mode 100644 index 00000000..11ac7c60 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Partition.java @@ -0,0 +1,76 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.location.reroute.util; + +import java.util.ArrayList; +import java.util.List; +public class Partition { + private long tableId; + private long partId; + private long replicaNum; + private Replica leader; + private List follower; + + public Partition(long partId) { + tableId = 0; + this.partId = partId; + replicaNum = 0; + leader = null; + follower = new ArrayList<>(2); + } + + public void setTableId(long tableId) { + this.tableId = tableId; + } + + public long getTableId() { + return tableId; + } + + public void setPartId(long partId) { + this.partId = partId; + } + + public long getPartId() { + return partId; + } + + public void addReplicaNum(long num) { + this.replicaNum += num; + } + + public long getReplicaNum() { + return replicaNum; + } + + public void setLeader(Replica leader) { + this.leader = leader; + } + + public Replica getLeader() { + return leader; + } + + public void appendFollower(Replica follower) { + this.follower.add(follower); + } + + public List getFollower() { + return follower; + } +} diff --git a/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Replica.java b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Replica.java new file mode 100644 index 00000000..7b447acd --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/Replica.java @@ -0,0 +1,39 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.location.reroute.util; + +import com.alipay.oceanbase.rpc.location.model.ObServerRole; + +public class Replica { + long tableId; + long partId; + String ip; + int port; + ObServerRole role; + + public Replica(long tableId, long partId, String ip, int port, ObServerRole role ) { + this.tableId = tableId; + this.partId = partId; + this.ip = ip; + this.port = port; + this.role = role; + } + public ObServerRole getRole(){ + return role; + } +} diff --git a/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/ReplicaOperation.java b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/ReplicaOperation.java new file mode 100644 index 00000000..9e13580a --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/location/reroute/util/ReplicaOperation.java @@ -0,0 +1,263 @@ +package com.alipay.oceanbase.rpc.location.reroute.util; + +import com.alipay.oceanbase.rpc.location.model.ObServerRole; +import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +public class ReplicaOperation { + // tenant 跟 tenant_id修改为实际的参数 + + private static final String getReplicaSql = "SELECT A.table_id as table_id, A.partition_id as partition_id, A.svr_ip as svr_ip, B.svr_port as svr_port, A.role as role FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port WHERE tenant_name = ? and database_name = ? and table_name = ? and partition_id in "; + private static final String getLDIdSql = "SELECT ls_id FROM oceanbase.cdb_ob_table_locations WHERE tenant_id = 1004 and database_name = ? and table_name = ? and role = 'LEADER';"; + private static final String getFollowerAddrSql = "SELECT concat(svr_ip,':',svr_port) AS host FROM oceanbase.__all_virtual_ls_meta_table WHERE tenant_id = 1004 and ls_id = ? and role = 2 and replica_status = 'NORMAL' limit 1;"; + private static final String switch2FollwerSql = "ALTER SYSTEM SWITCH REPLICA LEADER ls = ? server= ? tenant='java_client'"; + + private Connection connection; + + public ReplicaOperation() throws SQLException { + connection = ObTableClientTestUtil.getSysConnection(); + } + + + public String createInStatement(int partNum) { + long[] values = new long[partNum]; + for (int i = 0; i < partNum; i++) { + values[i] = i; + } + + // Create inStatement "(0,1,2...partNum);". + StringBuilder inStatement = new StringBuilder(); + inStatement.append("("); + for (int i = 0; i < values.length; i++) { + if (i > 0) { + inStatement.append(", "); + } + inStatement.append(values[i]); + } + inStatement.append(");"); + return inStatement.toString(); + } + + public void getPartitions(String tenantName, String databaseName, String tableName, int partNum, List partitions) throws SQLException { + String sql = getReplicaSql + createInStatement(partNum); + Connection connection = ObTableClientTestUtil.getConnection(); + PreparedStatement ps = connection.prepareStatement(sql); + ps.setString(1, tenantName); + ps.setString(2, databaseName); + ps.setString(3, tableName); + ResultSet rs = ps.executeQuery(); + + long tableId; + long partId; + String ip; + int port; + int role; + + boolean isEmpty = true; + while (rs.next()) { + if (isEmpty) { + isEmpty = false; + } + tableId = rs.getLong("TABLE_ID"); + partId = rs.getLong("PART_ID"); + ip = rs.getString("IP"); + port = rs.getInt("PORT"); + role = rs.getInt("ROLE"); + + Replica replica = new Replica(tableId, partId, ip, port, ObServerRole.getRole(role)); + + if (replica.getRole() == ObServerRole.LEADER) { + for (Partition partition : partitions) { + if (partition.getPartId() == partId) { + partition.setTableId(tableId); + partition.setLeader(replica); + partition.addReplicaNum(1); + } + } + } else { + for (Partition partition : partitions) { + if (partition.getPartId() == partId) { + partition.appendFollower(replica); + partition.addReplicaNum(1); + } + } + } + } + if (isEmpty) { + throw new RuntimeException("empty set when get partitions"); + } + } + + public void disableAutoReplicaSwitch() throws SQLException { + String sql = "alter system set enable_auto_leader_switch = 'false';"; + PreparedStatement ps = connection.prepareStatement(sql); + ps.executeQuery(); + ps.close(); + } + + public void switchLeader(List partitions) throws SQLException { + int partNum = partitions.size(); + for (Partition partition : partitions) { + if (partition.getFollower().size() != 2) { + throw new RuntimeException("invalid follower num: " + partition.getFollower().size()); + } + } + + // get a follower ip and port + String ip = partitions.get(0).getFollower().get(0).ip; + int port = partitions.get(0).getFollower().get(0).port; + String server = ip + ":" + port; + + // switch all replica leader to ip:port + for (Partition partition : partitions) { + String partIdStr = String.format("%d%%%d@%d", partition.getPartId(), partNum, partition.getTableId()); + String sql = String.format("ALTER SYSTEM SWITCH REPLICA LEADER PARTITION_ID '%s' SERVER '%s';", partIdStr, server); + PreparedStatement ps = connection.prepareStatement(sql); + ps.executeQuery(); + } + } + + public void switchReplicaLeaderRandomly(String tenantName, String databaseName, String tableName, int partNum) { + if (partNum != 2) { + String errMsg = "invalid partition num:" + partNum; + throw new RuntimeException(errMsg); + } + + try { + // 1.disable auto replica switch + disableAutoReplicaSwitch(); + + // 2.query replica + List partitions = new ArrayList<>(partNum); + for (int i = 0; i < partNum; ++i) { + partitions.add(new Partition(i)); + } + getPartitions(tenantName, databaseName, tableName, partNum, partitions); + + // 3.check replica + if (partitions.size() != partNum) { + String errMsg = "invalid partition num:" + partitions.size(); + throw new RuntimeException(errMsg); + } + + // 4 switch leader + switchLeader(partitions); + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public long getLsId(String databaseName, String tableName) throws SQLException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + ps = connection.prepareStatement(getLDIdSql); + ps.setString(1, databaseName); + ps.setString(2, tableName); + rs = ps.executeQuery(); + + long lsId = -1, prevLsId; + prevLsId = Long.MAX_VALUE; + boolean isEmpty = true; + while (rs.next()) { + if (isEmpty) { + isEmpty = false; + } + lsId = rs.getLong("LS_ID"); + + if (prevLsId == Long.MAX_VALUE) { + prevLsId = lsId; + } else { + if (lsId != prevLsId){ + return -1; + } + } + } + + if (isEmpty) { + return -1; + } + return lsId; + } finally { + try { + if (null != rs) { + rs.close(); + } + if (null != ps) { + ps.close(); + } + } catch (SQLException e) { + // ignore + } + } + } + + public String getFollowerAddr(long lsId) { + PreparedStatement ps = null; + ResultSet rs = null; + try { + ps = connection.prepareStatement(getFollowerAddrSql); + ps.setString(1, String.valueOf(lsId)); + rs = ps.executeQuery(); + String addr = ""; + boolean isEmpty = true; + + while (rs.next()) { + if (isEmpty) { + isEmpty = false; + } + addr = rs.getString("host"); + } + if (isEmpty) { + throw new RuntimeException("empty set when get follower addr"); + } + return addr; + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + try { + if (null != rs) { + rs.close(); + } + if (null != ps) { + ps.close(); + } + } catch (SQLException e) { + // ignore + } + } + } + + private void switch2Follower(long lsId, String addr) { + PreparedStatement ps = null; + try { + ps = connection.prepareStatement(switch2FollwerSql); + ps.setLong(1, lsId); + ps.setString(2, addr); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + try { + if (null != ps) { + ps.close(); + } + } catch (SQLException e) { + // ignore + } + } + } + + public void switchReplicaLeaderRandomly4x(String tenantName, String databaseName, String tableName) throws SQLException { + long lsId = getLsId(databaseName, tableName); + String addr = getFollowerAddr(lsId); + switch2Follower(lsId, addr); + } +}