From d5a84115a210cb959d2ca6fcc7a08618c7b8a97d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 16 Dec 2018 15:59:20 +0100 Subject: [PATCH 1/4] KAFKA-3522: Add in-memory KeyValueWithTimestampStore --- .../internals/ProcessorContextImpl.java | 146 +++++++++++- .../state/KeyValueWithTimestampStore.java | 50 +++++ .../streams/state/ValueAndTimestamp.java | 29 +++ .../internals/InMemoryKeyValueStore.java | 6 +- .../InMemoryKeyValueWithTimestampStore.java | 212 ++++++++++++++++++ .../ValueAndTimestampDeserializer.java | 59 +++++ .../internals/ValueAndTimestampImpl.java | 65 ++++++ .../internals/ValueAndTimestampSerde.java | 69 ++++++ .../ValueAndTimestampSerializer.java | 69 ++++++ 9 files changed, 698 insertions(+), 7 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampImpl.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 9ecc73c14e4d1..07422198ccd6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -31,7 +31,9 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.KeyValueWithTimestampStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -83,7 +85,9 @@ public StateStore getStateStore(final String name) { final StateStore global = stateManager.getGlobalStore(name); if (global != null) { - if (global instanceof KeyValueStore) { + if (global instanceof KeyValueWithTimestampStore) { + return new KeyValueWithTimestampStoreReadOnlyDecorator((KeyValueWithTimestampStore) global); + } else if (global instanceof KeyValueStore) { return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global); } else if (global instanceof WindowStore) { return new WindowStoreReadOnlyDecorator((WindowStore) global); @@ -105,7 +109,9 @@ public StateStore getStateStore(final String name) { } final StateStore store = stateManager.getStore(name); - if (store instanceof KeyValueStore) { + if (store instanceof KeyValueWithTimestampStore) { + return new KeyValueWithTimestampStoreReadWriteDecorator((KeyValueWithTimestampStore) store); + } else if (store instanceof KeyValueStore) { return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); } else if (store instanceof WindowStore) { return new WindowStoreReadWriteDecorator((WindowStore) store); @@ -295,6 +301,72 @@ public V delete(final K key) { } } + private static class KeyValueWithTimestampStoreReadOnlyDecorator + extends StateStoreReadOnlyDecorator> + implements KeyValueWithTimestampStore { + + private KeyValueWithTimestampStoreReadOnlyDecorator(final KeyValueWithTimestampStore inner) { + super(inner); + } + + @Override + public ValueAndTimestamp get(final K key) { + return getInner().get(key); + } + + @Override + public KeyValueIterator> range(final K from, + final K to) { + return getInner().range(from, to); + } + + @Override + public KeyValueIterator> all() { + return getInner().all(); + } + + @Override + public long approximateNumEntries() { + return getInner().approximateNumEntries(); + } + + @Override + public void put(final K key, + final ValueAndTimestamp valueAndTimestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void put(final K key, + final V value, + final long windowStartTimestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public ValueAndTimestamp putIfAbsent(final K key, + final ValueAndTimestamp valueAndTimestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public ValueAndTimestamp putIfAbsent(final K key, + final V value, + final long timestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void putAll(final List entries) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public ValueAndTimestamp delete(final K key) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + } + private static class WindowStoreReadOnlyDecorator extends StateStoreReadOnlyDecorator> implements WindowStore { @@ -474,6 +546,72 @@ public V delete(final K key) { } } + private static class KeyValueWithTimestampStoreReadWriteDecorator + extends StateStoreReadWriteDecorator> + implements KeyValueWithTimestampStore { + + private KeyValueWithTimestampStoreReadWriteDecorator(final KeyValueWithTimestampStore inner) { + super(inner); + } + + @Override + public ValueAndTimestamp get(final K key) { + return wrapped().get(key); + } + + @Override + public KeyValueIterator> range(final K from, + final K to) { + return wrapped().range(from, to); + } + + @Override + public KeyValueIterator> all() { + return wrapped().all(); + } + + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + @Override + public void put(final K key, + final ValueAndTimestamp valueAndTimestamp) { + wrapped().put(key, valueAndTimestamp); + } + + @Override + public void put(final K key, + final V value, + final long timestamp) { + wrapped().put(key, value, timestamp); + } + + @Override + public ValueAndTimestamp putIfAbsent(final K key, + final ValueAndTimestamp valueAndTimestamp) { + return wrapped().putIfAbsent(key, valueAndTimestamp); + } + + @Override + public ValueAndTimestamp putIfAbsent(final K key, + final V value, + final long timestamp) { + return wrapped().putIfAbsent(key, value, timestamp); + } + + @Override + public void putAll(final List>> entries) { + wrapped().putAll(entries); + } + + @Override + public ValueAndTimestamp delete(final K key) { + return wrapped().delete(key); + } + } + private static class WindowStoreReadWriteDecorator extends StateStoreReadWriteDecorator> implements WindowStore { @@ -560,7 +698,8 @@ public void remove(final Windowed sessionKey) { } @Override - public void put(final Windowed sessionKey, final AGG aggregate) { + public void put(final Windowed sessionKey, + final AGG aggregate) { wrapped().put(sessionKey, aggregate); } @@ -575,4 +714,5 @@ public KeyValueIterator, AGG> fetch(final K from, return wrapped().fetch(from, to); } } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java new file mode 100644 index 0000000000000..baa20b83a6e22 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +/** + * A key-(value/timestamp) store that supports put/get/delete and range queries. + * + * @param The key type + * @param The value type + */ +public interface KeyValueWithTimestampStore extends KeyValueStore> { + + /** + * Update the value and timestamp associated with this key. + * + * @param key The key to associate the value to + * @param value The value to update, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as deletes + * @param timestamp the timestamp to be stored next to the value; ignored if {@code value} is {@code null} + * @throws NullPointerException If {@code null} is used for key. + */ + void put(K key, V value, long timestamp); + + /** + * Update the value associated with this key, unless a value is already associated with the key. + * + * @param key The key to associate the value to + * @param value The value to update, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as deletes + * @param timestamp the timestamp to be stored next to the value; ignored if {@code value} is {@code null} + * @return The old value and timestamp or {@code null} if there is no such key. + * @throws NullPointerException If {@code null} is used for key. + */ + ValueAndTimestamp putIfAbsent(K key, V value, long timestamp); + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java new file mode 100644 index 0000000000000..86fa33de1b526 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.KeyValue; + +/** + * Combines a value from a {@link KeyValue} with a timestamp. + * + * @param + */ +public interface ValueAndTimestamp { + V value(); + long timestamp(); +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index d6dd42ac264ff..5a3f6269a2c2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -47,8 +47,6 @@ public InMemoryKeyValueStore(final String name, this.keySerde = keySerde; this.valueSerde = valueSerde; - // TODO: when we have serde associated with class types, we can - // improve this situation by passing the comparator here. this.map = new TreeMap<>(); } @@ -159,10 +157,10 @@ public void close() { this.open = false; } - private static class InMemoryKeyValueIterator implements KeyValueIterator { + static class InMemoryKeyValueIterator implements KeyValueIterator { private final Iterator> iter; - private InMemoryKeyValueIterator(final Iterator> iter) { + InMemoryKeyValueIterator(final Iterator> iter) { this.iter = iter; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java new file mode 100644 index 0000000000000..f91ac31634c37 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueWithTimestampStore; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +public class InMemoryKeyValueWithTimestampStore implements KeyValueWithTimestampStore { + private final String name; + private final Serde keySerde; + private final ValueAndTimestampSerde valueAndTimestampSerde; + private final NavigableMap> map; + private volatile boolean open = false; + + private StateSerdes> serdes; + + public InMemoryKeyValueWithTimestampStore(final String name, + final Serde keySerde, + final ValueAndTimestampSerde valueAndTimestampSerde) { + this.name = name; + this.keySerde = keySerde; + this.valueAndTimestampSerde = valueAndTimestampSerde; + + this.map = new TreeMap<>(); + } + + @Override + public String name() { + return this.name; + } + + @Override + @SuppressWarnings("unchecked") + public void init(final ProcessorContext context, + final StateStore root) { + // construct the serde + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueAndTimestampSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueAndTimestampSerde); + + if (root != null) { + // register the store + context.register(root, (key, value) -> { + // this is a delete + if (value == null) { + delete(serdes.keyFrom(key)); + } else { + put(serdes.keyFrom(key), serdes.valueFrom(value)); + } + }); + } + + this.open = true; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public synchronized ValueAndTimestamp get(final K key) { + return this.map.get(key); + } + + @Override + public synchronized void put(final K key, + final ValueAndTimestamp valueAndTimestamp) { + if (valueAndTimestamp == null || valueAndTimestamp.value() == null) { + this.map.remove(key); + } else { + this.map.put(key, valueAndTimestamp); + } + } + + @Override + public synchronized void put(final K key, + final V value, + final long timestamp) { + if (value == null) { + this.map.remove(key); + } else { + this.map.put(key, new ValueAndTimestampImpl<>(value, timestamp)); + } + } + + @Override + public synchronized ValueAndTimestamp putIfAbsent(final K key, + final ValueAndTimestamp valueAndTimestamp) { + final ValueAndTimestamp originalValueAndTimestamp = get(key); + if (originalValueAndTimestamp == null) { + put(key, valueAndTimestamp); + } + return originalValueAndTimestamp; + } + + @Override + public synchronized ValueAndTimestamp putIfAbsent(final K key, + final V value, + final long timestamp) { + return putIfAbsent(key, new ValueAndTimestampImpl<>(value, timestamp)); + } + + @Override + public synchronized void putAll(final List>> entries) { + for (final KeyValue> entry : entries) { + put(entry.key, entry.value); + } + } + + @Override + public synchronized ValueAndTimestamp delete(final K key) { + return this.map.remove(key); + } + + @Override + public synchronized KeyValueIterator> range(final K from, + final K to) { + return new DelegatingPeekingKeyValueIterator<>( + name, + new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + } + + @Override + public synchronized KeyValueIterator> all() { + final TreeMap> copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator())); + } + + @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + this.map.clear(); + this.open = false; + } + + static class InMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator> iter; + + InMemoryKeyValueIterator(final Iterator> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue next() { + final Map.Entry entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java new file mode 100644 index 0000000000000..8c1b272b369cc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Arrays; +import java.util.Map; + +class ValueAndTimestampDeserializer implements Deserializer> { + public final Deserializer valueDeserializer; + private final Deserializer timestampDeserializer; + + ValueAndTimestampDeserializer(final Deserializer valueDeserializer) { + this.valueDeserializer = valueDeserializer; + timestampDeserializer = new LongDeserializer(); + } + + @Override + public void configure(final Map configs, + final boolean isKey) { + valueDeserializer.configure(configs, isKey); + timestampDeserializer.configure(configs, isKey); + } + + @Override + public ValueAndTimestamp deserialize(final String topic, + final byte[] data) { + if (data == null) { + return null; + } + final long timestamp = timestampDeserializer.deserialize(topic, Arrays.copyOfRange(data, 0, 8)); + final V value = valueDeserializer.deserialize(topic, Arrays.copyOfRange(data, 8, data.length)); + return new ValueAndTimestampImpl<>(value, timestamp); + } + + @Override + public void close() { + valueDeserializer.close(); + timestampDeserializer.close(); + + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampImpl.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampImpl.java new file mode 100644 index 0000000000000..9132649b92e43 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Objects; + +public class ValueAndTimestampImpl implements ValueAndTimestamp { + private final V value; + private final long timestamp; + + public ValueAndTimestampImpl(final V value, + final long timestamp) { + this.value = value; + this.timestamp = timestamp; + } + + @Override + public V value() { + return value; + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public String toString() { + return "<" + value + "," + timestamp + ">"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ValueAndTimestampImpl that = (ValueAndTimestampImpl) o; + return timestamp == that.timestamp && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, timestamp); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java new file mode 100644 index 0000000000000..bd38102a1f027 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Map; + +public class ValueAndTimestampSerde implements Serde> { + private final ValueAndTimestampSerializer valueAndTimestampSerializer; + private final ValueAndTimestampDeserializer valueAndTimestampDeserializer; + + public ValueAndTimestampSerde(final Serde valueSerde) { + if (valueSerde == null) { + valueAndTimestampSerializer = null; + valueAndTimestampDeserializer = null; + } else { + valueAndTimestampSerializer = new ValueAndTimestampSerializer<>(valueSerde.serializer()); + valueAndTimestampDeserializer = new ValueAndTimestampDeserializer<>(valueSerde.deserializer()); + } + } + + @Override + public void configure(final Map configs, final boolean isKey) { + if (valueAndTimestampSerializer != null) { + valueAndTimestampSerializer.configure(configs, isKey); + } + if (valueAndTimestampDeserializer != null) { + valueAndTimestampDeserializer.configure(configs, isKey); + } + } + + @Override + public void close() { + if (valueAndTimestampSerializer != null) { + valueAndTimestampSerializer.close(); + } + if (valueAndTimestampDeserializer != null) { + valueAndTimestampDeserializer.close(); + } + } + + @Override + public Serializer> serializer() { + return valueAndTimestampSerializer; + } + + @Override + public Deserializer> deserializer() { + return valueAndTimestampDeserializer; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java new file mode 100644 index 0000000000000..507140cd330bd --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.util.Map; + +class ValueAndTimestampSerializer implements Serializer> { + public final Serializer valueSerializer; + private final Serializer timestampSerializer; + + ValueAndTimestampSerializer(final Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + timestampSerializer = new LongSerializer(); + } + + @Override + public void configure(final Map configs, + final boolean isKey) { + valueSerializer.configure(configs, isKey); + timestampSerializer.configure(configs, isKey); + } + + @Override + public byte[] serialize(final String topic, + final ValueAndTimestamp data) { + if (data == null) { + return null; + } + return serialize(topic, data.value(), data.timestamp()); + } + + public byte[] serialize(final String topic, + final V data, + final long timestamp) { + if (data == null) { + return null; + } + final byte[] rawValue = valueSerializer.serialize(topic, data); + final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp); + final byte[] rawValueAndTimestamp = new byte[rawTimestamp.length + rawValue.length]; + System.arraycopy(rawTimestamp, 0, rawValueAndTimestamp, 0, rawTimestamp.length); + System.arraycopy(rawValue, 0, rawValueAndTimestamp, rawTimestamp.length, rawValue.length); + return rawValueAndTimestamp; + } + + @Override + public void close() { + valueSerializer.close(); + timestampSerializer.close(); + } +} From 2cd92a50740f2e7b53710ff265bb35314673caab Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 18 Jan 2019 17:40:15 -0800 Subject: [PATCH 2/4] Renamed interfaces/classes according to KIP --- .../internals/ProcessorContextImpl.java | 26 +++++++++---------- ...ore.java => TimestampedKeyValueStore.java} | 2 +- ... => InMemoryTimestampedKeyValueStore.java} | 10 +++---- 3 files changed, 19 insertions(+), 19 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/state/{KeyValueWithTimestampStore.java => TimestampedKeyValueStore.java} (95%) rename streams/src/main/java/org/apache/kafka/streams/state/internals/{InMemoryKeyValueWithTimestampStore.java => InMemoryTimestampedKeyValueStore.java} (94%) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 07422198ccd6e..2e95dd56b02c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.KeyValueWithTimestampStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; @@ -85,8 +85,8 @@ public StateStore getStateStore(final String name) { final StateStore global = stateManager.getGlobalStore(name); if (global != null) { - if (global instanceof KeyValueWithTimestampStore) { - return new KeyValueWithTimestampStoreReadOnlyDecorator((KeyValueWithTimestampStore) global); + if (global instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadOnlyDecorator((TimestampedKeyValueStore) global); } else if (global instanceof KeyValueStore) { return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global); } else if (global instanceof WindowStore) { @@ -109,8 +109,8 @@ public StateStore getStateStore(final String name) { } final StateStore store = stateManager.getStore(name); - if (store instanceof KeyValueWithTimestampStore) { - return new KeyValueWithTimestampStoreReadWriteDecorator((KeyValueWithTimestampStore) store); + if (store instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store); } else if (store instanceof KeyValueStore) { return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); } else if (store instanceof WindowStore) { @@ -301,11 +301,11 @@ public V delete(final K key) { } } - private static class KeyValueWithTimestampStoreReadOnlyDecorator - extends StateStoreReadOnlyDecorator> - implements KeyValueWithTimestampStore { + private static class TimestampedKeyValueStoreReadOnlyDecorator + extends StateStoreReadOnlyDecorator> + implements TimestampedKeyValueStore { - private KeyValueWithTimestampStoreReadOnlyDecorator(final KeyValueWithTimestampStore inner) { + private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore inner) { super(inner); } @@ -546,11 +546,11 @@ public V delete(final K key) { } } - private static class KeyValueWithTimestampStoreReadWriteDecorator - extends StateStoreReadWriteDecorator> - implements KeyValueWithTimestampStore { + private static class TimestampedKeyValueStoreReadWriteDecorator + extends StateStoreReadWriteDecorator> + implements TimestampedKeyValueStore { - private KeyValueWithTimestampStoreReadWriteDecorator(final KeyValueWithTimestampStore inner) { + private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore inner) { super(inner); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStore.java similarity index 95% rename from streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java rename to streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStore.java index baa20b83a6e22..6e25d8b291b82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueWithTimestampStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedKeyValueStore.java @@ -22,7 +22,7 @@ * @param The key type * @param The value type */ -public interface KeyValueWithTimestampStore extends KeyValueStore> { +public interface TimestampedKeyValueStore extends KeyValueStore> { /** * Update the value and timestamp associated with this key. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java similarity index 94% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java index f91ac31634c37..74207ef493188 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueWithTimestampStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueWithTimestampStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -32,7 +32,7 @@ import java.util.NavigableMap; import java.util.TreeMap; -public class InMemoryKeyValueWithTimestampStore implements KeyValueWithTimestampStore { +public class InMemoryTimestampedKeyValueStore implements TimestampedKeyValueStore { private final String name; private final Serde keySerde; private final ValueAndTimestampSerde valueAndTimestampSerde; @@ -41,9 +41,9 @@ public class InMemoryKeyValueWithTimestampStore implements KeyValueWithTim private StateSerdes> serdes; - public InMemoryKeyValueWithTimestampStore(final String name, - final Serde keySerde, - final ValueAndTimestampSerde valueAndTimestampSerde) { + public InMemoryTimestampedKeyValueStore(final String name, + final Serde keySerde, + final ValueAndTimestampSerde valueAndTimestampSerde) { this.name = name; this.keySerde = keySerde; this.valueAndTimestampSerde = valueAndTimestampSerde; From 965f1e616045e82739506ee81e694f5d25ef39c0 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 18 Jan 2019 18:00:57 -0800 Subject: [PATCH 3/4] Added Unit Tests --- .../internals/ProcessorContextImplTest.java | 145 ++++++++++++++++-- 1 file changed, 133 insertions(+), 12 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index b29b04c14de68..829d91c45f960 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -27,9 +27,12 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.streams.state.internals.ValueAndTimestampImpl; import org.junit.Before; import org.junit.Test; @@ -55,35 +58,44 @@ public class ProcessorContextImplTest { private ProcessorContextImpl context; private static final String KEY = "key"; - private static final long VAL = 42L; + private static final long VALUE = 42L; + private static final ValueAndTimestamp VALUE_AND_TIMESTAMP = new ValueAndTimestampImpl<>(42L, 21L); private static final String STORE_NAME = "underlying-store"; private boolean flushExecuted; private boolean putExecuted; + private boolean putWithTimestampExecuted; private boolean putIfAbsentExecuted; + private boolean putIfAbsentWithTimestampExecuted; private boolean putAllExecuted; private boolean deleteExecuted; private boolean removeExecuted; private boolean put3argExecuted; private KeyValueIterator rangeIter; + private KeyValueIterator> timestampedRangeIter; private KeyValueIterator allIter; + private KeyValueIterator> timestampedAllIter; - private List, Long>> iters = new ArrayList<>(7); + private final List, Long>> iters = new ArrayList<>(7); private WindowStoreIterator windowStoreIter; @Before public void setup() { flushExecuted = false; putExecuted = false; + putWithTimestampExecuted = false; putIfAbsentExecuted = false; + putIfAbsentWithTimestampExecuted = false; putAllExecuted = false; deleteExecuted = false; removeExecuted = false; put3argExecuted = false; rangeIter = mock(KeyValueIterator.class); + timestampedRangeIter = mock(KeyValueIterator.class); allIter = mock(KeyValueIterator.class); + timestampedAllIter = mock(KeyValueIterator.class); windowStoreIter = mock(WindowStoreIterator.class); for (int i = 0; i < 7; i++) { @@ -99,11 +111,13 @@ public void setup() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock()); expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock()); expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock()); expect(stateManager.getGlobalStore(anyString())).andReturn(null); expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getStore("LocalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock()); expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock()); expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock()); @@ -120,7 +134,7 @@ public void setup() { ); context.setCurrentNode(new ProcessorNode("fake", null, - new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore")))); + new HashSet<>(asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalSessionStore")))); } @Test @@ -134,10 +148,30 @@ public void globalKeyValueStoreShouldBeReadOnly() { checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()"); checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()"); - assertEquals((Long) VAL, store.get(KEY)); + assertEquals((Long) VALUE, store.get(KEY)); assertEquals(rangeIter, store.range("one", "two")); assertEquals(allIter, store.all()); - assertEquals(VAL, store.approximateNumEntries()); + assertEquals(VALUE, store.approximateNumEntries()); + }); + } + + @Test + public void globalTimestampedKeyValueStoreShouldBeReadOnly() { + doTest("GlobalTimestampedKeyValueStore", (Consumer>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()"); + checkThrowsUnsupportedOperation(() -> store.put("1", new ValueAndTimestampImpl<>(1L, 1L)), "put()"); + checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L, 1L), "putIfAbsent()"); + checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", new ValueAndTimestampImpl<>(1L, 1L)), "putIfAbsent()"); + checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()"); + checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()"); + + assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); + assertEquals(timestampedRangeIter, store.range("one", "two")); + assertEquals(timestampedAllIter, store.all()); + assertEquals(VALUE, store.approximateNumEntries()); }); } @@ -153,7 +187,7 @@ public void globalWindowStoreShouldBeReadOnly() { assertEquals(iters.get(0), store.fetchAll(0L, 0L)); assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); - assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals((Long) VALUE, store.fetch(KEY, 1L)); assertEquals(iters.get(2), store.all()); }); } @@ -194,10 +228,43 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { store.delete("1"); assertTrue(deleteExecuted); - assertEquals((Long) VAL, store.get(KEY)); + assertEquals((Long) VALUE, store.get(KEY)); assertEquals(rangeIter, store.range("one", "two")); assertEquals(allIter, store.all()); - assertEquals(VAL, store.approximateNumEntries()); + assertEquals(VALUE, store.approximateNumEntries()); + }); + } + + @Test + public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { + doTest("LocalTimestampedKeyValueStore", (Consumer>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.put("1", 1L, 1L); + assertTrue(putExecuted); + + store.put("1", new ValueAndTimestampImpl<>(1L, 1L)); + assertTrue(putWithTimestampExecuted); + + store.putIfAbsent("1", 1L, 1L); + assertTrue(putIfAbsentExecuted); + + store.putIfAbsent("1", new ValueAndTimestampImpl<>(1L, 1L)); + assertTrue(putIfAbsentWithTimestampExecuted); + + store.putAll(Collections.emptyList()); + assertTrue(putAllExecuted); + + store.delete("1"); + assertTrue(deleteExecuted); + + assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); + assertEquals(timestampedRangeIter, store.range("one", "two")); + assertEquals(timestampedAllIter, store.all()); + assertEquals(VALUE, store.approximateNumEntries()); }); } @@ -218,7 +285,7 @@ public void localWindowStoreShouldNotAllowInitOrClose() { assertEquals(iters.get(0), store.fetchAll(0L, 0L)); assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); - assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals((Long) VALUE, store.fetch(KEY, 1L)); assertEquals(iters.get(2), store.all()); }); } @@ -250,8 +317,8 @@ private KeyValueStore keyValueStoreMock() { initStateStoreMock(keyValueStoreMock); - expect(keyValueStoreMock.get(KEY)).andReturn(VAL); - expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL); + expect(keyValueStoreMock.get(KEY)).andReturn(VALUE); + expect(keyValueStoreMock.approximateNumEntries()).andReturn(VALUE); expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter); expect(keyValueStoreMock.all()).andReturn(allIter); @@ -286,6 +353,60 @@ private KeyValueStore keyValueStoreMock() { return keyValueStoreMock; } + @SuppressWarnings("unchecked") + private TimestampedKeyValueStore timestampedKeyValueStoreMock() { + final TimestampedKeyValueStore timestampedKeyValueStoreMock = mock(TimestampedKeyValueStore.class); + + initStateStoreMock(timestampedKeyValueStoreMock); + + expect(timestampedKeyValueStoreMock.get(KEY)).andReturn(VALUE_AND_TIMESTAMP); + expect(timestampedKeyValueStoreMock.approximateNumEntries()).andReturn(VALUE); + + expect(timestampedKeyValueStoreMock.range("one", "two")).andReturn(timestampedRangeIter); + expect(timestampedKeyValueStoreMock.all()).andReturn(timestampedAllIter); + + + timestampedKeyValueStoreMock.put(anyString(), anyLong(), anyLong()); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.put(anyString(), anyObject(ValueAndTimestamp.class)); + expectLastCall().andAnswer(() -> { + putWithTimestampExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.putIfAbsent(anyString(), anyLong(), anyLong()); + expectLastCall().andAnswer(() -> { + putIfAbsentExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.putIfAbsent(anyString(), anyObject(ValueAndTimestamp.class)); + expectLastCall().andAnswer(() -> { + putIfAbsentWithTimestampExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.putAll(anyObject(List.class)); + expectLastCall().andAnswer(() -> { + putAllExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.delete(anyString()); + expectLastCall().andAnswer(() -> { + deleteExecuted = true; + return null; + }); + + replay(timestampedKeyValueStoreMock); + + return timestampedKeyValueStoreMock; + } + private WindowStore windowStoreMock() { final WindowStore windowStore = mock(WindowStore.class); @@ -294,7 +415,7 @@ private WindowStore windowStoreMock() { expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0)); expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1)); expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter); - expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL); + expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE); expect(windowStore.all()).andReturn(iters.get(2)); windowStore.put(anyString(), anyLong()); From 4b4f0c0aff6cba362831d525eb9695d8e2582bc5 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 22 Jan 2019 16:21:09 -0800 Subject: [PATCH 4/4] Github comments --- .../InMemoryTimestampedKeyValueStore.java | 41 ++----------------- 1 file changed, 3 insertions(+), 38 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java index 74207ef493188..e104a706b1927 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimestampedKeyValueStore.java @@ -22,13 +22,11 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -151,13 +149,13 @@ public synchronized KeyValueIterator> range(final K from final K to) { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + new InMemoryKeyValueStore.InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); } @Override public synchronized KeyValueIterator> all() { final TreeMap> copy = new TreeMap<>(this.map); - return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator())); + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueStore.InMemoryKeyValueIterator<>(copy.entrySet().iterator())); } @Override @@ -176,37 +174,4 @@ public void close() { this.open = false; } - static class InMemoryKeyValueIterator implements KeyValueIterator { - private final Iterator> iter; - - InMemoryKeyValueIterator(final Iterator> iter) { - this.iter = iter; - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public KeyValue next() { - final Map.Entry entry = iter.next(); - return new KeyValue<>(entry.getKey(), entry.getValue()); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - // do nothing - } - - @Override - public K peekNextKey() { - throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); - } - } }