SAMZA-2305: Stream processor should ensure previous container is stopped during a rebalance #1213
dnishimura
left a comment
Thanks for this important fix! Just a few comments and questions.
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
Outdated
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
Outdated
dnishimura
left a comment
LGTM! Thanks for the fix.
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
Outdated
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
cameronlee314
left a comment
My overall feedback as a separate reader:
Using InterruptedExceptions seems like a fairly "implicit" way of handling this case, so it's a bit hard to validate that all places have been updated correctly (and it might be hard to follow if anyone else has to update this flow in the future). I see that you have done a good analysis of the alternative options, so I think it is reasonable to stick with this strategy overall, but if there is anything further you can think of to make it as explicit as possible, it might be helpful. You have already added some good things (e.g. updating the method signatures with throws InterruptedException, javadocs), but I'm not sure if there is anything else you can do.
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
Outdated
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
Outdated
59530a4 to 15a56bf
cameronlee314
left a comment
Please update the description notes to reflect the API change for the InterruptedException.
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
…ped during a rebalance (apache#1213) Stream processor should ensure previous container is stopped during a rebalance
…ped during a rebalance (#1213) Stream processor should ensure previous container is stopped during a rebalance
Symptom: Duplicate processing, inconsistent checkpoints for inputs, inconsistent changelog state
Cause: A bug in the state machine inside the stream processor can leave processors running containers with an old job model version after a rebalance in the Standalone deployment model.
Fix: We interrupt the container and wait for it to shut down gracefully within a timeout (task.shutdown.ms); if the container doesn’t shut down within the timeout, we fail the stream processor.
Tests: Added unit tests for StreamProcessor and SamzaContainer. Working on integration tests.
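The fix described above (interrupt the container, then wait for shutdown with a bounded timeout) can be illustrated with a minimal sketch. This is not the actual StreamProcessor code: the class ContainerShutdownSketch, the method interruptAndAwaitShutdown, and the use of a CountDownLatch as the "container stopped" signal are all assumptions made for illustration. The shutdownMs parameter stands in for the value of task.shutdown.ms.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and structure are assumptions, not Samza's implementation.
final class ContainerShutdownSketch {

  /**
   * Interrupts the thread running the container, then waits up to shutdownMs
   * (stand-in for task.shutdown.ms) for the container to signal that it stopped.
   * Returns true if the container stopped in time, false otherwise.
   */
  static boolean interruptAndAwaitShutdown(Thread containerThread,
                                           CountDownLatch shutdownLatch,
                                           long shutdownMs) throws InterruptedException {
    containerThread.interrupt();
    return shutdownLatch.await(shutdownMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch stopped = new CountDownLatch(1);
    Thread container = new Thread(() -> {
      try {
        Thread.sleep(Long.MAX_VALUE); // stand-in for the container run loop
      } catch (InterruptedException e) {
        // interrupt received: fall through to the shutdown signal
      } finally {
        stopped.countDown();
      }
    });
    container.start();

    boolean stoppedInTime = interruptAndAwaitShutdown(container, stopped, 5000L);
    if (!stoppedInTime) {
      // In the PR's terms, this is where the stream processor would be failed.
      throw new IllegalStateException("Container did not shut down within the timeout");
    }
    System.out.println("Previous container stopped; safe to start the next one");
  }
}
```

The point of the bounded wait is that a new container is only started once the previous one is known to have stopped; a caller that gets false back would fail the processor rather than proceed with two containers running.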
API Changes
restore methods on TaskRestoreManager and StorageEngine now throw InterruptedException. Please refer to the Javadocs for additional implementation notes.
Upgrade Instructions
Standalone jobs that rely on an external monitoring service to restart the application should follow the External monitoring section in the document below to tune debounce time to account for monitoring service latency and container startup time.
Usage Instructions
None
More details about the bug can be found here.
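As a rough illustration of what the API change means for implementers of restore, the sketch below shows a restore loop that checks the thread's interrupt status between entries and propagates InterruptedException to the caller. The class InterruptibleRestoreSketch, its restore signature, and the use of a plain List as the store are hypothetical; they do not reproduce the StorageEngine or TaskRestoreManager interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; this is not Samza's StorageEngine interface.
final class InterruptibleRestoreSketch {

  /**
   * Replays changelog entries into the store, checking for an interrupt
   * before each entry. Returns the number of entries restored.
   */
  static int restore(Iterator<String> changelogEntries, List<String> store)
      throws InterruptedException {
    int restored = 0;
    while (changelogEntries.hasNext()) {
      // Thread.interrupted() reads and clears the interrupt flag.
      if (Thread.interrupted()) {
        throw new InterruptedException("restore interrupted after " + restored + " entries");
      }
      store.add(changelogEntries.next());
      restored++;
    }
    return restored;
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> store = new ArrayList<>();
    int count = restore(Arrays.asList("a", "b", "c").iterator(), store);
    System.out.println("restored " + count + " entries");
  }
}
```

Declaring the exception in the signature, rather than swallowing it, lets the caller see that the restore was cut short and stop the container instead of continuing with a partially restored store.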