Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1247,16 +1247,16 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei

final String subscriptionStoreName = renamed
.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME);
builder.addStateStore(
new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde));
final StoreFactory subscriptionStoreFactory =
new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde);

final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new StatefulProcessorNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreName, combinedKeySchema),
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
subscriptionReceiveName),
new String[]{subscriptionStoreName}
);
Expand All @@ -1278,12 +1278,11 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei

final String foreignTableJoinName = renamed
.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR);
final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious — is the final goal to eventually remove StatefulProcessorNode and only use ProcessorGraphNode?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question -- basically yes and no. Almost everything has been/will be converted from StatefulProcessorNode to ProcessorGraphNode, but there are a few exceptions where we still need the StatefulProcessorNode to connect a processor and state store. For example, any processors that access upstream state via a value getter, or a custom ProcessorSupplier passed into the .process/.processValues operator that doesn't implement #stores and manually adds stores by calling StreamsBuilder#addStateStore instead.

I actually have the PR for this ready but it was done on top of this so I was waiting for this one to be merged before calling for review. But it's ready now if you want to take a look (though note that it still can't be merged until Almog's TableSuppressNode PR is merged). See #18195

final ProcessorGraphNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
new ProcessorParameters<>(
new ForeignTableJoinProcessorSupplier<>(subscriptionStoreName, combinedKeySchema),
new ForeignTableJoinProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
foreignTableJoinName
),
new String[]{subscriptionStoreName}
)
);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,39 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;

public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
private final String storeName;
private final StoreFactory subscriptionStoreFactory;
private final CombinedKeySchema<KO, K> keySchema;
private boolean useVersionedSemantics = false;

public ForeignTableJoinProcessorSupplier(
final String storeName,
final CombinedKeySchema<KO, K> keySchema) {

this.storeName = storeName;
/**
 * Creates the processor supplier for the foreign-table side of a foreign-key join.
 *
 * @param subscriptionStoreFactory factory for the subscription state store; its
 *                                 {@code storeName()} is used to look the store up at init time,
 *                                 and {@link #stores()} exposes it for automatic connection
 * @param keySchema                schema for the combined foreign-key/primary-key store key
 */
public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory,
final CombinedKeySchema<KO, K> keySchema) {
this.subscriptionStoreFactory = subscriptionStoreFactory;
this.keySchema = keySchema;
}

// Exposes the subscription store so the topology builder adds and connects it
// automatically, instead of the caller registering it via StreamsBuilder#addStateStore.
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory));
}

@Override
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
return new KTableKTableJoinProcessor();
Expand Down Expand Up @@ -80,7 +88,7 @@ public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> cont
internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics()
);
subscriptionStore = internalProcessorContext.getStateStore(storeName);
subscriptionStore = internalProcessorContext.getStateStore(subscriptionStoreFactory.storeName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,42 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Set;

public class SubscriptionReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class);

private final String storeName;
private final StoreFactory subscriptionStoreFactory;
private final CombinedKeySchema<KO, K> keySchema;

public SubscriptionReceiveProcessorSupplier(
final String storeName,
final CombinedKeySchema<KO, K> keySchema) {
/**
 * Creates the processor supplier that receives subscription requests on the
 * foreign-key side of a foreign-key join.
 *
 * @param subscriptionStoreFactory factory for the subscription state store; its
 *                                 {@code storeName()} is used to look the store up at init time,
 *                                 and {@link #stores()} exposes it for automatic connection
 * @param keySchema                schema for the combined foreign-key/primary-key store key
 */
public SubscriptionReceiveProcessorSupplier(final StoreFactory subscriptionStoreFactory,
final CombinedKeySchema<KO, K> keySchema) {
this.subscriptionStoreFactory = subscriptionStoreFactory;
this.keySchema = keySchema;
}

// Exposes the subscription store so the topology builder adds and connects it
// automatically, instead of the caller registering it via StreamsBuilder#addStateStore.
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory));
}

