Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
final long from,
final long to) {
if (keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to);

final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CachingKeyValueStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class);

private CacheFlushListener<byte[], byte[]> flushListener;
private boolean sendOldValues;
private String cacheName;
Expand Down Expand Up @@ -228,6 +232,13 @@ private byte[] getInternal(final Bytes key) {
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import org.apache.kafka.streams.state.SessionStore;

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CachingSessionStore
extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
implements SessionStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(CachingSessionStore.class);

private final SessionKeySchema keySchema;
private final SegmentedCacheFunction cacheFunction;
private String cacheName;
Expand Down Expand Up @@ -153,6 +157,13 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFro
final Bytes keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
if (keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

validateStoreOpen();

final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CachingWindowStore
extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class);

private final long windowSize;
private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();

Expand Down Expand Up @@ -196,6 +200,13 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final long timeFrom,
final long timeTo) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here and below

}

// since this function may not access the underlying inner store, we need to validate
// if store is open outside as well.
validateStoreOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@

import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final ConcurrentNavigableMap<Bytes, byte[]> map;
private volatile boolean open = false;

private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);

public InMemoryKeyValueStore(final String name) {
this.name = name;

Expand Down Expand Up @@ -111,6 +115,14 @@ public byte[] delete(final Bytes key) {

@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {

if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes

if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
expiredRecordSensor.record();
LOG.debug("Skipping record for expired segment.");
LOG.warn("Skipping record for expired segment.");
} else {
if (value != null) {
this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
Expand Down Expand Up @@ -185,6 +185,13 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final long timeTo) {
removeExpiredSegments();

if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,27 @@
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryNavigableLRUCache extends MemoryLRUCache {

private static final Logger LOG = LoggerFactory.getLogger(MemoryNavigableLRUCache.class);

public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
super(name, maxCacheSize);
}

@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {

if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
return new DelegatingPeekingKeyValueIterator<>(name(),
new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");

if (from.compareTo(to) > 0) {
log.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

validateStoreOpen();

final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -379,6 +381,21 @@ public void shouldDeleteFromStore() {
assertNull(store.get(2));
}

@Test
public void shouldReturnSameResultsForGetAndRangeWithEqualKeys() {
final List<KeyValue<Integer, String>> entries = new ArrayList<>();
entries.add(new KeyValue<>(1, "one"));
entries.add(new KeyValue<>(2, "two"));
entries.add(new KeyValue<>(3, "three"));

store.putAll(entries);

final Iterator<KeyValue<Integer, String>> iterator = store.range(2, 2);

assertEquals(iterator.next().value, store.get(2));
assertFalse(iterator.hasNext());
}

@Test
public void shouldNotThrowConcurrentModificationException() {
store.put(0, "zero");
Expand All @@ -389,4 +406,18 @@ public void shouldNotThrowConcurrentModificationException() {

assertEquals(new KeyValue<>(0, "zero"), results.next());
}

@Test
public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

final KeyValueIterator iterator = store.range(-1, 1);
assertFalse(iterator.hasNext());

final List<String> messages = appender.getMessages();
assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
+ "Note that the built-in numerical serdes do not follow this for negative numbers"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
Expand All @@ -28,6 +29,7 @@
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
Expand All @@ -50,6 +52,7 @@
import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -339,6 +342,22 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled()
flushListener.forwarded.clear();
}

@Test
public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() {
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes());
cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes());
cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes());
cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes());

final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = cachingStore.findSessions(keyAA, 0L, 10L);
final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = cachingStore.findSessions(keyAA, keyAA, 0L, 10L);

assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
assertFalse(singleKeyIterator.hasNext());
assertFalse(keyRangeIterator.hasNext());
}

@Test
public void shouldClearNamespaceCacheOnClose() {
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
Expand Down Expand Up @@ -412,6 +431,23 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() {
cachingStore.put(null, "1".getBytes());
}

@Test
public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));

final KeyValueIterator iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L);
assertFalse(iterator.hasNext());

final List<String> messages = appender.getMessages();
assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
+ "Note that the built-in numerical serdes do not follow this for negative numbers"));
}

private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String... sessionIds) {
final Random random = new Random();
final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
Expand Down
Loading