Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,4 @@
</build>
</profile>
</profiles>
</project>
</project>
285 changes: 183 additions & 102 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
InvokeCallback invokeCallback) {
return new ObClientFuture(request.getId());
}


// schema changed
private boolean needFetchAll(int errorCode, int pcode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
import com.alipay.oceanbase.rpc.mutation.Row;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
Expand Down Expand Up @@ -58,7 +59,7 @@ public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
this.checkExists = check_exists;
}

public Object[] getRowKey() {
public Row getRowKey() {
return insUp.getRowKey();
}

Expand All @@ -85,15 +86,15 @@ public MutationResult execute() throws Exception {

TableQuery query = client.query(tableName);
query.setFilter(filter);
Object[] rowKey = getRowKey();
Row rowKey = getRowKey();
List<ObNewRange> ranges = new ArrayList<>();
ObNewRange range = new ObNewRange();
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
ranges.add(range);
query.getObTableQuery().setKeyRanges(ranges);
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
insUp.getRowKey(), insUp.getColumns(), insUp.getValues());
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());

return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
}
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alipay.oceanbase.rpc.ObClusterTableBatchOps;
import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.constant.Constants;
import com.alipay.oceanbase.rpc.exception.*;
Expand Down Expand Up @@ -1312,8 +1313,29 @@ private static ObPartitionInfo parsePartitionInfo(ResultSet rs)
}

// set the property of first part and sub part
setPartDescProperty(info.getFirstPartDesc(), info.getPartColumns(), orderedPartedColumns1);
setPartDescProperty(info.getSubPartDesc(), info.getPartColumns(), orderedPartedColumns2);
List<ObColumn> firstPartColumns = new ArrayList<ObColumn>(), subPartColumns = new ArrayList<ObColumn>();
if (null != info.getFirstPartDesc()) {
for (String partColumnNames : info.getFirstPartDesc().getOrderedPartColumnNames()) {
for (ObColumn curColumn : info.getPartColumns()) {
if (curColumn.getColumnName().equalsIgnoreCase(partColumnNames)) {
firstPartColumns.add(curColumn);
break;
}
}
}
}
if (null != info.getSubPartDesc()) {
for (String partColumnNames : info.getSubPartDesc().getOrderedPartColumnNames()) {
for (ObColumn curColumn : info.getPartColumns()) {
if (curColumn.getColumnName().equalsIgnoreCase(partColumnNames)) {
subPartColumns.add(curColumn);
break;
}
}
}
}
setPartDescProperty(info.getFirstPartDesc(), firstPartColumns, orderedPartedColumns1);
setPartDescProperty(info.getSubPartDesc(), subPartColumns, orderedPartedColumns2);

return info;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

package com.alipay.oceanbase.rpc.location.model.partition;

