Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -27,6 +28,7 @@
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;

/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
Expand Down Expand Up @@ -874,6 +876,73 @@ void to(final String topic,
void to(final TopicNameExtractor<K, V> topicExtractor,
final Produced<K, V> produced);

/**
 * Convert this stream to a {@link KTable}.
 * <p>
 * If a key-changing operator was used before this operation (e.g., {@code selectKey}, {@code map}, or
 * {@code flatMap}) an internal repartitioning topic may need to be created in Kafka.
 * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
 * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
 * <p>
 * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
 * it was a "fact/event" and is re-interpreted as an update now (cf. {@link KStream} vs {@code KTable}).
 *
 * @return a {@link KTable} that contains the same records as this {@code KStream}
 */
KTable<K, V> toTable();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the change that we might insert a repartition topic if the key was changed upstream, we should extend the JavaDocs accordingly. Compare the JavaDocs of groupByKey() that contain a paragraph about auto-repartitioning. We should add a similar (or maybe even the exact same, if suitable) paragraph for all four overloads of toTable(), too.


/**
 * Convert this stream to a {@link KTable}.
 * <p>
 * If a key-changing operator was used before this operation (e.g., {@code selectKey}, {@code map}, or
 * {@code flatMap}) an internal repartitioning topic may need to be created in Kafka.
 * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
 * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
 * <p>
 * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
 * it was a "fact/event" and is re-interpreted as an update now (cf. {@link KStream} vs {@code KTable}).
 *
 * @param named a {@link Named} config used to name the processor in the topology
 * @return a {@link KTable} that contains the same records as this {@code KStream}
 */
KTable<K, V> toTable(final Named named);

/**
 * Convert this stream to a {@link KTable}.
 * <p>
 * If a key-changing operator was used before this operation (e.g., {@code selectKey}, {@code map}, or
 * {@code flatMap}) an internal repartitioning topic may need to be created in Kafka.
 * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
 * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
 * <p>
 * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
 * it was a "fact/event" and is re-interpreted as an update now (cf. {@link KStream} vs {@code KTable}).
 *
 * @param materialized an instance of {@link Materialized} used to describe how the state store of the
 *                     resulting table should be materialized.
 * @return a {@link KTable} that contains the same records as this {@code KStream}
 */
KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
 * Convert this stream to a {@link KTable}.
 * <p>
 * If a key-changing operator was used before this operation (e.g., {@code selectKey}, {@code map}, or
 * {@code flatMap}) an internal repartitioning topic may need to be created in Kafka.
 * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
 * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
 * <p>
 * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
 * it was a "fact/event" and is re-interpreted as an update now (cf. {@link KStream} vs {@code KTable}).
 *
 * @param named        a {@link Named} config used to name the processor in the topology
 * @param materialized an instance of {@link Materialized} used to describe how the state store of the
 *                     resulting table should be materialized.
 * @return a {@link KTable} that contains the same records as this {@code KStream}
 */
KTable<K, V> toTable(final Named named,
                     final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and default serializers and deserializers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
Expand All @@ -27,6 +29,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
Expand All @@ -45,11 +48,13 @@
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;

import java.lang.reflect.Array;
import java.util.Arrays;
Expand All @@ -58,6 +63,8 @@
import java.util.Objects;
import java.util.Set;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {

static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
Expand Down Expand Up @@ -110,6 +117,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K

private static final String FOREACH_NAME = "KSTREAM-FOREACH-";

private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";

private final boolean repartitionRequired;

KStreamImpl(final String name,
Expand Down Expand Up @@ -600,6 +609,100 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,
builder.addGraphNode(streamsGraphNode, sinkNode);
}

@Override
public KTable<K, V> toTable() {
    // No explicit name supplied: delegate to the Named overload so a processor
    // name is generated internally (from the KSTREAM-TOTABLE- prefix).
    final Named generatedName = NamedInternal.empty();
    return toTable(generatedName);
}

@Override
public KTable<K, V> toTable(final Named named) {
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(Consumed.with(keySerde, valSerde));

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
builder,
TO_KTABLE_NAME);

Comment thread
highluck marked this conversation as resolved.
Outdated
return doToTable(named, materializedInternal);
}

@Override
public KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
    // Delegate to the fully-specified overload; the processor name is generated.
    final Named generatedName = NamedInternal.empty();
    return toTable(generatedName, materialized);
}

@Override
public KTable<K, V> toTable(final Named named,
                            final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
    Objects.requireNonNull(materialized, "materialized can't be null");

    // Wrap the caller-supplied Materialized and hand off to the shared
    // implementation behind all four toTable() overloads.
    final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> wrapped =
        new MaterializedInternal<>(materialized);
    return doToTable(named, wrapped);
}

/**
 * Shared implementation behind all four {@code toTable()} overloads: wires a
 * {@link KTableSource} processor (preceded by a repartition step when the key
 * was changed upstream) into the topology graph and returns the resulting table.
 *
 * @param named                user-supplied (possibly empty) processor name
 * @param materializedInternal materialization config; serdes may be unset
 * @return a {@link KTableImpl} backed by the newly added graph node
 */
private KTable<K, V> doToTable(final Named named,
                               final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
    Objects.requireNonNull(named, "named can't be null");

    // Fall back to this stream's serdes when Materialized did not specify them.
    final Serde<K> keySerdeOverride = materializedInternal.keySerde() == null
        ? keySerde
        : materializedInternal.keySerde();
    final Serde<V> valueSerdeOverride = materializedInternal.valueSerde() == null
        ? valSerde
        : materializedInternal.valueSerde();

    // Resolve the processor name: use the supplied one, or generate from the prefix.
    final NamedInternal namedInternal = new NamedInternal(named);
    final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME);
    final Set<String> subTopologySourceNodes;
    final StreamsGraphNode tableParentNode;

    if (repartitionRequired) {
        // A key-changing operator was applied upstream: insert an (optimizable)
        // repartition topic so the table is partitioned by the new key.
        final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
        final String sourceName = createRepartitionedSource(
            builder,
            keySerdeOverride,
            valueSerdeOverride,
            name,
            repartitionNodeBuilder
        );

        // The table node hangs off the repartition node; the repartition source
        // becomes the sole source of this sub-topology.
        tableParentNode = repartitionNodeBuilder.build();
        builder.addGraphNode(streamsGraphNode, tableParentNode);
        subTopologySourceNodes = Collections.singleton(sourceName);
    } else {
        // No repartitioning needed: attach directly beneath this stream's node.
        tableParentNode = streamsGraphNode;
        subTopologySourceNodes = sourceNodes;
    }

    // The KTableSource materializes records into the (optionally queryable) store.
    final KTableSource<K, V> tableSource = new KTableSource<>(
        materializedInternal.storeName(),
        materializedInternal.queryableStoreName()
    );
    final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, name);
    final StreamsGraphNode tableNode = new StreamToTableNode<>(
        name,
        processorParameters,
        materializedInternal
    );

    builder.addGraphNode(tableParentNode, tableNode);

    return new KTableImpl<K, V, V>(
        name,
        keySerdeOverride,
        valueSerdeOverride,
        subTopologySourceNodes,
        materializedInternal.queryableStoreName(),
        tableSource,
        tableNode,
        builder
    );
}

