Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ default WindowStoreIterator<V> backwardFetch(K key, Instant timeFrom, Instant ti
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
* A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
* A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive), where iteration starts.
* @param timeTo time range end (inclusive), where iteration ends.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
Expand All @@ -146,12 +147,13 @@ KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Ins
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
* A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
* A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive), where iteration ends.
* @param timeTo time range end (inclusive), where iteration starts.
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
default KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ default WindowStoreIterator<V> backwardFetch(final K key,
* This iterator must be closed after use.
*
* @param keyFrom the first key in the range
* A null value indicates a starting position from the first element in the store.
* @param keyTo the last key in the range
* A null value indicates that the range ends with the last element in the store.
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if one of the given keys is {@code null}
*/
// WindowStore keeps a long-based implementation of ReadOnlyWindowStore#fetch Instant-based
// if super#fetch is removed, keep this implementation as it serves PAPI Stores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final long from,
final long to,
final boolean forward) {
if (keyFrom.compareTo(keyTo) > 0) {
if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
Expand All @@ -124,8 +124,8 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,

final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
final Bytes binaryTo = keySchema.upperRange(keyTo, to);
final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
Comment thread
showuon marked this conversation as resolved.
Outdated

return new SegmentIterator<>(
searchSpace.iterator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
if (keyFrom.compareTo(keyTo) > 0) {
if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
Expand All @@ -266,8 +266,8 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true) :
context.cache().range(
cacheName,
cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
);

final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
Expand All @@ -289,7 +289,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFr
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
if (keyFrom.compareTo(keyTo) > 0) {
if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
Expand All @@ -310,8 +310,8 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFr
new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) :
context.cache().reverseRange(
cacheName,
cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
);

final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
Expand Down Expand Up @@ -573,12 +573,14 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang
throw new IllegalStateException("Error iterating over segments: segment interval has changed");
}

if (keyFrom.equals(keyTo)) {
if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) {
cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
} else {
cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
cacheKeyFrom = keyFrom == null ? null :
cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
cacheKeyTo = keyTo == null ? null :
cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
Comment on lines 580 to 583
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null cacheKeyFrom and cacheKeyTo will use range query, which is already supported in KIP-763.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
final K keyTo,
final Instant timeFrom,
final Instant timeTo) {
Objects.requireNonNull(keyFrom, "keyFrom can't be null");
Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
store -> store.fetch(keyFrom, keyTo, timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(
Expand All @@ -131,8 +129,6 @@ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo,
final Instant timeFrom,
final Instant timeTo) throws IllegalArgumentException {
Objects.requireNonNull(keyFrom, "keyFrom can't be null");
Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
store -> store.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
return new DelegatingPeekingKeyValueIterator<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,9 @@ KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final long timeFrom,
final long timeTo,
final boolean forward) {
Objects.requireNonNull(from, "from key cannot be null");
Objects.requireNonNull(to, "to key cannot be null");

removeExpiredSegments();

if (from.compareTo(to) > 0) {
if (from != null && to != null && from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
Expand Down Expand Up @@ -397,7 +394,8 @@ private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(fina
final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo;

final WrappedWindowedKeyValueIterator iterator =
new WrappedWindowedKeyValueIterator(from,
new WrappedWindowedKeyValueIterator(
from,
to,
segmentIterator,
openIterators::remove,
Expand Down Expand Up @@ -462,14 +460,31 @@ public boolean hasNext() {
}

final Bytes key = getKey(next.key);
if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) {
if (isKeyWithinRange(key)) {
return true;
} else {
next = null;
return hasNext();
}
}

private boolean isKeyWithinRange(final Bytes key) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per comment above, can you make this method more readable by splitting the statements?

// split all cases for readability and avoid BooleanExpressionComplexity checkstyle warning
if (keyFrom == null && keyTo == null) {
// fetch all
return true;
} else if (keyFrom == null) {
// start from the beginning
return key.compareTo(getKey(keyTo)) <= 0;
} else if (keyTo == null) {
// end to the last
return key.compareTo(getKey(keyFrom)) >= 0;
} else {
// key is within the range
return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0;
}
}

public void close() {
next = null;
recordIterator = null;
Expand Down Expand Up @@ -499,9 +514,16 @@ Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
currentTime = currentSegment.getKey();

final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
currentSegment.getValue() :
currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
final ConcurrentNavigableMap<Bytes, byte[]> subMap;
if (allKeys) { // keyFrom == null && keyTo == null
subMap = currentSegment.getValue();
} else if (keyFrom == null) {
subMap = currentSegment.getValue().headMap(keyTo, true);
} else if (keyTo == null) {
subMap = currentSegment.getValue().tailMap(keyFrom, true);
} else {
subMap = currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
}

if (forward) {
return subMap.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,12 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
Objects.requireNonNull(keyTo, "keyTo cannot be null");
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
wrapped().fetch(
keyFrom == null ? null : keyBytes(keyFrom),
keyTo == null ? null : keyBytes(keyTo),
timeFrom,
timeTo),
fetchSensor,
streamsMetrics,
serdes,
Expand All @@ -260,10 +262,12 @@ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo,
final long timeFrom,
final long timeTo) {
Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
Objects.requireNonNull(keyTo, "keyTo cannot be null");
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
wrapped().backwardFetch(
keyFrom == null ? null : keyBytes(keyFrom),
keyTo == null ? null : keyBytes(keyTo),
timeFrom,
timeTo),
fetchSensor,
streamsMetrics,
serdes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,10 @@ public boolean hasNext() {
close();
currentSegment = segments.next();
try {
if (from == null || to == null) {
if (forward) {
currentIterator = currentSegment.all();
} else {
currentIterator = currentSegment.reverseAll();
}
if (forward) {
currentIterator = currentSegment.range(from, to);
} else {
if (forward) {
currentIterator = currentSegment.range(from, to);
} else {
currentIterator = currentSegment.reverseRange(from, to);
}
currentIterator = currentSegment.reverseRange(from, to);
}
} catch (final InvalidStateStoreException e) {
// segment may have been closed so we ignore it.
Expand Down
Loading