Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
return tableClientQuery.setMaxResultSize(maxResultSize);
}

@Override
public TableQuery setOperationTimeout(long operationTimeout) {
tableClientQuery.setOperationTimeout(operationTimeout);
return this;
}

/**
* Clear.
*/
Expand Down
122 changes: 99 additions & 23 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
Expand Down Expand Up @@ -111,9 +112,7 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
*/
@Override
public ObPayload newPayload(ObRpcPacketHeader header) {
throw new ObTableRoutingWrongException(
"Receive rerouting response packet. "
+ "Java client is not supported and need to Refresh table router entry");
return new ObTableApiMove();
}
}, //
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
Expand Down Expand Up @@ -161,9 +160,7 @@ public static ObTablePacketCode valueOf(short value) {
case Pcodes.OB_TABLE_API_LS_EXECUTE:
return OB_TABLE_API_LS_EXECUTE;
case Pcodes.OB_TABLE_API_MOVE:
throw new ObTableRoutingWrongException(
"Receive rerouting response packet. "
+ "Java client is not supported and need to Refresh table router entry");
return OB_TABLE_API_MOVE;
case Pcodes.OB_ERROR_PACKET:
return OB_ERROR_PACKET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand All @@ -139,7 +139,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
throw new ObTableNeedFetchAllException(errMessage);
}
}
if (resultCode.getRcode() != 0) {
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand Down Expand Up @@ -186,7 +186,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
InvokeCallback invokeCallback) {
return new ObClientFuture(request.getId());
}


