Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ <h1>Upgrade Guide and API Changes</h1>
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<p>
The <code>StreamsConfig</code> variable for configuration parameter <code>"topology.optimization"</code>
is renamed from <code>TOPOLOGY_OPTIMIZATION</code> to <code>TOPOLOGY_OPTIMIZATION_CONFIG</code>.
The old variable is deprecated. Note, that the parameter name itself is not affected.
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
</p>

<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
<p>
We added a new processing mode that improves application scalability using exactly-once guarantees
Expand Down
15 changes: 11 additions & 4 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String ADMIN_CLIENT_PREFIX = "admin.";

/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
*/
public static final String NO_OPTIMIZATION = "none";

/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for enabling topology optimization
*/
public static final String OPTIMIZE = "all";

Expand Down Expand Up @@ -524,7 +524,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";

/** {@code topology.optimization} */
public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";

/** {@code upgrade.from} */
Expand Down Expand Up @@ -552,6 +552,13 @@ public class StreamsConfig extends AbstractConfig {
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface." +
" WARNING: This config is deprecated and will be removed in 3.0.0 release.";

/**
* {@code topology.optimization}
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
*/
@Deprecated
public static final String TOPOLOGY_OPTIMIZATION = TOPOLOGY_OPTIMIZATION_CONFIG;


private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
Expand Down Expand Up @@ -664,7 +671,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TOPOLOGY_OPTIMIZATION,
.define(TOPOLOGY_OPTIMIZATION_CONFIG,
Type.STRING,
NO_OPTIMIZATION,
in(NO_OPTIMIZATION, OPTIMIZE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
Expand All @@ -959,7 +959,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
Expand All @@ -986,7 +986,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
Expand Down Expand Up @@ -1014,7 +1014,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void buildAndOptimizeTopology(final Properties props) {

private void maybePerformOptimizations(final Properties props) {

if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG))) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Topology topology = builder.build(props);

final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
Expand Down Expand Up @@ -895,22 +895,22 @@ private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInval
@Test
public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
final String expectedOptimizeConfig = "none";
final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION);
final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
}

@Test
public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
final String expectedOptimizeConfig = "all";
props.put(TOPOLOGY_OPTIMIZATION, "all");
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
final StreamsConfig config = new StreamsConfig(props);
final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION);
final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
}

@Test(expected = ConfigException.class)
public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
props.put(TOPOLOGY_OPTIMIZATION, "maybe");
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "maybe");
new StreamsConfig(props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void before() throws InterruptedException {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization);
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void before() {
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization)
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void shouldFetchLagsDuringRebalancing(final String optimization) throws
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, FallbackPriorTaskAssignor.class.getName());
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i);
props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Prop
private Properties streamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception {
final StreamsBuilder builder = new StreamsBuilder();

final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
final int offsetLimitDelta = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void close() {}
@Test
public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
final Properties streamsConfiguration1 = streamsConfiguration();
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Properties streamsConfiguration2 = streamsConfiguration();
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private void produceValueRange(final int key, final int start, final int endExcl
private Properties streamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOpti
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Topology topology = builder.build(properties);
assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1));
}
Expand Down Expand Up @@ -224,7 +224,7 @@ public void shouldFailWithSameRepartitionTopicNameInJoin() {
public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
final StreamsBuilder builder = new StreamsBuilder();
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
Expand Down Expand Up @@ -500,7 +500,7 @@ private Topology buildTopology(final String optimizationConfig) {

final Properties properties = new Properties();

properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
return builder.build(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRe
@Test
public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
Expand Down Expand Up @@ -528,7 +528,7 @@ public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRe
final StreamsBuilder builder = new StreamsBuilder();

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
Expand Down Expand Up @@ -653,7 +653,7 @@ public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRe
final StreamsBuilder builder = new StreamsBuilder();

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
Expand Down Expand Up @@ -707,7 +707,7 @@ public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRe
final StreamsBuilder builder = new StreamsBuilder();

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
Expand Down
Loading