From 839e58fded11318aca58737a4312e202ecf47df6 Mon Sep 17 00:00:00 2001 From: maochongxin Date: Wed, 14 May 2025 21:00:52 +0800 Subject: [PATCH] MetaExecutor interface definition and demo --- .../alipay/oceanbase/rpc/ObTableClient.java | 12 +++ .../rpc/bolt/protocol/ObTablePacketCode.java | 12 +++ .../rpc/meta/ObTableMetaRequest.java | 86 +++++++++++++++++++ .../rpc/meta/ObTableMetaResponse.java | 64 ++++++++++++++ .../rpc/meta/ObTableRpcMetaType.java | 40 +++++++++ .../rpc/protocol/payload/Pcodes.java | 1 + 6 files changed, 215 insertions(+) create mode 100644 src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaRequest.java create mode 100644 src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaResponse.java create mode 100644 src/main/java/com/alipay/oceanbase/rpc/meta/ObTableRpcMetaType.java diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 57e9cf98..db7ca5f0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -48,6 +48,7 @@ import com.alipay.remoting.util.StringUtils; import org.slf4j.Logger; +import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -1112,6 +1113,17 @@ public ObTable getTable(ObTableApiMove moveResponse) throws Exception { // If the node address does not exist, a new table is created return addTable(addr); } + + public ObTable getRandomTable() { + ObTable anyTable; + if (odpMode) { + anyTable = tableRoute.getOdpTable(); + } else { + ConcurrentHashMap tableRoster = tableRoute.getTableRoster().getTables(); + anyTable = tableRoster.values().stream().findAny().orElse(null); + } + return anyTable; + } public ObTable addTable(ObServerAddr addr){ diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java index 00b334a9..b4353cf7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java @@ -18,6 +18,8 @@ package com.alipay.oceanbase.rpc.bolt.protocol; import com.alipay.oceanbase.rpc.exception.ObTableRoutingWrongException; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; @@ -33,6 +35,8 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult; import com.alipay.remoting.CommandCode; +import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE; + public enum ObTablePacketCode implements CommandCode { OB_TABLE_API_LOGIN(Pcodes.OB_TABLE_API_LOGIN) { @@ -133,6 +137,12 @@ public ObPayload newPayload(ObRpcPacketHeader header) { public ObPayload newPayload(ObRpcPacketHeader header) { throw new IllegalArgumentException("OB_ERROR_PACKET has no payload implementation"); } + }, + OB_TABLE_META_INFO_EXECUTE(Pcodes.OB_TABLE_API_META_INFO_EXECUTE) { + @Override + public ObPayload newPayload(ObRpcPacketHeader header) { + return new ObTableMetaResponse(); + } }; private short value; @@ -175,6 +185,8 @@ public static ObTablePacketCode valueOf(short value) { return OB_TABLE_API_MOVE; case Pcodes.OB_ERROR_PACKET: return OB_ERROR_PACKET; + case OB_TABLE_API_META_INFO_EXECUTE: + return OB_TABLE_META_INFO_EXECUTE; } throw new IllegalArgumentException("Unknown Rpc command code value ," + value); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaRequest.java b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaRequest.java new file mode 100644 index 00000000..f83f2676 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaRequest.java @@ -0,0 +1,86 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2025 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.meta; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Credentialable; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.util.ObBytesString; +import com.alipay.oceanbase.rpc.util.Serialization; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObTableMetaRequest extends AbstractPayload implements Credentialable { + private ObBytesString credential; + private ObTableRpcMetaType metaType; + private String data; + + @Override + public void setCredential(ObBytesString credential) { + this.credential = credential; + } + + @Override + public int getPcode() { + return Pcodes.OB_TABLE_API_META_INFO_EXECUTE; + } + + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); + System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, + idx, headerLen); + idx += headerLen; + int len = Serialization.getNeedBytes(credential); + System.arraycopy(Serialization.encodeBytesString(credential), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(metaType.getType()); + System.arraycopy(Serialization.encodeI8((short)metaType.getType()), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(data); + System.arraycopy(Serialization.encodeVString(data), 0, bytes, idx, len); + return bytes; + } + + @Override + public long getPayloadContentSize() { + return Serialization.getNeedBytes(credential) + + Serialization.getNeedBytes(metaType.getType()) + Serialization.getNeedBytes(data); + } + + public void setMetaType(ObTableRpcMetaType metaType) { + this.metaType = metaType; + } + + public ObTableRpcMetaType getMetaType() { + return metaType; + } + + public void setData(String data) { + this.data = data; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaResponse.java b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaResponse.java new file mode 100644 index 00000000..053992f3 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableMetaResponse.java @@ -0,0 +1,64 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2025 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.meta; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableResult; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObTableMetaResponse extends AbstractPayload { + private ObTableRpcMetaType metaType; // 元信息类型 + private final ObTableResult header = new ObTableResult(); + private String data; // 服务端拿到的分片的元数据, json字符串 + + @Override + public int getPcode() { + return Pcodes.OB_TABLE_API_META_INFO_EXECUTE; + } + + @Override + public byte[] encode() { + return null; + } + + @Override + public Object decode(ByteBuf buf) { + super.decode(buf); + // 1. decode ObTableResult + header.decode(buf); + // 2. decode itself + data = Serialization.decodeVString(buf); + + return this; + } + + @Override + public long getPayloadContentSize() { + return 0; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableRpcMetaType.java b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableRpcMetaType.java new file mode 100644 index 00000000..3dad573d --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/meta/ObTableRpcMetaType.java @@ -0,0 +1,40 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2025 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.meta; + +// define rpc meta type enum +public enum ObTableRpcMetaType { + INVALID(0), TABLE_PARTITION_INFO(1), // 分区信息, 用于路由刷新 + HTABLE_REGION_LOCATOR(2), // 分区上下界 + HTABLE_REGION_METRICS(3), // 分区统计信息 + HTABLE_CREATE_TABLE(4), // 建表 + HTABLE_DELETE_TABLE(5), // 删表 + HTABLE_TRUNCATE_TABLE(6), // 清空表 + HTABLE_EXISTS(7), // 检查表是否存在 + HTABLE_GET_DESC(8), // 获取表元信息 + HTABLE_META_MAX(255); + private int type; + + ObTableRpcMetaType(int type) { + this.type = type; + } + + public int getType() { + return type; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java index aaeb3395..5f2b6454 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java @@ -34,4 +34,5 @@ public interface Pcodes { int OB_TABLE_API_DIRECT_LOAD = 0x1123; int OB_TABLE_API_MOVE = 0x1124; int OB_TABLE_API_LS_EXECUTE = 0x1125; + int OB_TABLE_API_META_INFO_EXECUTE = 0x1128; }