// schema changed
private boolean needFetchAll(int errorCode, int pcode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.alipay.oceanbase.rpc.location.model;

import java.util.Objects;

/**
* ObReplicaType(副本类型)
*
Expand Down Expand Up @@ -72,6 +74,10 @@ static public ObReplicaType getReplicaType(int idx) {
}
}

public int getIndex() {
return this.index;
}

/*
* whether the replica is readable.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2021 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/
package com.alipay.oceanbase.rpc.location.model;

import com.alipay.oceanbase.rpc.ObTableClient;


import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import org.slf4j.Logger;

import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.getLogger;

public class RouteTableRefresher extends Thread{

private static final Logger logger = getLogger(RouteTableRefresher.class);

private volatile AtomicBoolean isFinished = new AtomicBoolean(false); // Thread end flag

private final Semaphore semaphore = new Semaphore(0);

private volatile ConcurrentLinkedQueue<ObPair<String, Boolean>> refreshTableTasks; // Task refresh queue

ObTableClient client;

private final Lock lock = new ReentrantLock(); // Ensure the atomicity of the AddIfAbsent operation.

public RouteTableRefresher(ObTableClient client){
this.client = client;
}

public void finish() {
isFinished.set(true);
}

@Override
public void run() {
refreshTableTasks = new ConcurrentLinkedQueue<>();
while (!isFinished.get()) {
try {
semaphore.acquire(); // A semaphore is associated with a task; it ensures that only one task is processed at a time.
logger.info("Thread name {}, id{} acquire semaphore, begin execute route refresher", currentThread().getName(), currentThread().getId());
} catch (InterruptedException e) {
logger.info("Thread name {}, id {} is interrupted", currentThread().getName(), currentThread().getId());
}
ObPair<String, Boolean> refreshTableTask = refreshTableTasks.peek();
if (refreshTableTask != null && refreshTableTask.getRight()) {
String tableName = refreshTableTask.getLeft();
try {
logger.info("backgroundRefreshTableTask run refresh, table name {}", tableName);
TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, true, false, false);
client.getTableLocations().put(refreshTableTask.getLeft(), tableEntry);
} catch (Exception e) {
String message = "RefreshTableBackground run meet exception" + e.getMessage();
logger.warn(message);
}
}
refreshTableTasks.poll();
}
}

public void addTableIfAbsent(String tableName, Boolean isRefreshing){
lock.lock();
if (!refreshTableTasks.contains(new ObPair<>(tableName, isRefreshing))) {
logger.info("add table {}, is refreshing {} to refresh task.", tableName, isRefreshing);
refreshTableTasks.add(new ObPair<>(tableName,isRefreshing));
}
lock.unlock();
}

public void triggerRefreshTable() {
semaphore.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public long parseToLong(String key, long defaultV) {
}
}

public boolean parseToBoolean(String key) {
public boolean parseToBoolean(String key) throws Exception {
if (System.getProperty(OB_TABLE_CLIENT_PREFIX + key) == null && getProperty(key) == null){
throw new Exception();
}
return Boolean.parseBoolean(System.getProperty(OB_TABLE_CLIENT_PREFIX + key,
getProperty(key)));
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/property/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public enum Property {
RUNTIME_BATCH_MAX_WAIT("runtime.batch.max.wait", 3000L, "批量执行请求的超时时间"),

// [ObTableClient][LOG]
SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", 10L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"),
SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", -1L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"),

/*
* property in [`ObTable`]
Expand Down Expand Up @@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),

// [ObTable][OTHERS]
SERVER_ENABLE_REROUTING("server.enable.rerouting", "false", "开启server端的重定向回复功能"),
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),

/*
* other config
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class ObAddr extends AbstractPayload {

private ObAddrVersion obAddrVersion;

private byte[] ip;
private int port;

public ObAddr(){
this.obAddrVersion = ObAddrVersion.ObAddrIPV4;
this.ip = new byte[16];
this.port = 0;
}

public String ipToString() {
if (isIPv4()){
return getIPv4().getHostAddress();
}
return getIPv6().getHostAddress();
}

public InetAddress getIPv4(){
if (isIPv6()){
return null;
}
try {
return InetAddress.getByAddress(Arrays.copyOf(ip, 4));
} catch (UnknownHostException e){
// 需要查看对应的错误吗进行处理
}
return null;
}

public InetAddress getIPv6() {
if (isIPv6()){
return null;
}
try {
return InetAddress.getByAddress(ip);
} catch (UnknownHostException e){
// 需要查看对应的错误吗进行处理
}
return null;
}

public boolean isIPv4() {
return obAddrVersion == ObAddrVersion.ObAddrIPV4;
}

public boolean isIPv6() {
return obAddrVersion == ObAddrVersion.ObAddrIPV6;
}

public int getPort(){
return port;
}

@Override
public long getPayloadContentSize() {
return 0;
}

@Override
public byte[] encode() {
byte[] bytes = new byte[(int) getPayloadSize()];
int idx = 0;

// 0. encode header
idx = encodeHeader(bytes, idx);
System.arraycopy(Serialization.encodeI8(obAddrVersion.getValue()), 0, bytes, idx, Serialization.getNeedBytes(obAddrVersion.getValue()));

ByteBuffer buffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN);
int ip1 = buffer.getInt(0);
int ip2 = buffer.getInt(4);
int ip3 = buffer.getInt(8);
int ip4 = buffer.getInt(12);
System.arraycopy(Serialization.encodeVi32(ip1), 0, bytes, idx, Serialization.getNeedBytes(ip1));
System.arraycopy(Serialization.encodeVi32(ip2), 0, bytes, idx, Serialization.getNeedBytes(ip2));
System.arraycopy(Serialization.encodeVi32(ip3), 0, bytes, idx, Serialization.getNeedBytes(ip3));
System.arraycopy(Serialization.encodeVi32(ip4), 0, bytes, idx, Serialization.getNeedBytes(ip4));

System.arraycopy(Serialization.encodeVi32(port), 0, bytes, idx, Serialization.getNeedBytes(port));
return bytes;
}

@Override
public ObAddr decode(ByteBuf buf) {
super.decode(buf);
this.obAddrVersion = ObAddrVersion.fromValue(Serialization.decodeI8(buf));
//decode ip addr

int ip1, ip2, ip3, ip4;
ip1 = Serialization.decodeVi32(buf);
ip2 = Serialization.decodeVi32(buf);
ip3 = Serialization.decodeVi32(buf);
ip4 = Serialization.decodeVi32(buf);
ByteBuffer ipBuffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN);
ipBuffer.putInt(0, ip1);
ipBuffer.putInt(4, ip2);
ipBuffer.putInt(8, ip3);
ipBuffer.putInt(12, ip4);

this.port = Serialization.decodeVi32(buf);

return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

public enum ObAddrVersion {
ObAddrIPV4((byte) 4),
ObAddrIPV6((byte) 6);

private final byte value;

ObAddrVersion(byte value) {
this.value = value;
}

public byte getValue() {
return value;
}

public static ObAddrVersion fromValue(int value){
for (ObAddrVersion obAddrVersion : values()) {
if (obAddrVersion.value == value) {
return obAddrVersion;
}
}
return ObAddrIPV4; //默认使用IPV4, 或者抛异常。
}
}
Loading