@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
return groupBy(keySelector, Grouped.with(null, valSerde));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

/**
 * A {@link StreamsGraphNode} representing the conversion of a {@code KStream}
 * into a {@code KTable} (cf. {@code KStream#toTable()}).
 */
public class StreamToTableNode<K, V> extends StreamsGraphNode {

    private final ProcessorParameters<K, V> processorParameters;
    private final MaterializedInternal<K, V, ?> materializedInternal;

    public StreamToTableNode(final String nodeName,
                             final ProcessorParameters<K, V> processorParameters,
                             final MaterializedInternal<K, V, ?> materializedInternal) {
        super(nodeName);
        this.processorParameters = processorParameters;
        this.materializedInternal = materializedInternal;
    }

    @Override
    public String toString() {
        // Fixed: the previous version concatenated "{" directly with ", processorParameters="
        // which rendered a stray leading comma ("StreamToTableNode{, ...").
        return "StreamToTableNode{" +
               "processorParameters=" + processorParameters +
               ", materializedInternal=" + materializedInternal +
               "} " + super.toString();
    }

    @SuppressWarnings("unchecked")
    @Override
    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
        // Build the (timestamped) key-value store backing the table.
        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();

        final String processorName = processorParameters.processorName();
        final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
        topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());

        // Only attach the state store when the table is queryable by name;
        // NOTE(review): storeBuilder cannot be null here given materialize() above — the
        // null check is kept for behavioral parity. TODO confirm and simplify.
        if (storeBuilder != null && ktableSource.queryableName() != null) {
            topologyBuilder.addStateStore(storeBuilder, processorName);
        }
    }
}
Loading