import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
import com.alipay.oceanbase.rpc.mutation.Row;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
import com.alipay.oceanbase.rpc.util.RandomUtil;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import com.alipay.oceanbase.rpc.mutation.Row;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -98,105 +101,49 @@ public void prepare() {
* Get part ids.
*/
@Override
public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] end,
public List<Long> getPartIds(Object startRowObj, boolean startInclusive, Object endRowObj,
boolean endInclusive) {
// close set
try {
// verify the type of parameters and convert to Row
if (!(startRowObj instanceof Row) || !(endRowObj instanceof Row)) {
throw new ObTableException("invalid format of rowObj: " + startRowObj + ", "
+ endRowObj);
}
Row startRow = (Row) startRowObj, endRow = (Row) endRowObj;
// pre-check start and end
// should remove after remove addRowkeyElement
if (start.length != end.length) {
if (startRow.size() != endRow.size()) {
throw new IllegalArgumentException("length of start key and end key is not equal");
}

if (startRow.size() == 1 && startRow.getValues()[0] instanceof ObObj && ((ObObj) startRow.getValues()[0]).isMinObj() &&
endRow.size() == 1 && endRow.getValues()[0] instanceof ObObj && ((ObObj) endRow.getValues()[0]).isMaxObj()) {
return completeWorks;
}

// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
for (ObPair<ObColumn, List<Integer>> pair : orderedPartRefColumnRowKeyRelations) {
for (int refIdx : pair.getRight()) {
if (start.length <= refIdx) {
throw new IllegalArgumentException("rowkey length is " + start.length
for (ObColumn curObcolumn : partColumns) {
for (int refIdx = 0; refIdx < curObcolumn.getRefColumnNames().size(); ++refIdx) {
String curObRefColumnName = curObcolumn.getRefColumnNames().get(refIdx);
if (startRow.size() <= refIdx) {
throw new IllegalArgumentException("rowkey length is " + startRow.size()
+ ", which is shortest than " + refIdx);
}
if (start[refIdx] instanceof ObObj
&& (((ObObj) start[refIdx]).isMinObj() || ((ObObj) start[refIdx])
.isMaxObj())) {
return completeWorks;
Object startValue = startRow.get(curObRefColumnName);
if (startValue == null) {
throw new IllegalArgumentException("Please include all partition key in start range. Currently missing key: { " + curObRefColumnName + " }");
}
if (end[refIdx] instanceof ObObj
&& (((ObObj) end[refIdx]).isMinObj() || ((ObObj) end[refIdx]).isMaxObj())) {
if (startValue instanceof ObObj
&& (((ObObj) startValue).isMinObj() || ((ObObj) startValue).isMaxObj())) {
return completeWorks;
}
}
}

// eval partition key
List<Object> startValues = evalRowKeyValues(start);
Object startValue = startValues.get(0);
List<Object> endValues = evalRowKeyValues(end);
Object endValue = endValues.get(0);

Long startLongValue = ObObjType.parseToLongOrNull(startValue);
Long endLongValue = ObObjType.parseToLongOrNull(endValue);

if (startLongValue == null || endLongValue == null) {
throw new NumberFormatException("can not parseToComparable start value ["
+ startValue + "] or end value [" + endValue
+ "] to long");
}
long startHashValue = startLongValue - (startInclusive ? 0 : -1);
long endHashValue = endLongValue - (endInclusive ? 0 : 1);

if (endHashValue - startHashValue + 1 >= partNum) {
return completeWorks;
} else {
List<Long> partIds = new ArrayList<Long>();
for (long i = startHashValue; i <= endHashValue; i++) {
partIds.add(innerHash(i));
}
return partIds;
}
} catch (IllegalArgumentException e) {
logger.error(LCD.convert("01-00002"), e);
throw new IllegalArgumentException(
"ObHashPartDesc get part id come across illegal params", e);
}
}

@Override
public List<Long> getPartIds(List<String> scanRangeColumns, Object[] start, boolean startInclusive,
Object[] end, boolean endInclusive) throws IllegalArgumentException {
try {
if (start.length != end.length) {
throw new IllegalArgumentException("length of start key and end key in range is not equal, " +
"the start key: " + start + ", the end key: " + end);
}

if (start.length == 1 && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() &&
end.length == 1 && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) {
return completeWorks;
}

if (scanRangeColumns.size() != start.length) {
throw new IllegalArgumentException("length of start key in range and scan range columns is not equal," +
"the start key: " + start + ", the scan range columns: " + scanRangeColumns);
}

Row startRow = new Row();
Row endRow = new Row();
for (int i = 0; i < scanRangeColumns.size(); i++) {
startRow.add(scanRangeColumns.get(i), start[i]);
endRow.add(scanRangeColumns.get(i), end[i]);
}

// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
for (ObColumn partColumn : partColumns) {
List<String> refColumns = partColumn.getRefColumnNames();
for (String column : refColumns) {
if (startRow.get(column) instanceof ObObj
&& (((ObObj) startRow.get(column)).isMinObj() || ((ObObj) startRow.get(column))
.isMaxObj())) {
return completeWorks;
Object endValue = endRow.get(curObRefColumnName);
if (endValue == null) {
throw new IllegalArgumentException("Please include all partition key in end range. Currently missing key: { " + curObRefColumnName + " }");
}
if (endRow.get(column) instanceof ObObj
&& (((ObObj) endRow.get(column)).isMinObj() || ((ObObj) endRow.get(column)).isMaxObj())) {
if (endValue instanceof ObObj
&& (((ObObj) endValue).isMinObj() || ((ObObj) endValue).isMaxObj())) {
return completeWorks;
}
}
Expand All @@ -213,8 +160,8 @@ public List<Long> getPartIds(List<String> scanRangeColumns, Object[] start, bool

if (startLongValue == null || endLongValue == null) {
throw new NumberFormatException("can not parseToComparable start value ["
+ startValue + "] or end value [" + endValue
+ "] to long");
+ startValue + "] or end value [" + endValue
+ "] to long");
}
long startHashValue = startLongValue - (startInclusive ? 0 : -1);
long endHashValue = endLongValue - (endInclusive ? 0 : 1);
Expand All @@ -231,7 +178,7 @@ public List<Long> getPartIds(List<String> scanRangeColumns, Object[] start, bool
} catch (IllegalArgumentException e) {
logger.error(LCD.convert("01-00002"), e);
throw new IllegalArgumentException(
"ObHashPartDesc get part id come across illegal params", e);
"ObHashPartDesc get part id come across illegal params", e);
}
}

Expand All @@ -247,26 +194,30 @@ public Long getRandomPartId() {
* Get part id.
*/
@Override
public Long getPartId(Object... rowKey) {
List<Object[]> rowKeys = new ArrayList<Object[]>();
rowKeys.add(rowKey);
return this.getPartId(rowKeys, false);
public Long getPartId(Object... row) {
List<Object> rows = new ArrayList<Object>();
rows.addAll(Arrays.asList(row));
return this.getPartId(rows, false);
}

/*
* Get part id.
*/
@Override
public Long getPartId(List<Object[]> rowKeys, boolean consistency) {
public Long getPartId(List<Object> rows, boolean consistency) {

if (rowKeys == null || rowKeys.size() == 0) {
throw new IllegalArgumentException("invalid row keys :" + rowKeys);
if (rows == null || rows.size() == 0) {
throw new IllegalArgumentException("invalid row keys :" + rows);
}

Long partId = null;
try {
for (Object[] rowKey : rowKeys) {
List<Object> evalValues = evalRowKeyValues(rowKey);
for (Object rowObj : rows) {
if (!(rowObj instanceof Row)) {
throw new ObTableException("invalid format of rowObj: " + rowObj);
}
Row row = (Row) rowObj;
List<Object> evalValues = evalRowKeyValues(row);
Object value = evalValues.get(0);// the partition type of hash has one param at most
Long longValue = ObObjType.parseToLongOrNull(value);

Expand All @@ -285,7 +236,7 @@ public Long getPartId(List<Object[]> rowKeys, boolean consistency) {

if (!partId.equals(currentPartId)) {
throw new ObTablePartitionConsistentException(
"across partition operation may cause consistent problem " + rowKeys);
"across partition operation may cause consistent problem " + rows);
}
}
} catch (IllegalArgumentException e) {
Expand Down
Loading