Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void apply(final K key,
final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
} finally {
context.setCurrentNode(prev);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
return aggregateBuilder.build(
functionName,
new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
new KeyValueWithTimestampStoreMaterializer<>(materializedInternal).materialize(),
aggregateSupplier,
materializedInternal.queryableStoreName(),
materializedInternal.keySerde(),
materializedInternal.valueSerde());
materializedInternal.valueSerde()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggre
final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
funcName,
new ProcessorParameters<>(aggregateSupplier, funcName),
new KeyValueStoreMaterializer<>(materialized).materialize(),
new KeyValueWithTimestampStoreMaterializer<>(materialized).materialize(),
false
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,7 +57,7 @@ public void enableSendingOldValues() {

private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {

private KeyValueStore<K, T> store;
private KeyValueStore<K, ValueAndTimestamp<T>> store;
private StreamsMetricsImpl metrics;
private TupleForwarder<K, T> tupleForwarder;

Expand All @@ -62,11 +66,14 @@ private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
StateStore store = context.getStateStore(storeName);
if (store instanceof WrappedStateStore) {
store = ((WrappedStateStore) store).wrappedStore();
}
this.store = ((KeyValueWithTimestampStoreMaterializer.KeyValueStoreFacade) store).inner;
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
}


@Override
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
Expand All @@ -79,23 +86,28 @@ key, value, context().topic(), context().partition(), context().offset()
return;
}

T oldAgg = store.get(key);
final ValueAndTimestamp<T> oldAggWithTimestamp = store.get(key);
final T oldAgg;
final long oldTimestamp;

if (oldAgg == null) {
if (oldAggWithTimestamp == null) {
oldAgg = initializer.apply();
oldTimestamp = -1;
} else {
oldAgg = oldAggWithTimestamp.value();
oldTimestamp = oldAggWithTimestamp.timestamp();
}

T newAgg = oldAgg;

// try to add the new value
newAgg = aggregator.apply(key, value, newAgg);
final T newAgg = aggregator.apply(key, value, oldAgg);
final long newTimestamp = Math.max(oldTimestamp, context().timestamp());

// update the store with the new value
store.put(key, newAgg);
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}


@Override
public KTableValueGetterSupplier<K, T> view() {

Expand All @@ -114,16 +126,16 @@ public String[] storeNames() {

private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {

private KeyValueStore<K, T> store;
private ReadOnlyKeyValueStore<K, ValueAndTimestamp<T>> store;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
store = (ReadOnlyKeyValueStore<K, ValueAndTimestamp<T>>) context.getStateStore(storeName);
}

@Override
public T get(final K key) {
public ValueAndTimestamp<T> get(final K key) {
return store.get(key);
}

Expand Down
Loading