Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8490fee
fix batch retry
maochongxin Jul 31, 2024
8bb9560
draft LSBatchImpl
maochongxin Aug 5, 2024
7765ad1
retry LSBatchImpl
maochongxin Aug 5, 2024
2eb9a4c
add error code for rerouting-exception
maochongxin Aug 7, 2024
de8f71f
remove unused debug log
maochongxin Aug 7, 2024
e2649e0
query refresh partitionTables
maochongxin Aug 11, 2024
f0de20d
fix(scan): prevent infinite retries on illegal partitions during splits
maochongxin Aug 12, 2024
c48dadb
query refresh partitionTables
maochongxin Aug 12, 2024
8e0f120
fix scan_to_next_partition_test route error; fix scan_during_split_te…
maochongxin Aug 15, 2024
8288ba1
fix getScanner error during split
maochongxin Aug 17, 2024
6bd88f6
remove unused log
maochongxin Aug 19, 2024
ecf8cc6
Optimize SQL for refreshing table location information
maochongxin Sep 19, 2024
a82d5d0
partical refresh
maochongxin Sep 24, 2024
6874323
Fix infinite loop caused by removed reference
maochongxin Oct 10, 2024
1d46a86
fix lsop retry fail
maochongxin Oct 14, 2024
8d60c55
Remove unnecessary comments and format code
maochongxin Oct 15, 2024
22eeb36
Fix frequent refresh lock failures due to short refresh interval
maochongxin Oct 18, 2024
73ef9aa
Fix frequent refresh lock failures due to short refresh interval
maochongxin Oct 18, 2024
a46e2b0
add result code -4723
maochongxin Oct 18, 2024
fb03d26
fix review
maochongxin Oct 23, 2024
e5762f4
add -4138
maochongxin Oct 23, 2024
4c630ee
fix review: add ut for byteutil
maochongxin Oct 25, 2024
64926a4
fix lsop refresh location
maochongxin Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 189 additions & 81 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,31 +122,32 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
} else if (needFetchPartial(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage);
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
// Encountered an unexpected RoutingWrong error code,
// possibly due to the client error code version being behind the observer's version.
// Attempting a full refresh here
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
logger.warn("get unexpected error code: {}", response.getMessage());
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
}
}
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
if (resultCode.getRcode() != 0
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
} else if (needFetchPartial(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage);
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
.getObTable().getPort(), response.getHeader().getTraceId1(), response
Expand Down Expand Up @@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}

Expand Down
399 changes: 283 additions & 116 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import static com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -53,7 +54,9 @@ public class TableEntry {
// partition location
private TableEntryKey tableEntryKey = null;
private volatile ObPartitionEntry partitionEntry = null;


public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();

/*
* Is valid.
*/
Expand Down Expand Up @@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
+ tableEntryKey);
partitionInfo.prepare();
checkArgument(partitionEntry != null,
"partition table partition entry is not ready. key" + tableEntryKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class ObPartitionEntry {
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();

// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map<Long, Long> tabletLsIdMap = new HashMap<>();

// tabelt id -> (PartitionLocation, LsId)
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();


public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
}

public Map<Long, ObPartitionLocation> getPartitionLocation() {
return partitionLocation;
}
Expand All @@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
this.partitionLocation = partitionLocation;
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }

/*
* Get partition location with part id.
*/
Expand Down Expand Up @@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.location.model.partition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;

public class ObPartitionLocationInfo {
private ObPartitionLocation partitionLocation = null;
private Long tabletLsId = OB_INVALID_ID;
private Long lastUpdateTime = 0L;
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public AtomicBoolean initialized = new AtomicBoolean(false);
public final CountDownLatch initializationLatch = new CountDownLatch(1);

public ObPartitionLocation getPartitionLocation() {
rwLock.readLock().lock();
try {
return partitionLocation;
} finally {
rwLock.readLock().unlock();
}
}

public void updateLocation(ObPartitionLocation newLocation) {
this.partitionLocation = newLocation;
this.lastUpdateTime = System.currentTimeMillis();
}

public Long getTabletLsId() {
return tabletLsId;
}

public void setTabletLsId(Long tabletLsId) {
this.tabletLsId = tabletLsId;
}

public Long getLastUpdateTime() {
rwLock.readLock().lock();
try {
return lastUpdateTime;
} finally {
rwLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public ObRangePartDesc() {
public List<ObObjType> getOrderedCompareColumnTypes() {
return orderedCompareColumnTypes;
}
private List<Long> completeWorks;

private List<Long> completeWorks;

/*
* Set ordered compare column types.
Expand Down Expand Up @@ -299,20 +300,20 @@ public int getBoundsIdx(boolean isScan, Row rowKey) {
try {
List<Object> evalParams = evalRowKeyValues(rowKey);
List<Comparable> comparableElement = super.initComparableElementByTypes(evalParams,
this.orderedCompareColumns);
this.orderedCompareColumns);
ObPartitionKey searchKey = ObPartitionKey.getInstance(orderedCompareColumns,
comparableElement);
comparableElement);

int pos = upperBound(this.bounds, new ObComparableKV<ObPartitionKey, Long>(searchKey,
(long) -1));
(long) -1));
if (pos >= this.bounds.size()) {
if (isScan) {
// if range is bigger than rangeMax while scanning
// we just scan until last range
return this.bounds.size() - 1;
}
throw new ArrayIndexOutOfBoundsException("Table has no partition for value in "
+ this.getPartExpr());
+ this.getPartExpr());
} else {
return pos;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),

// [ObTable][OTHERS]
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),

/*
* other config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
OB_LS_NOT_EXIST(-4719), //
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
Expand Down
Loading