Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
Expand Down Expand Up @@ -83,7 +85,9 @@ public StateStore getStateStore(final String name) {

final StateStore global = stateManager.getGlobalStore(name);
if (global != null) {
if (global instanceof KeyValueStore) {
if (global instanceof TimestampedKeyValueStore) {
return new TimestampedKeyValueStoreReadOnlyDecorator((TimestampedKeyValueStore) global);
} else if (global instanceof KeyValueStore) {
return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
} else if (global instanceof WindowStore) {
return new WindowStoreReadOnlyDecorator((WindowStore) global);
Expand All @@ -105,7 +109,9 @@ public StateStore getStateStore(final String name) {
}

final StateStore store = stateManager.getStore(name);
if (store instanceof KeyValueStore) {
if (store instanceof TimestampedKeyValueStore) {
return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store);
} else if (store instanceof KeyValueStore) {
return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
} else if (store instanceof WindowStore) {
return new WindowStoreReadWriteDecorator((WindowStore) store);
Expand Down Expand Up @@ -295,6 +301,72 @@ public V delete(final K key) {
}
}

private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<TimestampedKeyValueStore<K, V>>
implements TimestampedKeyValueStore<K, V> {

private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
super(inner);
}

@Override
public ValueAndTimestamp<V> get(final K key) {
return getInner().get(key);
}

@Override
public KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from,
final K to) {
return getInner().range(from, to);
}

@Override
public KeyValueIterator<K, ValueAndTimestamp<V>> all() {
return getInner().all();
}

@Override
public long approximateNumEntries() {
return getInner().approximateNumEntries();
}

@Override
public void put(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void put(final K key,
final V value,
final long windowStartTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public ValueAndTimestamp<V> putIfAbsent(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public ValueAndTimestamp<V> putIfAbsent(final K key,
final V value,
final long timestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void putAll(final List entries) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public ValueAndTimestamp<V> delete(final K key) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
}

private static class WindowStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
implements WindowStore<K, V> {
Expand Down Expand Up @@ -474,6 +546,72 @@ public V delete(final K key) {
}
}

private static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<TimestampedKeyValueStore<K, V>>
implements TimestampedKeyValueStore<K, V> {

private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
super(inner);
}

@Override
public ValueAndTimestamp<V> get(final K key) {
return wrapped().get(key);
}

@Override
public KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from,
final K to) {
return wrapped().range(from, to);
}

@Override
public KeyValueIterator<K, ValueAndTimestamp<V>> all() {
return wrapped().all();
}

@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}

@Override
public void put(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
wrapped().put(key, valueAndTimestamp);
}

@Override
public void put(final K key,
final V value,
final long timestamp) {
wrapped().put(key, value, timestamp);
}

@Override
public ValueAndTimestamp<V> putIfAbsent(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
return wrapped().putIfAbsent(key, valueAndTimestamp);
}

@Override
public ValueAndTimestamp<V> putIfAbsent(final K key,
final V value,
final long timestamp) {
return wrapped().putIfAbsent(key, value, timestamp);
}

@Override
public void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> entries) {
wrapped().putAll(entries);
}

@Override
public ValueAndTimestamp<V> delete(final K key) {
return wrapped().delete(key);
}
}

private static class WindowStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<WindowStore<K, V>>
implements WindowStore<K, V> {
Expand Down Expand Up @@ -560,7 +698,8 @@ public void remove(final Windowed<K> sessionKey) {
}

@Override
public void put(final Windowed<K> sessionKey, final AGG aggregate) {
public void put(final Windowed<K> sessionKey,
final AGG aggregate) {
wrapped().put(sessionKey, aggregate);
}

Expand All @@ -575,4 +714,5 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
return wrapped().fetch(from, to);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

/**
* A key-(value/timestamp) store that supports put/get/delete and range queries.
*
* @param <K> The key type
* @param <V> The value type
*/
public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {

/**
* Update the value and timestamp associated with this key.
*
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @param timestamp the timestamp to be stored next to the value; ignored if {@code value} is {@code null}
* @throws NullPointerException If {@code null} is used for key.
*/
void put(K key, V value, long timestamp);

/**
* Update the value associated with this key, unless a value is already associated with the key.
*
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @param timestamp the timestamp to be stored next to the value; ignored if {@code value} is {@code null}
* @return The old value and timestamp or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
*/
ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.KeyValue;

/**
* Combines a value from a {@link KeyValue} with a timestamp.
*
* @param <V>
*/
public interface ValueAndTimestamp<V> {
V value();
long timestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public InMemoryKeyValueStore(final String name,
this.keySerde = keySerde;
this.valueSerde = valueSerde;

// TODO: when we have serde associated with class types, we can
// improve this situation by passing the comparator here.
this.map = new TreeMap<>();
}

Expand Down Expand Up @@ -159,10 +157,10 @@ public void close() {
this.open = false;
}

private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this was maybe de-privatized for use in InMemoryTimestampedKeyValueStore, but that class has its own inner iterator class.

Should we restore the private modifier?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Good catch.

private final Iterator<Map.Entry<K, V>> iter;

private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter) {
InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter) {
this.iter = iter;
}

Expand Down
Loading