KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0#4746
mjsax merged 11 commits into apache:0.10.1 from
Conversation
This is a patch for 0.10.1 only. It sets up system testing with 0.10.0 code as discussed in #4636. I will port this PR to 0.10.2 and add the corresponding system test setup. Local build and tests passed. System tests passed, too: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1533/ Any insight into why Jenkins failed?

retest this please
guozhangwang
left a comment
Thanks for the PR @mjsax! I made a pass over it.
    self.driver.stop()

    def start_all_nodes_with_0100(self):
This is a meta comment: in newer versions we should consider parameterizing this instead of writing a new function for each from/to version pair.
Agreed. I'll upgrade the code accordingly when porting the PR to other branches.
    timeout_sec=60,
    err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))

    # start second with 0.10.0

    "streams_stderr.1": {
        "path": STDERR_FILE + ".1",
        "collect_default": True},
    "streams_log.0-1": {
Why will we have six log files per round? From the code below it seems we will only need three per round.
Yes. But we don't know which ones we get because we shuffle the order of processors for rebalancing. The ones that don't exist are just not collected, but we need to list them all :(
I am more than happy to change the code, but I don't know a better solution... Is there anything better we could do? (I still don't know all the ducktape magic.)
Although we shuffle the processes, each of them will still be bounced exactly twice, once per round, right? In this case could we just use streams_log.0 and streams_log.1 on each processor instance?
We could. I chose the current naming scheme because it encodes the rebalance order in the file names, which makes debugging easier. Otherwise, you have to extract this information from the ducktape log. Let me know if you think this simplification is worth it, or if you prefer removing the counter and reducing the log files to three.
I see your point now, that makes sense. Let's keep it as is.
| "streams_stderr": { | ||
| "path": STDERR_FILE, | ||
| "collect_default": True}, | ||
| "streams_log.0": { |
Where can these log files (the ones without dashes) be generated? I can only find the ones with dashes created in rolling bounces.
    @SuppressWarnings("unchecked")
    public static void main(final String[] args) {
        String kafka = args.length > 0 ? args[0] : null;
If we expect all parameters except upgradeFrom to always be given, should we simply check args.length > 2 and error out on failure instead of setting nulls?
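A minimal sketch of the fail-fast check suggested here (class and method names are hypothetical, not taken from the PR): the three required arguments are validated up front, and only upgradeFrom stays optional.

```java
// Hypothetical sketch of the suggested fail-fast argument handling;
// names are illustrative, not from the PR.
public class ArgParseSketch {

    static String[] parseArgs(final String[] args) {
        // kafka, zookeeper, and stateDir are required; upgradeFrom is optional.
        if (args.length < 3) {
            throw new IllegalArgumentException(
                "usage: <kafka> <zookeeper> <stateDir> [upgradeFrom]");
        }
        final String kafka = args[0];
        final String zookeeper = args[1];
        final String stateDir = args[2];
        final String upgradeFrom = args.length > 3 ? args[3] : null;
        return new String[] {kafka, zookeeper, stateDir, upgradeFrom};
    }
}
```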
    assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
    assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
    assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
    assertEquals(1, decoded.version);
Is this change intentional? If yes, should we rename the test?
I think the test name is still fine -- AssignmentInfo.decode should be able to decode a version 1 assignment. Originally, it upgraded the AssignmentInfo to version 2, but I think this is actually not a good idea: if we receive a version 1 AssignmentInfo, we should return version 1 and not set version 2.
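The decode semantics described above can be illustrated with a simplified stand-in (not the real AssignmentInfo class, which also carries task assignments): the decoded object reports the wire version it was actually encoded with.

```java
// Simplified stand-in for AssignmentInfo to illustrate the decode
// semantics discussed above; the real class carries task assignments too.
public class AssignmentInfoSketch {
    public final int version;

    AssignmentInfoSketch(final int version) {
        this.version = version;
    }

    static AssignmentInfoSketch decode(final int wireVersion) {
        // Keep the received version instead of bumping it to the latest:
        // a version 1 payload decodes to a version 1 object, so callers
        // can tell what the peer actually sent.
        return new AssignmentInfoSketch(wireVersion);
    }
}
```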
    final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
    if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
        log.debug("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
nit: I'd suggest making it INFO, as it should be rare but important for troubleshooting.
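The version-selection logic under discussion amounts to something like this sketch, with names simplified from the PR's StreamsConfig usage (the constant value "0.10.0" matches the documented upgrade.from setting):

```java
// Sketch of choosing the rebalance metadata version based on the
// upgrade.from config; simplified from the PR, names illustrative.
public class MetadataVersionSketch {
    static final String UPGRADE_FROM_0100 = "0.10.0";
    static final int LATEST_VERSION = 2;

    static int versionToEncode(final String upgradeFrom) {
        if (UPGRADE_FROM_0100.equals(upgradeFrom)) {
            // During the first rolling bounce, encode version 1 metadata so
            // remaining 0.10.0.x instances can still parse it (logged at
            // INFO per the review suggestion).
            return 1;
        }
        return LATEST_VERSION;
    }
}
```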
    def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
    def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
    def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100'] + connectPkgs
We should exclude this from installAll, releaseTarGzAll, and uploadArchivesAll, otherwise it would be uploaded to Maven. I think we'd better always jar locally before the test than try to pull from a possibly polluted repo.
    from(project(':streams').configurations.runtime) { into("libs/") }
    from(project(':streams:examples').jar) { into("libs/") }
    from(project(':streams:examples').configurations.runtime) { into("libs/") }
    from(project(':streams:upgrade-system-tests-0100').jar) { into("libs/") }
Could we double check that build, install, releaseTarGz, and uploadArchives would not include this module, i.e., that it will only be built when running the system tests?
Not sure why we should exclude it from build or install? Can you elaborate?
Well, I guess it may not be too bad for build and install; it is just not necessary. Plus, install will put the jar into the mvn cache, which could be risky if we rely on always building the jar from the current branch. Again, it is for cleanliness more than correctness.
        fi
    done
    else
    for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
Hmm... if we hard-code the version here, how could it be extended in newer versions with multiple possible values of UPGRADE_KAFKA_STREAMS_TEST_VERSION?
Yes. In newer versions we extend this and replace the hard-coded 0100 with $UPGRADE_KAFKA_STREAMS_TEST_VERSION -- I just omitted it here and will fix it when porting this PR to the 0.10.2 branch.
Updated this. Triggered system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1556/ System tests passed.

Hmm... not so sure why

Retest this please
    (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
    <ul>
    <li> note: rolling bounce upgrade requires to upgrade to 0.10.1.2 (rolling bounce upgrade is not supported for upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1) </li>
    <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from="0.10.0"</code> is set for new version 0.10.1.2 </li>
does this mean the config setting needs to change?
Yes. The newly started application must have this setting.
| jackson: "2.6.3", | ||
| jetty: "9.2.22.v20170606", | ||
| jersey: "2.22.2", | ||
| kafka0100: "0.10.0.1", |
super nit: can we change it to kafka_0100?
    junit: "junit:junit:$versions.junit",
    log4j: "log4j:log4j:$versions.log4j",
    joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
    kafkaStreams0100: "org.apache.kafka:kafka-streams:$versions.kafka0100",
    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
    static
    {
    static {
    public final TopicPartition partition;

    public AssignedPartition(TaskId taskId, TopicPartition partition) {
    AssignedPartition(TaskId taskId, TopicPartition partition) {
    String kafka = args[0];
    String zookeeper = args[1];
    String stateDir = args[2];
    String upgradeFrom = args.length > 3 ? args[3] : null;
| System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)"); | ||
| System.out.println("kafka=" + kafka); | ||
| System.out.println("zookeeper=" + zookeeper); | ||
| System.out.println("stateDir=" + stateDir); |
    }
    String kafka = args[0];
    String zookeeper = args[1];
    String stateDir = args[2];
should we use the streams config instead of the single stateDir parameter introduced in #4714? Some others below as well.
I think for this older branch, it's easier to just stay with stateDir to avoid unnecessary refactoring -- for trunk it makes sense to change to Properties. I'll keep it in mind and fix it in the trunk PR. Ok?
@mjsax the reason the PR is failing is that the target branch (0.10.1) is missing jenkins.sh.

LGTM assuming Jenkins passes.
25d3685 to d061ff0 (force-pushed)
Thanks @ijuma!

Retest this please.

Retriggered system tests after latest updates: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1571/ Passed.

@guozhangwang @bbejeck @vvcephei I updated the docs section, too. Can you have a look? I am not sure how docs are deployed -- this should not end up on the web page immediately because we talk about a non-released version. Retest this please.
    </li>
    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
    <ul>
    <li> note: rolling bounce upgrade is supported for upgrading from 0.10.0.x to 0.10.1.2 </li>
nit: this line is a bit verbose to me.
    (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
    <ul>
    <li> note: rolling bounce upgrade requires to upgrade to 0.10.1.2 (rolling bounce upgrade is not supported for upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1) </li>
    <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from="0.10.0"</code> is set for new version 0.10.1.2 </li>
config upgrade.from is set to "0.10.0".
    <li> note: rolling bounce upgrade requires to upgrade to 0.10.1.2 (rolling bounce upgrade is not supported for upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1) </li>
    <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from="0.10.0"</code> is set for new version 0.10.1.2 </li>
    <li> bounce each instance of your application once </li>
    <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove config <code>upgrade.mode</code> </li>
to remove the value for ...
Note: Java 9 does not work on old

Updated.
vvcephei
left a comment
I asked a few questions and picked a couple of nits.
But overall, it's fine by me.
    <ul>
    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
    (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
    <ul>
Seems like this should be an <ol>, with the exception of the 'note:' bullet.
I did bullet points on purpose instead of numbers. Let me know if you want to have numbers.
    <h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
    <ul>
    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
An offline upgrade is also an option here, right?
        index = rand.nextInt(remaining);
    } else {
        index = rand.nextInt(numKeys);
    }
nit: using a ternary operator would let you make index final:
final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            running = false;
Sorry if this is a noob question... what's the value of this hook?
I couldn't find a usage with autoTerminate=false in which the code wasn't running in the main thread.
With an infinite loop in the main thread, the only way your hook is going to run is from a SIGTERM, which will terminate the main thread anyway, unless we trap it somewhere. In this case, the hook will set running := false after the while loop has already exited (killed via SIGTERM).
... I think
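The point above can be shown with a runnable sketch (all names hypothetical): the flag set by a shutdown hook only matters if a non-main thread polls it, since a SIGTERM ends an infinite loop on the main thread anyway. The demo below flips the flag directly to simulate the hook firing.

```java
// Hypothetical demo of the shutdown-hook-plus-flag pattern discussed
// above; not code from the PR.
public class ShutdownFlagSketch {
    static volatile boolean running = true;

    public static void main(final String[] args) throws InterruptedException {
        // A worker thread polling the flag is the only case where the
        // hook's write to `running` can actually stop a loop.
        final Thread worker = new Thread(() -> {
            long iterations = 0;
            while (running) {
                iterations++; // stand-in for the processing loop
            }
            System.out.println("worker stopped after " + iterations + " iterations");
        });
        worker.start();

        // The hook runs on SIGTERM or normal JVM exit and stops the worker.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> running = false));

        Thread.sleep(100);
        running = false; // simulate the hook firing for this demo
        worker.join();
    }
}
```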
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            streams.close();
might be nice to print a line before this, in case streams.close() hangs, preventing the JVM from exiting.
    monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))

    if len(self.pids(node)) == 0:
        raise RuntimeError("No process ids recorded")
duplicate of the immediately following block?
Nice catch! I guess this happened while resolving conflicts during cherry-picking.
    timeout_sec=60,
    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))

    self.driver.stop()
I might have missed it... did we verify that everything is using the new metadata version after the second bounce?
Hmmm... Not really... Also not sure how to check this? Ideas? We could add additional DEBUG logs that print the version of the received AssignmentInfo. \cc @bbejeck @guozhangwang WDYT?
We only make sure that the instances did not crash and that they process data after the second rolling bounce.
I think this check would better be covered in a unit test or integration test, not a system test.
It's covered in StreamPartitionAssignorTest -- seems we are good then.
    first_other_node = first_other_processor.node
    second_other_node = second_other_processor.node

    # stop processor and wait for rebalance of others
"wait on rebalance" was not part of the instructions in the doc. Is this a necessary step?
I suppose it is the distinction between an offline upgrade and a rolling one...
Supposing that is the intent, maybe a better phrasing would be "stop processor and ensure the others continue making progress".
For users, it's not required. It's a system test thing that allows us to "track" the progress -- if we bounce instances without waiting, we introduce a race condition in the test because we don't know how many rebalances might actually be triggered: it could be a single rebalance or two, depending on how quickly the instance comes back online.
fd6ba48 to adbe6fa (force-pushed)
adbe6fa to 9db2a8f (force-pushed)
Updated this. Triggered system test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1586/

Retest this please.

Retest this please

Retest this please

Retest this please.

LGTM. Please feel free to merge after Jenkins passes.

retest this please

Java 7 passed -- Java 8 timed out. Retest this please.

Just finished the review. LGTM, mod the failing tests ;)

Both Java 7 and Java 8 passed. Pushing one more doc typo fix and merging afterwards.

Merged to