SAMZA-1385: Fix zookeeper path conflict between LocalApplicationRunner and ZkJobCoordinator #265
Conversation
@bharathkk I think there may be some checkstyle errors. Check the jenkins build link.
navina
left a comment
+1 thanks for quickly fixing this!
Fwiw, I really like the idea of making the CoordinationUtils interface Closeable. Since we are expecting changes to the CoordinationUtils interface, it's ok to leave it as is for the time being.
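A rough sketch of what a Closeable CoordinationUtils could look like; the accessor names below are assumptions mirroring the utilities used in this PR, not the actual interface definition:

import java.io.Closeable;

// Sketch only: making CoordinationUtils Closeable would let callers such as
// LocalApplicationRunner scope it with try-with-resources instead of calling
// reset() in a finally block.
public interface CoordinationUtils extends Closeable {
  LeaderElector getLeaderElector();          // assumed accessor for leader election
  Latch getLatch(int size, String latchId);  // assumed accessor for latches (e.g. the init latch)

  @Override
  void close();                              // releases ZK connections and ephemeral nodes
}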
private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
// Latch id that's used for awaiting the init of application before creating the StreamProcessors
private static final String INIT_LATCH_ID = "init";
private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/" + "ApplicationRunnerData";
nit: You can avoid the string concatenation here -
private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"
  });
  leaderElector.tryBecomeLeader();
  initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
} finally {
Just wondering if this is a problem.
If we close coordinationUtils after the startup phase, we could end up retriggering leader election when new processors join a processor group. Say [P1, P2, P3] are currently running with a StreamGraphImpl (S), and P4 and P5 join them with S while P1, P2, P3 are processing. Since the zkClients for [P1, P2, P3] are closed, we would retrigger leader election with [P4, P5] when they join, and either P4 or P5 would end up becoming the leader and recreating the intermediate streams.
The whole purpose of having this leader election here is to avoid that.
Please correct me if I am wrong.
I had a discussion about this w/ Navina this morning. Here is a summary.
The problem you quoted exists even in the current world. E.g. if you were to have a job with only one processor and restart it, you would still end up trying to create the intermediate streams.
Our implementation operates on the assumption that the underlying SystemAdmin handles requests to create topics that already exist (ideally, we should document this as a requirement for new systems if we stick with the current implementation).
W.r.t. the leader election, it only ensures that no concurrent requests are fired to create the intermediate streams (in my mind this should also be the underlying system's guarantee) and says nothing about redundant requests.
With the current change, both behaviors still hold.
One can argue that we spend extra cycles on these processors by trying to create intermediate streams. If we were to optimize that, it would become a question of znode maintenance & cleanup vs. this optimization. We decided that would be out of scope for this release blocker.
Let me know if that answers your question.
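To make that assumption concrete, a minimal sketch of the kind of creation loop being discussed; the idempotency of the SystemAdmin stream creation is the assumption named above, and the method shown is illustrative rather than the actual LocalApplicationRunner code:

import java.util.List;

// Sketch: create the intermediate streams, relying on the underlying system to
// treat creation of an already-existing stream as a no-op (or a harmless success).
// Leader election only prevents concurrent creation requests; it does not prevent
// redundant requests issued by later leaders.
void createStreams(List<StreamSpec> intermediateStreams, SystemAdmin systemAdmin) {
  for (StreamSpec spec : intermediateStreams) {
    systemAdmin.createStream(spec); // assumed idempotent in the underlying system
  }
}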
The problem you quoted exists even in current world.

Beg to differ.
Imagine what can happen in the worst case. Let's say there are X processors as part of a standalone application deployment, and each processor joins only after its predecessor in the group has acquired the lock (passed leader election) and created the intermediate streams (depending on when it joins the group). If the X processors join one at a time, P1 triggers leader election and creates the intermediate streams; then P2 triggers leader election (since P2 joined only after the leader election with P1 and the intermediate-stream creation by P1 had completed), acquires the lock, and creates the intermediate streams; and you can extend this to all X processors.
With this joining order, after the change we will have X leader elections and X intermediate-stream creation calls (every new processor joining triggers a new leader election plus an intermediate-stream creation call). With coordinationUtils.close only at shutdown, there is one leader election and one intermediate-stream creation.
This is my only concern (in combination with our debounce and latch timeout, this will be a big number for a sufficiently large X). Since this incurs significant cost in the worst case, what is the benefit of closing it after leader election here rather than during shutdown?

zNode maintenance vs cleanup

This is already taken care of by the background GC, which cleans up old zkNodes related to an app (remnants of previous process-group coordination, generated job models, etc.). There should be no manual/operational intervention needed.

With regards to the release blocker

I think the fix was to branch the zkRootPath into two (one for ZkJobCoordinator and the other for ZkCoordinationService). Since we expect stream creation to be idempotent, we don't have to hold a lock just to do that (idempotency on the server side will also take care of multiple requests at the same time).
I'm not sure of the consequences of this (cost of multiple leader elections, multiple intermediate stream creations). I don't think it's a good idea to change it very close to a release. Nevertheless, we could do it if there's a consensus.
I would agree with @shanthoosh on this note, if the bug we are trying to fix here is exactly as I stated in SAMZA-1385 comment #1. Basically, since different processes can enter the critical section to create the streams at different points in time in the whole lifecycle of the application, the leader for stream creation needs to be alive throughout the LocalApplicationRunner's lifecycle.
Note that for applications that have only one process, restarting the process == restarting the whole app. When the whole app is restarted, it is reasonable to perform another round of leader election. However, for an application w/ many processes, as long as one process is alive and becomes the leader, the second process to join in sequence should not need to trigger leader election again.
For the record, we had a discussion offline and decided to address the lifecycle issue separately. We will use different znodes for JC & stream-creation leader election.
@nickpan47 I believe we are bringing up multiple issues in this PR.
Here is how I understand the current implementation of intermediate streams behaves:
What I believe is missing is state indicating that the critical operation was completed. There is neither a validation of whether the critical-section operation has to be performed, nor a clear distinction between an application restart and a processor restart. I believe we are trying to address this issue in PR #264 (fyi, I don't think that PR is complete). On the point about closing the zkClient connection after the critical operation vs. during processor shutdown:
@navina and @bharathkk thanks for the detailed explanation on the issues. So, I just had a thought to address the long-idle ZK session:
 * @return latch id
 */
private String generateLatchId(List<StreamSpec> intStreams) {
  return String.valueOf(Objects.hashCode(intStreams.stream()
Do we know that the hashcode is the same for lists with the same elements but in different orders? Is there any assumption in this code that the intStreams is a sorted list? Would be nice to see a test case for this code.
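For reference, List.hashCode() (and therefore Objects.hashCode on a list) is order-dependent, so the same streams listed in a different order yield a different id. A sketch of an order-insensitive variant; the StreamSpec::getId accessor and the joining format are illustrative assumptions, not the PR's actual code:

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Sketch: sort the stream ids before hashing so the latch id does not depend on
// the order in which the intermediate streams happen to be listed.
private String generateLatchId(List<StreamSpec> intStreams) {
  return String.valueOf(Objects.hashCode(intStreams.stream()
      .map(StreamSpec::getId)                 // assumed accessor for the stream's logical id
      .sorted()                               // removes the order sensitivity
      .collect(Collectors.joining(","))));
}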
/**
 * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying
 * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election.
 * It results in a leader election whenever a new process joins and also creates intermediate streams.
minor: "it creates the following issues when a new process joins after the ephemeral node is gone"
boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, TimeUnit.MILLISECONDS, timeout);

if (!targetPathExists) {
  throw new TimeoutException("Timed out waiting for the targetPath");
I would not wrap this in TimeoutException. It is misleading. Can't we throw an unchecked LatchNotExistException and catch that around latch.await()?
Discussed offline w/ @bharathkk. The issue lies in the mismatch between the functional behavior of ZkClient.waitUntilExists() and the defined behavior in the Latch interface: ZkClient.waitUntilExists() never throws TimeoutException; it simply returns false when a timeout happens. Hence the logic here. As discussed, @bharathkk please add some doc here to explain the logic.
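A minimal sketch of that translation, using the names from the diff context above (the exact method signature and fields are assumptions):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: ZkClient.waitUntilExists() returns false on timeout rather than
// throwing, so the ZK-based latch maps that boolean onto the TimeoutException
// expected by callers of Latch.await().
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
  boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
  if (!targetPathExists) {
    throw new TimeoutException("Timed out waiting for the latch path " + targetPath);
  }
}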
Another note: there is a merge conflict. Please resolve it first.
nickpan47
left a comment
lgtm. Please add the unit tests to cover the hashcode for the list of inputs and feel free to commit.
Thanks!
navina
left a comment
Overall, looks good. +1
Have a couple of questions. Thanks!
    initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
  }
} finally {
  coordinationUtils.reset();
Thank you so much for adding this here 👍
I'd rather have a separate close() method in each facility. This gives more control over the lifecycle of the utils. But we don't have to change it now, since we will change it later.
boolean eligibleForElection = true;

try {
  streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
The usage of the latch interface is super confusing because it blocks on await with two different timeouts - LEADER_ELECTION_WAIT_TIME and LATCH_TIMEOUT_MINUTES. Iiuc, we do this due to a lack of interface support for testing the status of the latch. Please correct me if I am wrong.
I find this acceptable only because this is a stop-gap solution until we fix the CoordinationUtils interface.
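For readers following the thread, a sketch of the two-timeout flow being described; the constant and variable names come from the diff context, but the overall structure is an approximation of the stop-gap, not the exact code:

// Sketch of the two-phase wait:
// 1) a short await on the stream-creation latch acts as a poor man's "is the
//    latch already counted down?" check, since the Latch interface offers no
//    way to query its state;
// 2) if that times out, the processor participates in leader election, the
//    leader creates the intermediate streams, and everyone waits again with
//    the longer timeout.
boolean eligibleForElection = true;
try {
  streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
  eligibleForElection = false;   // latch already satisfied; streams were created earlier
} catch (TimeoutException e) {
  // nobody has created the streams yet; fall through to leader election
}

if (eligibleForElection) {
  leaderElector.tryBecomeLeader();
  streamCreationLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
}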
public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {

  private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
  private static final String JOB_COORDINATOR_ZK_PATH_SUFFIX = "/JobCoordinatorData";
QQ: This may start another debate, but I am throwing it out there anyway.
In the future, when we have more than one job for the same StreamApplication, we will run into the same path-collision issue. Is it ever going to be a valid use-case where we define each node in the DAG to be a separate job in itself? If that is the case, we will need another level of hierarchy when defining the scope - as in:
- /application/RunnerData (one per StreamApplication)
- /application/&lt;stage&gt;/JobCoordinatorData (one per stage)
@nickpan47 can you please clarify if this is the direction we are heading for in the long term?
@navina
Just curious.
Do we need to handle it prior to 0.13.1 (to avoid zkPath collisions between different processor groups in a multi-stage job)?
Yes. I would prefer that, if there is enough clarity on the multi-stage roadmap.
@nickpan47 or @xinyu: Do you guys see this as an issue when it comes to multi-stage job execution?
Agree with @navina's analysis. Let's fix it now. However, I would imagine that we would later refactor those hard-coded ZK path names w/ more general metadata store key utils, like ApplicationMetadataUtils.getAppMetaKey() and ApplicationMetadataUtils.getJobCoordinatorMetaKey(), instead of the hard-coded ZK paths.
Yes. We should refactor them. Within ZkJobCoordinator, we tried using a specific ZkKeyBuilder class. Not sure how to re-use the same key building utilities if they are in logically different components. But we can figure it out when we refactor our APIs.
Fixed it with the following scheme for now: "jobName-jobId-coordinationData".
Agreed, we should revisit the need for shared logical paths and refactor accordingly.
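A sketch of that scheme; the helper below is illustrative and only reuses the suffixes visible in the diff context, not the exact change:

// Sketch: derive per-component coordination identifiers from the job name and
// job id so that LocalApplicationRunner and ZkJobCoordinator no longer share
// the same ZK path.
private static String getCoordinationId(String jobName, String jobId, String coordinationData) {
  return jobName + "-" + jobId + "-" + coordinationData;
}

// e.g. getCoordinationId("my-job", "1", "ApplicationRunnerData") -> "my-job-1-ApplicationRunnerData"
//      getCoordinationId("my-job", "1", "JobCoordinatorData")    -> "my-job-1-JobCoordinatorData"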
shanthoosh
left a comment
LGTM. Appreciate the industriousness in changing it in such a short time.
Thanks.
nickpan47
left a comment
Fix and ship it! Thanks!
createStreams(plan.getIntermediateStreams());
// We need to handle the case to make sure physical name of the intermediate streams are unique.
// Currently, same jobs with different binaries could still end up producing same DAGs resulting in physical name
// collision. Refer JIRA-1391
We usually refer to jiras in the following format: SAMZA-1391.
    : DEFAULT_JOB_NAME;
String jobId = jobConfig.getJobId().isDefined()
    ? jobConfig.getJobId().get()
    : String.valueOf(System.currentTimeMillis()); // TODO: shouldn't be the case. we need to clean up JobConfig to make this parameter required
I believe jobId always defaults to 1. Why is it currentTimeMillis here? Wouldn't this cause different paths across processors? Perhaps I am missing something.
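A sketch of the suggestion; DEFAULT_JOB_ID is a constant introduced here for illustration and is not part of the diff:

// Sketch: defaulting jobId to a fixed value such as "1" keeps the generated
// coordination path identical across all processors of the job and across
// restarts, whereas System.currentTimeMillis() would give each processor (and
// each restart) a different path.
private static final String DEFAULT_JOB_ID = "1"; // hypothetical constant for illustration

String jobId = jobConfig.getJobId().isDefined()
    ? jobConfig.getJobId().get()
    : DEFAULT_JOB_ID;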
// the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);

if (!targetPathExists) {
👍
…stead of custom generated hash
navina
left a comment
+1
Tested the fix w/ sample page view adclick joiner job.
@navina @sborya @nickpan47 can you please take a look at the RB?