@Override
public Processor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {

return new ContextualProcessor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
return new ContextualProcessor<>() {

private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
private Sensor droppedRecordsSensor;
Expand All @@ -68,7 +78,7 @@ public void init(final ProcessorContext<CombinedKey<KO, K>, Change<ValueAndTimes
internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics()
);
store = internalProcessorContext.getStateStore(storeName);
store = internalProcessorContext.getStateStore(subscriptionStoreFactory.storeName());

keySchema.init(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public class ForeignTableJoinNode<K, V> extends StatefulProcessorNode<K, V> implements VersionedSemanticsGraphNode {
public class ForeignTableJoinNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {

public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames) {
super(processorParameters.processorName(), processorParameters, storeNames);
// Store names are no longer passed in: the processor supplier now implements
// #stores(), so the store is connected automatically by the topology builder.
public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(processorParameters.processorName(), processorParameters);
}

@SuppressWarnings("unchecked")
@Override
public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
final ProcessorSupplier<?, ?, ?, ?> processorSupplier = processorParameters().processorSupplier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
Expand Down Expand Up @@ -2153,6 +2154,84 @@ public void shouldWrapProcessorsForTableTableOuterJoin() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
}

@Test
public void shouldWrapProcessorsForForeignKeyInnerJoin() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);

// Recorder collects the names of all processors passed through the wrapper,
// so we can verify every FK-join processor is wrapped exactly once.
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);

final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

final KTable<String, String> left = builder.table("input1", Consumed.as("input1"));
final KTable<String, String> right = builder.table("input2", Consumed.as("input2"));

left.join(right,
value -> value,
(v1, v2) -> v1 + v2,
TableJoined.as("join"),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-store").withValueSerde(Serdes.String()))
.toStream(Named.as("toStream"))
.to("output", Produced.as("sink"));

builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(9));
assertThat(counter.wrappedProcessorNames().toString(), counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"input1",
"input2",
"join-foreign-join-subscription",
"join-subscription-join-foreign",
"join-subscription-registration-processor",
"join-subscription-receive",
"join-result",
"join-subscription-response-resolver",
"toStream"
));

assertThat(counter.numUniqueStateStores(), CoreMatchers.is(4)); // table1, table2, subscription store, and join materialized
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(5));
}

@Test
public void shouldWrapProcessorsForForeignKeyLeftJoin() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);

final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);

final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

final KTable<String, String> left = builder.table("input1", Consumed.as("input1"));
final KTable<String, String> right = builder.table("input2", Consumed.as("input2"));

left.leftJoin(right,
value -> value,
(v1, v2) -> v1 + v2,
TableJoined.as("l-join"),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-store").withValueSerde(Serdes.String()))
.toStream(Named.as("toStream")) // 6
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the comment // 6 about? I think this test case still has only 5 stores?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy-paste error 🙃 thanks

And yeah, it should have 5 stores

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll address this in my next PR so I don't have to run the tests again before merging.

.to("output", Produced.as("sink"));

builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(9));
assertThat(counter.wrappedProcessorNames().toString(), counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"input1",
"input2",
"l-join-foreign-join-subscription",
"l-join-subscription-join-foreign",
"l-join-subscription-registration-processor",
"l-join-subscription-receive",
"l-join-result",
"l-join-subscription-response-resolver",
"toStream"
));

assertThat(counter.numUniqueStateStores(), CoreMatchers.is(4)); // table1, table2, subscription store, and join materialized
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(5));
}

@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -71,10 +71,13 @@ public void setUp() {
context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir);

final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
processor = new ForeignTableJoinProcessorSupplier<String, String, String>(storeBuilder().name(), COMBINED_KEY_SCHEMA).get();
processor = new ForeignTableJoinProcessorSupplier<String, String, String>(
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder()),
COMBINED_KEY_SCHEMA
).get();
stateStore = storeBuilder.build();
context.addStateStore(stateStore);
stateStore.init((StateStoreContext) context, stateStore);
stateStore.init(context, stateStore);
processor.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -507,7 +508,10 @@ public void shouldPropagateNullIfNoFKValAvailableV1() {
private SubscriptionReceiveProcessorSupplier<String, String> supplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) {

return new SubscriptionReceiveProcessorSupplier<>(storeBuilder.name(), COMBINED_KEY_SCHEMA);
return new SubscriptionReceiveProcessorSupplier<>(
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
COMBINED_KEY_SCHEMA
);
}

private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {
Expand Down