Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void init(final ProcessorContext context) {
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);

store = (SessionStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void init(final ProcessorContext context) {
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);

windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context), sendOldValues);
tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
public void init(final ProcessorContext context) {
super.init(context);
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
Expand All @@ -30,45 +29,24 @@
* @param <V>
*/
class TupleForwarder<K, V> {
private final CachedStateStore cachedStateStore;
private final boolean cachingEnabled;
private final ProcessorContext context;

@SuppressWarnings("unchecked")
TupleForwarder(final StateStore store,
final ProcessorContext context,
final ForwardingCacheFlushListener flushListener,
final ForwardingCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.cachedStateStore = cachedStateStore(store);
cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
this.context = context;
if (this.cachedStateStore != null) {
cachedStateStore.setFlushListener(flushListener, sendOldValues);
}
}

private CachedStateStore cachedStateStore(final StateStore store) {
if (store instanceof CachedStateStore) {
return (CachedStateStore) store;
} else if (store instanceof WrappedStateStore) {
StateStore wrapped = ((WrappedStateStore) store).wrapped();

while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR replaces a while loop with a single statement, does this mean Streams is guaranteed to only ever have one level of a WrappedStateStore?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TupleForwarder is part of the DSL and thus we know the wrapping hierarchy, and hence, this should be save. We know that the caching store is either inside metered stores, or there is no caching store.

wrapped = ((WrappedStateStore) wrapped).wrapped();
}

if (!(wrapped instanceof CachedStateStore)) {
return null;
}

return (CachedStateStore) wrapped;
}
return null;
}

public void maybeForward(final K key,
final V newValue,
final V oldValue) {
if (cachedStateStore == null) {
context.forward(key, new Change<>(newValue, oldValue));
if (cachingEnabled) {
return;
}
context.forward(key, new Change<>(newValue, oldValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ public Cancellable schedule(final Duration interval,
return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
}

private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends WrappedStateStore<T> {
private abstract static class StateStoreReadOnlyDecorator<T extends StateStore, K, V>
extends WrappedStateStore<T, K, V> {

static final String ERROR_MESSAGE = "Global store is read only";

private StateStoreReadOnlyDecorator(final T inner) {
Expand All @@ -229,7 +231,7 @@ public void close() {
}

private static class KeyValueStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>>
extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>, K, V>
implements KeyValueStore<K, V> {

private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
Expand Down Expand Up @@ -281,7 +283,7 @@ public V delete(final K key) {
}

private static class WindowStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V>
implements WindowStore<K, V> {

private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
Expand Down Expand Up @@ -338,7 +340,7 @@ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
}

private static class SessionStoreReadOnlyDecorator<K, AGG>
extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>>
extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
implements SessionStore<K, AGG> {

private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
Expand Down Expand Up @@ -388,7 +390,9 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
}
}

private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends WrappedStateStore<T> {
private abstract static class StateStoreReadWriteDecorator<T extends StateStore, K, V>
extends WrappedStateStore<T, K, V> {

static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";

private StateStoreReadWriteDecorator(final T inner) {
Expand All @@ -408,7 +412,7 @@ public void close() {
}

private static class KeyValueStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<KeyValueStore<K, V>>
extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
implements KeyValueStore<K, V> {

private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
Expand Down Expand Up @@ -460,7 +464,7 @@ public V delete(final K key) {
}

private static class WindowStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<WindowStore<K, V>>
extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
implements WindowStore<K, V> {

private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
Expand Down Expand Up @@ -517,7 +521,7 @@ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
}

private static class SessionStoreReadWriteDecorator<K, AGG>
extends StateStoreReadWriteDecorator<SessionStore<K, AGG>>
extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
implements SessionStore<K, AGG> {

private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public interface CachedStateStore<K, V> {
* @param listener
* @param sendOldValues
*/
void setFlushListener(final CacheFlushListener<K, V> listener,
final boolean sendOldValues);
boolean setFlushListener(final CacheFlushListener<K, V> listener,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We return true if the listener was registered, ie, if the "metered stores" wraps a "caching store".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever! I tried to do this change a little while ago and got hung up on this point.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. blushing

final boolean sendOldValues);
}
Loading