Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,22 @@ public boolean hasNext() {
endOfScan = true;
return false;
}
// continue when cache is empty but not null
while (currentCache != null && currentCache.isEmpty()) {
if (cacheLoadFails()) {
return false;
}
}
return notEndOfScan();
}

private Kvrpcpb.KvPair getCurrent() {
if (isCacheDrained()) {
return null;
}
--limit;
return currentCache.get(index++);
}

@Override
public Kvrpcpb.KvPair next() {
Kvrpcpb.KvPair kv;
// continue when cache is empty but not null
for (kv = getCurrent(); currentCache != null && kv == null; kv = getCurrent()) {
if (cacheLoadFails()) {
return null;
}
}
return kv;
return getCurrent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
int limit,
boolean keyOnly) {
this.startKey = requireNonNull(startKey, "start key is null");
if (startKey.isEmpty()) {
throw new IllegalArgumentException("start key cannot be empty");
}
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.equals(ByteString.EMPTY);
this.hasEndKey = !endKey.isEmpty();
this.limit = limit;
this.keyOnly = keyOnly;
this.conf = conf;
Expand All @@ -74,7 +71,7 @@ boolean cacheLoadFails() {
if (endOfScan || processingLastBatch) {
return true;
}
if (startKey == null || startKey.isEmpty()) {
if (startKey == null) {
return true;
}
try {
Expand Down Expand Up @@ -107,7 +104,8 @@ boolean cacheLoadFails() {
startKey = lastKey.next().toByteString();
}
// notify last batch if lastKey is greater than or equal to endKey
if (hasEndKey && lastKey.compareTo(endKey) >= 0) {
// if startKey is empty, it indicates +∞
if (hasEndKey && lastKey.compareTo(endKey) >= 0 || startKey.isEmpty()) {
processingLastBatch = true;
startKey = null;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,9 @@ private static Map<ByteString, ByteString> mapKeysToValues(
private List<TiRegion> fetchRegionsFromRange(
BackOffer backOffer, ByteString startKey, ByteString endKey) {
List<TiRegion> regions = new ArrayList<>();
while (startKey.isEmpty() || Key.toRawKey(startKey).compareTo(Key.toRawKey(endKey)) < 0) {
while (startKey.isEmpty()
|| endKey.isEmpty()
|| Key.toRawKey(startKey).compareTo(Key.toRawKey(endKey)) < 0) {
TiRegion currentRegion = clientBuilder.getRegionManager().getRegionByKey(startKey, backOffer);
regions.add(currentRegion);
startKey = currentRegion.getEndKey();
Expand Down
94 changes: 61 additions & 33 deletions src/test/java/org/tikv/raw/RawKVClientTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tikv.raw;

import static org.junit.Assert.*;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.*;
Expand All @@ -8,6 +10,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,7 +94,7 @@ public void tearDown() throws Exception {
}
}

@Test
@Ignore
public void atomicAPITest() {
if (!initialized) return;
long ttl = 10;
Expand All @@ -100,19 +103,19 @@ public void atomicAPITest() {
ByteString value2 = ByteString.copyFromUtf8("value2");
client.delete(key);
ByteString res1 = client.putIfAbsent(key, value, ttl);
assert res1.isEmpty();
assertTrue(res1.isEmpty());
ByteString res2 = client.putIfAbsent(key, value2, ttl);
assert res2.equals(value);
assertEquals(value, res2);
try {
Thread.sleep(ttl * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ByteString res3 = client.putIfAbsent(key, value, ttl);
assert res3.isEmpty();
assertTrue(res3.isEmpty());
}

@Test
@Ignore
public void getKeyTTLTest() {
if (!initialized) return;
long ttl = 10;
Expand Down Expand Up @@ -254,30 +257,47 @@ public void deleteRangeTest() {
public void simpleTest() {
if (!initialized) return;
ByteString key = rawKey("key");
ByteString key0 = rawKey("key0");
ByteString key1 = rawKey("key1");
ByteString key2 = rawKey("key2");
ByteString key3 = rawKey("key3");
ByteString value = rawValue("value");
ByteString value1 = rawValue("value1");
ByteString value2 = rawValue("value2");
ByteString value3 = rawValue("value3");
Kvrpcpb.KvPair kv = Kvrpcpb.KvPair.newBuilder().setKey(key).setValue(value).build();
Kvrpcpb.KvPair kv1 = Kvrpcpb.KvPair.newBuilder().setKey(key1).setValue(value1).build();
Kvrpcpb.KvPair kv2 = Kvrpcpb.KvPair.newBuilder().setKey(key2).setValue(value2).build();
Kvrpcpb.KvPair kv3 = Kvrpcpb.KvPair.newBuilder().setKey(key3).setValue(value3).build();

try {
checkEmpty(key1);
checkEmpty(key2);
checkPut(key1, value1);
checkPut(key2, value2);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
List<Kvrpcpb.KvPair> result2 = new ArrayList<>();
result.add(kv1);
result.add(kv2);
checkScan(key, key3, result, limit);
checkScan(key1, key3, result, limit);
checkScan(key, key1, new ArrayList<>(), limit);
result2.add(kv1);
checkScan(key, key2, result2, limit);
checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY);
checkEmpty(kv);
checkEmpty(kv1);
checkEmpty(kv2);
checkEmpty(kv3);
checkPut(kv);
checkPut(kv1);
checkPut(kv2);
checkPut(kv3);
// <key, value>, <key1,value1>, <key2,value2>, <key3,value3>
// (-∞, +∞)
checkScan(ByteString.EMPTY, ByteString.EMPTY, Arrays.asList(kv, kv1, kv2, kv3), limit);
// (-∞, key3)
checkScan(ByteString.EMPTY, key3, Arrays.asList(kv, kv1, kv2), limit);
// [key1, +∞)
checkScan(key1, ByteString.EMPTY, Arrays.asList(kv1, kv2, kv3), limit);
// [key, key3)
checkScan(key, key3, Arrays.asList(kv, kv1, kv2), limit);
// [key1, key3)
checkScan(key1, key3, Arrays.asList(kv1, kv2), limit);
// [key0, key1)
checkScan(key0, key1, new ArrayList<>(), limit);
// [key, key2)
checkScan(key, key2, Arrays.asList(kv, kv1), limit);
checkDelete(key1);
checkDelete(key2);
checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY);
} catch (final TiKVException e) {
logger.warn("Test fails with Exception: " + e);
}
Expand Down Expand Up @@ -509,7 +529,7 @@ private void rawGetTest(int getCases, boolean benchmark) {
} else {
int i = 0;
for (Map.Entry<ByteString, ByteString> pair : data.entrySet()) {
assert client.get(pair.getKey()).equals(pair.getValue());
assertEquals(pair.getValue(), client.get(pair.getKey()));
i++;
if (i >= getCases) {
break;
Expand Down Expand Up @@ -756,27 +776,31 @@ private void rawTTLTest(int cases, long ttl, boolean benchmark) {
private void checkBatchGet(List<ByteString> keys) {
List<Kvrpcpb.KvPair> result = client.batchGet(keys);
for (Kvrpcpb.KvPair kvPair : result) {
assert data.containsKey(kvPair.getKey());
assert kvPair.getValue().equals(data.get(kvPair.getKey()));
assertTrue(data.containsKey(kvPair.getKey()));
assertEquals(data.get(kvPair.getKey()), kvPair.getValue());
}
}

private void checkPut(Kvrpcpb.KvPair kv) {
checkPut(kv.getKey(), kv.getValue());
}

private void checkPut(ByteString key, ByteString value) {
client.put(key, value);
assert client.get(key).equals(value);
assertEquals(value, client.get(key));
}

private void checkBatchPut(Map<ByteString, ByteString> kvPairs) {
client.batchPut(kvPairs);
for (Map.Entry<ByteString, ByteString> kvPair : kvPairs.entrySet()) {
assert client.get(kvPair.getKey()).equals(kvPair.getValue());
assertEquals(kvPair.getValue(), client.get(kvPair.getKey()));
}
}

private void checkScan(
ByteString startKey, ByteString endKey, List<Kvrpcpb.KvPair> ans, int limit) {
ByteString startKey, ByteString endKey, List<Kvrpcpb.KvPair> expected, int limit) {
List<Kvrpcpb.KvPair> result = client.scan(startKey, endKey, limit);
assert result.equals(ans);
assertEquals(expected, result);
}

private void checkScan(
Expand Down Expand Up @@ -812,7 +836,7 @@ private void checkBatchScan(List<ScanOption> scanOptions) {
.setValue(kvPair.getValue())
.build())
.collect(Collectors.toList());
assert result.get(i).equals(partialResult);
assertEquals(partialResult, result.get(i));
i++;
}
}
Expand All @@ -827,31 +851,35 @@ private void checkDeleteRange(ByteString startKey, ByteString endKey) {
logger.info("delete range complete");
List<Kvrpcpb.KvPair> result = client.scan(startKey, endKey);
logger.info("checking scan complete. number of remaining keys in range: " + result.size());
assert result.isEmpty();
assertTrue(result.isEmpty());
}

private void checkPutTTL(ByteString key, ByteString value, long ttl) {
client.put(key, value, ttl);
assert client.get(key).equals(value);
assertEquals(value, client.get(key));
}

private void checkGetKeyTTL(ByteString key, long ttl) {
Long t = client.getKeyTTL(key);
assert t != null;
assert t <= ttl && t > 0;
assertNotNull(t);
assertTrue(t <= ttl && t > 0);
}

private void checkGetTTLTimeOut(ByteString key) {
assert client.get(key).isEmpty();
assertTrue(client.get(key).isEmpty());
}

private void checkGetKeyTTLTimeOut(ByteString key) {
Long t = client.getKeyTTL(key);
assert t == null;
assertNull(t);
}

private void checkEmpty(Kvrpcpb.KvPair kv) {
checkEmpty(kv.getKey());
}

private void checkEmpty(ByteString key) {
assert client.get(key).isEmpty();
assertTrue(client.get(key).isEmpty());
}

private static ByteString rawKey(String key) {
Expand Down