From 14dc2fc5e4a7c8d538f289e55075e0f9d7d14d99 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Subject: Upgrade Guide and API Changes
-        If you want to upgrade from 0.11.0.x to 1.0.0 you don't need to make any code changes as the public API is fully backward compatible.
+        If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.0.x you don't need to make any code changes as the public API is fully backward compatible.
         However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
-        See below for a complete list of 1.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
+        See below for a complete list of 1.0 and 0.11.0 API
+        and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
-        If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to make any code changes as the public API is fully backward compatible.
-        However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
-        See below for a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
+        If you want to upgrade from 0.10.1.x to 1.0.x, see the Upgrade Sections for 0.10.2,
+        0.11.0, and
+        1.0.
+        Note that a broker's on-disk message format must be on version 0.10 or higher to run a Kafka Streams application version 1.0 or higher.
+        See below for a complete list of 0.10.2, 0.11.0,
+        and 1.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
-        If you want to upgrade from 0.10.1.x to 0.10.2, see the Upgrade Section for 0.10.2.
-        It highlights incompatible changes you need to consider to upgrade your code and application.
-        See below for a complete list of 0.10.2 API and semantic changes that allow you to advance your application and/or simplify your code base.
-
-
-
- If you want to upgrade from 0.10.0.x to 0.10.1, see the Upgrade Section for 0.10.1.
- It highlights incompatible changes you need to consider to upgrade your code and application.
- See below a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+ Upgrading from 0.10.0.x to 1.0.x directly is also possible.
+ Note that brokers must be on version 0.10.1 or higher and the on-disk message format must be on version 0.10 or higher
+ to run a Kafka Streams application version 1.0 or higher.
+ See Streams API changes in 0.10.1, Streams API changes in 0.10.2,
+ Streams API changes in 0.11.0, and Streams API changes in 1.0
+ for a complete list of API changes.
+ Upgrading to 1.0.2 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.
+ upgrade.from is set to "0.10.0" for new version 1.0.2
+ Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported)
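The two-phase rolling bounce described above can be sketched with plain `java.util.Properties`. This is an illustrative sketch, not Kafka code: the application id and bootstrap servers are made-up placeholders; only the `upgrade.from` key and its `"0.10.0"` value come from this patch.

```java
import java.util.Properties;

public class UpgradePhases {
    // Phase 1: every instance is bounced with upgrade.from set, so instances
    // running the new version can still interoperate with 0.10.0.x instances
    // during the rolling restart.
    static Properties phaseOneConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // illustrative placeholder
        props.put("bootstrap.servers", "broker:9092");   // illustrative placeholder
        props.put("upgrade.from", "0.10.0");             // the new config from this patch
        return props;
    }

    // Phase 2: bounce each instance again with upgrade.from removed
    // (back to its null default) to complete the upgrade.
    static Properties phaseTwoConfig() {
        Properties props = phaseOneConfig();
        props.remove("upgrade.from");
        return props;
    }
}
```

The second bounce matters: leaving `upgrade.from` set keeps the application on the old compatibility path.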
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3ac293d8498dd..2b377ef04bf83 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -199,17 +199,77 @@
ProcessorContext#schedule(), Processor#punctuate() and KStreamBuilder, TopologyBuilder are being deprecated by new APIs.
- We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
+ We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
+ If you use key.serde, value.serde, or timestamp.extractor in configs, it is recommended to use their replacement config parameters as these configs are deprecated.
+ The TimestampExtractor interface was changed.
+ The StreamsMetric interface was changed.
+ Upgrading from 0.10.0.x requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.
+ upgrade.from is set to "0.10.0" for new version 0.11.0.3
+ upgrade.from added that allows rolling bounce upgrade from version 0.10.0.x
 Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 0.11.0.0 before upgrading.
@@ -258,11 +318,55 @@
+ If you use key.serde, value.serde, or timestamp.extractor in configs, it is recommended to use their replacement config parameters as these configs are deprecated.
+ The TimestampExtractor interface was changed.
+ The StreamsMetric interface was changed.
+ Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.
+ upgrade.from is set to "0.10.0" for new version 0.11.0.3
+ upgrade.from added that allows rolling bounce upgrade from version 0.10.0.x
+ Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.
+ upgrade.from is set to "0.10.0" for new version 0.10.2.2
+ upgrade.from added that allows rolling bounce upgrade from version 0.10.0.x
 The retries default value was changed from 0 to 10.
 The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
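Applications that depended on the old defaults mentioned above can pin them explicitly. This sketch is an assumption, not Kafka code: the `producer.` and `consumer.` prefixes follow the usual Streams convention for forwarding configs to the embedded clients, but the exact keys used here are illustrative.

```java
import java.util.Properties;

public class DefaultOverrides {
    // Pin the pre-change defaults explicitly if an application relied on them;
    // otherwise Streams now uses retries=10 and max.poll.interval.ms=Integer.MAX_VALUE.
    static Properties legacyDefaults() {
        Properties props = new Properties();
        props.put("producer.retries", "0");                    // old default (assumed prefixed key)
        props.put("consumer.max.poll.interval.ms", "300000");  // old default (assumed prefixed key)
        return props;
    }
}
```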
@@ -491,6 +624,23 @@
+ Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config upgrade.from="0.10.0" set for first upgrade phase
+ (cf. KIP-268).
+ As an alternative, an offline upgrade is also possible.
+ upgrade.from is set to "0.10.0" for new version 0.10.1.2
0.10.0.0 has potential breaking changes (please review before upgrading) and possible performance impact following the upgrade. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
-
Notes to clients with version 0.9.0.0: Due to a bug introduced in 0.9.0.0, clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 before brokers are upgraded to 0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
+
For a rolling upgrade:
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index b7a03dcd7528f..11be2f3abf02f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -60,6 +60,10 @@ versions += [
   scalaLogging: "3.7.2",
   jopt: "5.0.4",
   junit: "4.12",
+  kafka_0100: "0.10.0.1",
+  kafka_0101: "0.10.1.1",
+  kafka_0102: "0.10.2.1",
+  kafka_0110: "0.11.0.2",
   lz4: "1.4",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -97,12 +101,15 @@ libs += [
   jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
   jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
-  jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
+  jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
+  joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   junit: "junit:junit:$versions.junit",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+  kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
+  kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
+  kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   log4j: "log4j:log4j:$versions.log4j",
-  scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
-  joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
@@ -110,6 +117,7 @@ libs += [
   reflections: "org.reflections:reflections:$versions.reflections",
   rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
   scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
+  scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
   scalaReflect: "org.scala-lang:scala-reflect:$versions.scala",
   scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest",
   scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage",
diff --git a/settings.gradle b/settings.gradle
index e599d01215cc9..b3b9d306f5c23 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
+    'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
+    'streams:upgrade-system-tests-0110', 'log4j-appender',
     'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ec0a1a8385b58..d6475bf94ee98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -167,6 +167,11 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String ADMIN_CLIENT_PREFIX = "admin.";

+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -326,6 +331,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
     private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface. This config is deprecated, use " + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + " instead";
+ /** {@code upgrade.from} */
+ public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+ public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+ "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
/**
* {@code value.serde}
* @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
@@ -566,6 +576,12 @@ public class StreamsConfig extends AbstractConfig {
null,
Importance.LOW,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(UPGRADE_FROM_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ in(null, UPGRADE_FROM_0100),
+ Importance.LOW,
+ UPGRADE_FROM_DOC)
.define(VALUE_SERDE_CLASS_CONFIG,
Type.CLASS,
null,
@@ -779,6 +795,7 @@ public Map
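The `in(null, UPGRADE_FROM_0100)` validator in the `define()` call above accepts only the null default or the literal `"0.10.0"`. A minimal stdlib-only sketch of that semantics (the class name and error message are assumptions, not Kafka's actual `ConfigDef.ValidString` implementation):

```java
import java.util.Arrays;
import java.util.List;

public class UpgradeFromValidator {
    // Accepted values mirror in(null, UPGRADE_FROM_0100): the null default
    // passes, and the only accepted non-null literal is "0.10.0".
    private static final List<String> ACCEPTED = Arrays.asList(null, "0.10.0");

    // Returns the value unchanged if valid, otherwise rejects it.
    static String validate(String value) {
        if (!ACCEPTED.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration upgrade.from");
        }
        return value;
    }
}
```

An unset `upgrade.from` thus passes validation, which is why only the first bounce of a rolling upgrade needs the config set.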