, Closeable {
@Override
void close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
new file mode 100644
index 0000000000000..009dad05937c4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param
+ * @param
+ */
+abstract class AbstractMergedSortedCacheStoreIterator implements KeyValueIterator {
+ private final PeekingKeyValueIterator cacheIterator;
+ private final KeyValueIterator storeIterator;
+ protected final StateSerdes serdes;
+
+ AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator cacheIterator,
+ final KeyValueIterator storeIterator,
+ final StateSerdes serdes) {
+ this.cacheIterator = cacheIterator;
+ this.storeIterator = storeIterator;
+ this.serdes = serdes;
+ }
+
+ abstract int compare(final Bytes cacheKey, final KS storeKey);
+
+ abstract K deserializeStoreKey(final KS key);
+
+ abstract KeyValue deserializeStorePair(final KeyValue pair);
+
+ abstract K deserializeCacheKey(final Bytes cacheKey);
+
+ private boolean isDeletedCacheEntry(final KeyValue nextFromCache) {
+ return nextFromCache.value.value == null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // skip over items deleted from cache, and corresponding store items if they have the same key
+ while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
+ if (storeIterator.hasNext()) {
+ final KS nextStoreKey = storeIterator.peekNextKey();
+ // advance the store iterator if the key is the same as the deleted cache key
+ if (compare(cacheIterator.peekNextKey(), nextStoreKey) == 0) {
+ storeIterator.next();
+ }
+ }
+ cacheIterator.next();
+ }
+
+ return cacheIterator.hasNext() || storeIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final Bytes nextCacheKey = cacheIterator.hasNext() ? cacheIterator.peekNextKey() : null;
+ final KS nextStoreKey = storeIterator.hasNext() ? storeIterator.peekNextKey() : null;
+
+ if (nextCacheKey == null) {
+ return nextStoreValue(nextStoreKey);
+ }
+
+ if (nextStoreKey == null) {
+ return nextCacheValue(nextCacheKey);
+ }
+
+ final int comparison = compare(nextCacheKey, nextStoreKey);
+ if (comparison > 0) {
+ return nextStoreValue(nextStoreKey);
+ } else if (comparison < 0) {
+ return nextCacheValue(nextCacheKey);
+ } else {
+ // skip the same keyed element
+ storeIterator.next();
+ return nextCacheValue(nextCacheKey);
+ }
+ }
+
+ private KeyValue nextStoreValue(KS nextStoreKey) {
+ final KeyValue next = storeIterator.next();
+
+ if (!next.key.equals(nextStoreKey)) {
+ throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
+ }
+
+ return deserializeStorePair(next);
+ }
+
+ private KeyValue nextCacheValue(Bytes nextCacheKey) {
+ final KeyValue next = cacheIterator.next();
+
+ if (!next.key.equals(nextCacheKey)) {
+ throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
+ }
+
+ return KeyValue.pair(deserializeCacheKey(next.key), serdes.valueFrom(next.value.value));
+ }
+
+ @Override
+ public K peekNextKey() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final Bytes nextCacheKey = cacheIterator.hasNext() ? cacheIterator.peekNextKey() : null;
+ final KS nextStoreKey = storeIterator.hasNext() ? storeIterator.peekNextKey() : null;
+
+ if (nextCacheKey == null) {
+ return deserializeStoreKey(nextStoreKey);
+ }
+
+ if (nextStoreKey == null) {
+ return serdes.keyFrom(nextCacheKey.get());
+ }
+
+ final int comparison = compare(nextCacheKey, nextStoreKey);
+ if (comparison > 0) {
+ return deserializeStoreKey(nextStoreKey);
+ } else if (comparison < 0) {
+ return deserializeCacheKey(nextCacheKey);
+ } else {
+ // skip the same keyed element
+ storeIterator.next();
+ return deserializeCacheKey(nextCacheKey);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() is not supported");
+ }
+
+ @Override
+ public void close() {
+ cacheIterator.close();
+ storeIterator.close();
+ }
+}
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index fdb03fd13e0a5..9a0a9763cbfa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,7 +31,7 @@
import java.util.List;
-class CachingKeyValueStore implements KeyValueStore, CachedStateStore {
+class CachingKeyValueStore implements WrappedStateStore, KeyValueStore, CachedStateStore {
private final KeyValueStore underlying;
private final Serde keySerde;
@@ -234,4 +234,12 @@ public synchronized V delete(final K key) {
KeyValueStore underlying() {
return underlying;
}
+
+ @Override
+ public StateStore inner() {
+ if (underlying instanceof WrappedStateStore) {
+ return ((WrappedStateStore) underlying).inner();
+ }
+ return underlying;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 17c4ee0084684..fec660932a696 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -19,7 +19,6 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -35,21 +34,22 @@
import java.util.NoSuchElementException;
-class CachingSessionStore implements SessionStore, CachedStateStore, AGG> {
+class CachingSessionStore extends WrappedStateStore.AbstractWrappedStateStore implements SessionStore, CachedStateStore, AGG> {
- private final SegmentedBytesStore bytesStore;
+ private final SessionStore bytesStore;
private final SessionKeySchema keySchema;
private Serde keySerde;
private final Serde aggSerde;
private InternalProcessorContext context;
private String name;
- private StateSerdes, AGG> serdes;
+ private StateSerdes serdes;
private ThreadCache cache;
private CacheFlushListener, AGG> flushListener;
- CachingSessionStore(final SegmentedBytesStore bytesStore,
+ CachingSessionStore(final SessionStore bytesStore,
final Serde keySerde,
final Serde aggSerde) {
+ super(bytesStore);
this.bytesStore = bytesStore;
this.keySerde = keySerde;
this.aggSerde = aggSerde;
@@ -65,12 +65,12 @@ public KeyValueIterator, AGG> findSessions(final K key,
keySchema.lowerRange(binarySessionId,
earliestSessionEndTime).get(),
keySchema.upperRange(binarySessionId, latestSessionStartTime).get());
- final KeyValueIterator storeIterator = bytesStore.fetch(binarySessionId, earliestSessionEndTime, latestSessionStartTime);
+ final KeyValueIterator, byte[]> storeIterator = bytesStore.findSessions(binarySessionId, earliestSessionEndTime, latestSessionStartTime);
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
- earliestSessionEndTime,
- latestSessionStartTime);
+ earliestSessionEndTime,
+ latestSessionStartTime);
final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
- return new MergedSortedCacheKeyValueStoreIterator<>(filteredCacheIterator, storeIterator, serdes);
+ return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes);
}
@@ -92,11 +92,6 @@ public KeyValueIterator, AGG> fetch(final K key) {
return findSessions(key, 0, Long.MAX_VALUE);
}
-
- public String name() {
- return bytesStore.name();
- }
-
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
bytesStore.init(context, root);
@@ -107,14 +102,9 @@ public void init(final ProcessorContext context, final StateStore root) {
private void initInternal(final InternalProcessorContext context) {
this.context = context;
- if (keySerde == null) {
- keySerde = (Serde) context.keySerde();
- }
-
-
- this.serdes = (StateSerdes, AGG>) new StateSerdes<>(bytesStore.name(),
- new SessionKeySerde<>(keySerde),
- aggSerde == null ? context.valueSerde() : aggSerde);
+ this.serdes = new StateSerdes<>(bytesStore.name(),
+ keySerde == null ? (Serde) context.keySerde() : keySerde,
+ aggSerde == null ? (Serde) context.valueSerde() : aggSerde);
this.name = context.taskId() + "-" + bytesStore.name();
@@ -135,27 +125,27 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
final RecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
+ final Windowed key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
if (flushListener != null) {
- final Windowed key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
final AGG newValue = serdes.valueFrom(entry.newValue());
final AGG oldValue = fetchPrevious(binaryKey);
if (!(newValue == null && oldValue == null)) {
flushListener.apply(key, newValue == null ? null : newValue, oldValue);
}
-
}
- bytesStore.put(binaryKey, entry.newValue());
+ bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue());
} finally {
context.setRecordContext(current);
}
}
private AGG fetchPrevious(final Bytes key) {
- final byte[] bytes = bytesStore.get(key);
- if (bytes == null) {
- return null;
+ try (final KeyValueIterator, byte[]> iterator = bytesStore.fetch(key)) {
+ if (!iterator.hasNext()) {
+ return null;
+ }
+ return serdes.valueFrom(iterator.next().value);
}
- return serdes.valueFrom(bytes);
}
@@ -170,25 +160,10 @@ public void close() {
cache.close(name);
}
- public boolean persistent() {
- return bytesStore.persistent();
- }
-
- public boolean isOpen() {
- return bytesStore.isOpen();
- }
-
public void setFlushListener(CacheFlushListener, AGG> flushListener) {
this.flushListener = flushListener;
}
- private void validateStoreOpen() {
- if (!isOpen()) {
- throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
- }
- }
-
-
private static class FilteredCacheIterator implements PeekingKeyValueIterator {
private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
private final HasNextCondition hasNextCondition;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index bd252f147d0ae..d471761f36475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -26,16 +26,15 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.List;
-class CachingWindowStore implements WindowStore, CachedStateStore, V> {
+class CachingWindowStore extends WrappedStateStore.AbstractWrappedStateStore implements WindowStore, CachedStateStore, V> {
- private final SegmentedBytesStore underlying;
+ private final WindowStore underlying;
private final Serde keySerde;
private final Serde valueSerde;
private CacheFlushListener, V> flushListener;
@@ -45,21 +44,17 @@ class CachingWindowStore implements WindowStore, CachedStateStore serdes;
- CachingWindowStore(final SegmentedBytesStore underlying,
+ CachingWindowStore(final WindowStore underlying,
final Serde keySerde,
final Serde valueSerde,
final long windowSize) {
+ super(underlying);
this.underlying = underlying;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.windowSize = windowSize;
}
- @Override
- public String name() {
- return underlying.name();
- }
-
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
@@ -80,13 +75,14 @@ void initInternal(final ProcessorContext context) {
@Override
public void apply(final List entries) {
for (ThreadCache.DirtyEntry entry : entries) {
- final byte[] binaryKey = entry.key().get();
- final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryKey);
- final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
- final Windowed windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes),
+ final byte[] binaryWindowKey = entry.key().get();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
+
+ final Windowed windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
new TimeWindow(timestamp, timestamp + windowSize));
- maybeForward(entry, Bytes.wrap(binaryKey), windowedKey, (InternalProcessorContext) context);
- underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, 0, WindowStoreUtils.INNER_SERDES)), entry.newValue());
+ final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
+ maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
+ underlying.put(key, entry.newValue(), timestamp);
}
}
});
@@ -102,7 +98,7 @@ private void maybeForward(final ThreadCache.DirtyEntry entry,
context.setRecordContext(entry.recordContext());
try {
flushListener.apply(windowedKey,
- serdes.valueFrom(entry.newValue()), fetchPrevious(key));
+ serdes.valueFrom(entry.newValue()), fetchPrevious(key, windowedKey.window().start()));
} finally {
context.setRecordContext(current);
}
@@ -127,16 +123,6 @@ public void close() {
cache.close(name);
}
- @Override
- public boolean persistent() {
- return underlying.persistent();
- }
-
- @Override
- public boolean isOpen() {
- return underlying.isOpen();
- }
-
@Override
public synchronized void put(final K key, final V value) {
put(key, value, context.timestamp());
@@ -158,23 +144,21 @@ public synchronized WindowStoreIterator fetch(final K key, final long timeFro
byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
- final KeyValueIterator underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+ final WindowStoreIterator underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo);
- return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new DelegatingPeekingKeyValueIterator<>(name, underlyingIterator), serdes);
+ return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
+ underlyingIterator,
+ new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
}
- private V fetchPrevious(final Bytes key) {
- final byte[] result = underlying.get(key);
- if (result == null) {
- return null;
- }
- return serdes.valueFrom(result);
- }
-
- private void validateStoreOpen() {
- if (!isOpen()) {
- throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+ private V fetchPrevious(final Bytes key, final long timestamp) {
+ try (final WindowStoreIterator iter = underlying.fetch(key, timestamp, timestamp)) {
+ if (!iter.hasNext()) {
+ return null;
+ } else {
+ return serdes.valueFrom(iter.next().value);
+ }
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
new file mode 100644
index 0000000000000..e31d04bd1f51f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore {
+ private final KeyValueStore inner;
+ private StoreChangeLogger changeLogger;
+
+ public ChangeLoggingKeyValueBytesStore(final KeyValueStore inner) {
+ super(inner);
+ this.inner = inner;
+ }
+
+ @Override
+ public void init(final ProcessorContext context, final StateStore root) {
+ inner.init(context, root);
+ this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
+ }
+
+
+ @Override
+ public void put(final Bytes key, final byte[] value) {
+ inner.put(key, value);
+ changeLogger.logChange(key, value);
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+ final byte[] previous = get(key);
+ if (previous == null) {
+ put(key, value);
+ }
+ return previous;
+ }
+
+ @Override
+ public void putAll(final List> entries) {
+ inner.putAll(entries);
+ for (KeyValue entry : entries) {
+ changeLogger.logChange(entry.key, entry.value);
+ }
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ final byte[] oldValue = inner.get(key);
+ put(key, null);
+ return oldValue;
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return inner.get(key);
+ }
+
+ @Override
+ public KeyValueIterator range(final Bytes from, final Bytes to) {
+ return inner.range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator all() {
+ return inner.all();
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return inner.approximateNumEntries();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
new file mode 100644
index 0000000000000..11cf8022d8bb3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class ChangeLoggingKeyValueStore extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore {
+ private final ChangeLoggingKeyValueBytesStore innerBytes;
+ private final Serde keySerde;
+ private final Serde valueSerde;
+ private StateSerdes serdes;
+
+ ChangeLoggingKeyValueStore(final KeyValueStore bytesStore,
+ final Serde keySerde,
+ final Serde valueSerde) {
+ this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, valueSerde);
+ }
+
+ private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
+ final Serde keySerde,
+ final Serde valueSerde) {
+ super(bytesStore);
+ this.innerBytes = bytesStore;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context, final StateStore root) {
+ innerBytes.init(context, root);
+
+ this.serdes = new StateSerdes<>(innerBytes.name(),
+ keySerde == null ? (Serde) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde) context.valueSerde() : valueSerde);
+ }
+
+ @Override
+ public void put(final K key, final V value) {
+ final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
+ final byte[] bytesValue = serdes.rawValue(value);
+ innerBytes.put(bytesKey, bytesValue);
+ }
+
+ @Override
+ public V putIfAbsent(final K key, final V value) {
+ final V v = get(key);
+ if (v == null) {
+ put(key, value);
+ }
+ return v;
+ }
+
+ @Override
+ public void putAll(final List> entries) {
+ final List> keyValues = new ArrayList<>();
+ for (final KeyValue entry : entries) {
+ keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
+ }
+ innerBytes.putAll(keyValues);
+ }
+
+ @Override
+ public V delete(final K key) {
+ final byte[] oldValue = innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
+ if (oldValue == null) {
+ return null;
+ }
+ return serdes.valueFrom(oldValue);
+ }
+
+ @Override
+ public V get(final K key) {
+ final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
+ if (rawValue == null) {
+ return null;
+ }
+ return serdes.valueFrom(rawValue);
+ }
+
+ @Override
+ public KeyValueIterator range(final K from, final K to) {
+ return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
+ Bytes.wrap(serdes.rawKey(to))),
+ serdes);
+ }
+
+ @Override
+ public KeyValueIterator all() {
+ return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return innerBytes.approximateNumEntries();
+ }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 14b8f1710e287..21c28662522d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -25,13 +25,14 @@
* Simple wrapper around a {@link SegmentedBytesStore} to support writing
* updates to a changelog
*/
-class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore {
+class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore {
private final SegmentedBytesStore bytesStore;
private StoreChangeLogger changeLogger;
ChangeLoggingSegmentedBytesStore(final SegmentedBytesStore bytesStore) {
+ super(bytesStore);
this.bytesStore = bytesStore;
}
@@ -60,10 +61,6 @@ public byte[] get(final Bytes key) {
return bytesStore.get(key);
}
- @Override
- public String name() {
- return bytesStore.name();
- }
@Override
@SuppressWarnings("unchecked")
@@ -71,25 +68,4 @@ public void init(final ProcessorContext context, final StateStore root) {
bytesStore.init(context, root);
changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES);
}
-
- @Override
- public void flush() {
- bytesStore.flush();
- }
-
- @Override
- public void close() {
- bytesStore.close();
- }
-
- @Override
- public boolean persistent() {
- return bytesStore.persistent();
- }
-
- @Override
- public boolean isOpen() {
- return bytesStore.isOpen();
- }
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index b33c0f0ceab3d..e0f1ec85b037b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -61,6 +61,11 @@ public WindowStoreIterator fetch(final K key, final long timeFrom, final long
public void close() {
}
+ @Override
+ public Long peekNextKey() {
+ throw new NoSuchElementException();
+ }
+
@Override
public boolean hasNext() {
return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index eb57acee688cf..f3101b17f9c54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -22,7 +22,7 @@
import java.util.NoSuchElementException;
-public class DelegatingPeekingKeyValueIterator implements KeyValueIterator {
+public class DelegatingPeekingKeyValueIterator implements KeyValueIterator, PeekingKeyValueIterator {
private final String storeName;
private final KeyValueIterator underlying;
private KeyValue next;
@@ -78,4 +78,12 @@ public synchronized KeyValue next() {
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
+
+ @Override
+ public KeyValue peekNext() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return next;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index c9a6866d9d17a..b860e161e0d9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -21,141 +21,37 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
-import java.util.Comparator;
-import java.util.NoSuchElementException;
-
/**
* Merges two iterators. Assumes each of them is sorted by key
*
* @param
* @param
*/
-class MergedSortedCacheKeyValueStoreIterator implements KeyValueIterator {
- private final PeekingKeyValueIterator cacheIterator;
- private final KeyValueIterator storeIterator;
- private final StateSerdes serdes;
- private final Comparator comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
- public MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator cacheIterator,
- final KeyValueIterator storeIterator,
- final StateSerdes serdes) {
- this.cacheIterator = cacheIterator;
- this.storeIterator = storeIterator;
- this.serdes = serdes;
- }
-
- @Override
- public boolean hasNext() {
- while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
- if (storeIterator.hasNext()) {
- final Bytes storeKey = storeIterator.peekNextKey();
- // advance the store iterator if the key is the same as the deleted cache key
- if (storeKey.equals(cacheIterator.peekNextKey())) {
- storeIterator.next();
- }
- }
- // skip over items deleted from cache
- cacheIterator.next();
- }
- return cacheIterator.hasNext() || storeIterator.hasNext();
- }
+class MergedSortedCacheKeyValueStoreIterator extends AbstractMergedSortedCacheStoreIterator {
-
- private boolean isDeletedCacheEntry(final KeyValue nextFromCache) {
- return nextFromCache.value.value == null;
+ MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator cacheIterator,
+ final KeyValueIterator storeIterator,
+ final StateSerdes serdes) {
+ super(cacheIterator, storeIterator, serdes);
}
-
@Override
- public KeyValue next() {
-
- return internalNext(new NextValueFunction>() {
- @Override
- public KeyValue apply(final byte[] cacheKey, final byte[] storeKey) {
- if (cacheKey == null) {
- return nextStoreValue();
- }
-
- if (storeKey == null) {
- return nextCacheValue();
- }
-
- final int comparison = comparator.compare(cacheKey, storeKey);
- if (comparison > 0) {
- return nextStoreValue();
- } else if (comparison < 0) {
- return nextCacheValue();
- } else {
- storeIterator.next();
- return nextCacheValue();
- }
- }
- });
+ public KeyValue deserializeStorePair(KeyValue pair) {
+ return KeyValue.pair(serdes.keyFrom(pair.key.get()), serdes.valueFrom(pair.value));
}
@Override
- public K peekNextKey() {
- return internalNext(new NextValueFunction() {
- @Override
- public K apply(final byte[] cacheKey, final byte[] storeKey) {
- if (cacheKey == null) {
- return serdes.keyFrom(storeKey);
- }
-
- if (storeKey == null) {
- return serdes.keyFrom(cacheKey);
- }
-
- final int comparison = comparator.compare(cacheKey, storeKey);
- if (comparison > 0) {
- return serdes.keyFrom(storeKey);
- } else {
- return serdes.keyFrom(cacheKey);
- }
- }
- });
- }
-
- interface NextValueFunction {
- T apply(final byte[] cacheKey, final byte [] storeKey);
- }
-
- private T internalNext(final NextValueFunction nextValueFunction) {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- byte[] nextCacheKey = null;
- if (cacheIterator.hasNext()) {
- nextCacheKey = cacheIterator.peekNextKey().get();
- }
-
- byte[] nextStoreKey = null;
- if (storeIterator.hasNext()) {
- nextStoreKey = storeIterator.peekNextKey().get();
- }
-
- return nextValueFunction.apply(nextCacheKey, nextStoreKey);
- }
-
- private KeyValue nextCacheValue() {
- final KeyValue next = cacheIterator.next();
- return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value.value));
- }
-
- private KeyValue nextStoreValue() {
- final KeyValue next = storeIterator.next();
- return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value));
+ K deserializeCacheKey(final Bytes cacheKey) {
+ return serdes.keyFrom(cacheKey.get());
}
@Override
- public void remove() {
- throw new UnsupportedOperationException("remove not supported");
+ public K deserializeStoreKey(Bytes key) {
+ return serdes.keyFrom(key.get());
}
@Override
- public void close() {
- cacheIterator.close();
- storeIterator.close();
+ public int compare(Bytes cacheKey, Bytes storeKey) {
+ return cacheKey.compareTo(storeKey);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
new file mode 100644
index 0000000000000..db64621340116
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param
+ * @param
+ */
+class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheStoreIterator, Windowed, AGG> {
+ private final StateSerdes rawSerdes;
+
+
+ MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator cacheIterator,
+ final KeyValueIterator, byte[]> storeIterator,
+ final StateSerdes serdes) {
+ super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(),
+ new SessionKeySerde<>(serdes.keySerde()),
+ serdes.valueSerde()));
+
+ rawSerdes = serdes;
+ }
+
+ @Override
+ public KeyValue, AGG> deserializeStorePair(KeyValue, byte[]> pair) {
+ final K key = rawSerdes.keyFrom(pair.key.key().get());
+ return KeyValue.pair(new Windowed<>(key, pair.key.window()), serdes.valueFrom(pair.value));
+ }
+
+ @Override
+ Windowed deserializeCacheKey(final Bytes cacheKey) {
+ return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer());
+ }
+
+ @Override
+ public Windowed deserializeStoreKey(Windowed key) {
+ final K originalKey = rawSerdes.keyFrom(key.key().get());
+ return new Windowed(originalKey, key.window());
+ }
+
+ @Override
+ public int compare(Bytes cacheKey, Windowed storeKey) {
+ Bytes storeKeyBytes = SessionKeySerde.bytesToBinary(storeKey);
+ return cacheKey.compareTo(storeKeyBytes);
+ }
+}
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
new file mode 100644
index 0000000000000..a9d097320086c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key
+ *
+ * @param
+ */
+class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStoreIterator implements WindowStoreIterator {
+
+ MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator cacheIterator,
+ final KeyValueIterator storeIterator,
+ final StateSerdes serdes) {
+ super(cacheIterator, storeIterator, serdes);
+ }
+
+ @Override
+ public KeyValue deserializeStorePair(final KeyValue pair) {
+ return KeyValue.pair(pair.key, serdes.valueFrom(pair.value));
+ }
+
+ @Override
+ Long deserializeCacheKey(final Bytes cacheKey) {
+ return WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+ }
+
+ @Override
+ public Long deserializeStoreKey(final Long key) {
+ return key;
+ }
+
+ @Override
+ public int compare(final Bytes cacheKey, final Long storeKey) {
+ final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+ return cacheTimestamp.compareTo(storeKey);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
deleted file mode 100644
index e210e73111fe9..0000000000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.util.NoSuchElementException;
-
-/**
- * Merges two iterators. Assumes each of them is sorted by key
- *
- * @param
- * @param
- */
-class MergedSortedCachedWindowStoreIterator implements WindowStoreIterator {
- private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
- private final KeyValueIterator storeIterator;
- private final StateSerdes