Conversation

@bharathkk (Contributor) commented Sep 25, 2018

The PR addresses the following issues:

  • We have a few scenarios where a race condition between SamzaContainerListener and StreamProcessor results in an incorrect application status being propagated to the client.
  1. Consider the scenario where the container runs into an exception in the run loop and triggers the shutdown sequence while, at the same time, the user triggers a stop on the StreamProcessor. The user request comes in, notices that the container is already shutting down, and proceeds to shut down the job coordinator, which in turn shuts down successfully and invokes processorListener.onStop(). The container eventually invokes the SamzaContainerListener callback and updates the exception state after the StreamProcessor has finished shutting down.
  2. Consider the scenario where the user invokes start on the StreamProcessor followed by a stop. During the StreamProcessor shutdown sequence, different statuses can be propagated to the client depending on when we receive the start notification from the container. If we receive the notification before invoking processorListener.onStop() as part of the StreamProcessor shutdown sequence, we return successfulFinish; if it is received after, we return unsuccessfulFinish.
  • SamzaContainer sets the status flag to failed as soon as it runs into an exception in the run loop. This is problematic because the StreamProcessor can assume the container has shut down correctly even though the container is still in the shutdown sequence. StreamProcessor uses container.hasStopped() to determine the shutdown status of the container; internally, this method returns true if the status is either FAILED or STOPPED.
  • During the rebalance phase, due to the above-mentioned bugs, it is possible that we have an unclean shutdown but still proceed to wait for a new job model in some cases and proceed to StreamProcessor shutdown in others.
  • Currently, we only propagate failures to the processorListener when containerException is not null. It is possible for the SamzaContainer to take longer than task.shutdown.ms to shut down, in which case we need to propagate a timeout exception to the processorListener as opposed to assuming the shutdown was successful (see the sketch after this list).
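To make the last bullet concrete, here is a minimal sketch of the intended behavior, assuming a hypothetical containerShutdownLatch, a taskShutdownMs value standing in for the task.shutdown.ms config, and the afterStop()/afterFailure() callbacks mentioned in this thread; it is an illustration, not the actual patch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: latch/field names are hypothetical stand-ins for StreamProcessor internals.
final class ShutdownTimeoutSketch {
  private final CountDownLatch containerShutdownLatch = new CountDownLatch(1);
  private final long taskShutdownMs = 5000L; // stand-in for the task.shutdown.ms config
  private volatile Throwable containerException; // set by the container thread on run-loop errors

  interface ProcessorListener {
    void afterStop();
    void afterFailure(Throwable t);
  }

  void awaitContainerShutdown(ProcessorListener processorListener) throws InterruptedException {
    // Wait for the container's shutdown sequence, but only up to task.shutdown.ms.
    boolean stoppedInTime = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
    if (!stoppedInTime) {
      // Shutdown budget exceeded: report a timeout instead of a clean stop.
      processorListener.afterFailure(
          new TimeoutException("Container did not shut down within " + taskShutdownMs + " ms"));
    } else if (containerException != null) {
      // Container stopped, but the run loop had failed: propagate the failure.
      processorListener.afterFailure(containerException);
    } else {
      processorListener.afterStop();
    }
  }
}
```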

@bharathkk (Contributor, Author)

@prateekm @nickpan47 @shanthoosh please take a look.
I will update the JIRA and PR with an extensive summary. Meanwhile, it would be good to get early feedback on it.

@bharathkk changed the title Samza 1832 - Fix race condition between StreamProcessor and SamzaContainerListener SAMZA-1832: Fix race condition between StreamProcessor and SamzaContainerListener Sep 26, 2018
} else {
  LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
}
if (state.get() == IN_REBALANCE) {
@shanthoosh (Contributor) commented Sep 26, 2018

Please correct me if I'm wrong here.

There are three actors that could update the StreamProcessor state, viz. the DebounceThread, the UserThread, and the ContainerThread.

Scenario:
The UserThread is trying to stop the StreamProcessor (StreamProcessor.stop()) and the DebounceThread is triggering onNewJobModel at the same time (let's say the StreamProcessor state is IN_REBALANCE):

Time 0: Debounce thread checks the StreamProcessor state and goes past the if guard in `onNewJobModel`.
Time 1: User thread invokes StateTransitionUtil.compareNotInAndSet; that check passes as well, so the user thread proceeds (sets the StreamProcessor state to stopping).
Time 2: User thread stops the container.
Time 3: Debounce thread recreates the container and launches it (most importantly, updates the StreamProcessor state variables).
Time 4: User thread stops the coordinator.
Time 5: The container-stopped callback for the container stopped at time 2 comes back and counts down the container shutdown latch.
Time 6: The Coordinator.onFailure()/coordinatorListener.onStop() callback triggered for the coordinator stopped at time 4 will not stop the container, since the current StreamProcessor state is stopping and the container shutdown latch value is zero.

After StreamProcessor.stop() returns with the StreamProcessor state set to STOPPED, we will still have a SamzaContainer running in the JVM (we could end up in a similar state with containerListener.afterStop() and containerListener.afterFailure() as well). The above interleaved execution sequence could occur with different combinations of the StreamProcessor public methods, essentially resulting in state corruption. It is not sufficient to check a state variable alone and then update other state in StreamProcessor.

Having serialized execution in StreamProcessor (with explicit locks) would help us avoid getting into the above state and will keep things simple and easy to understand. A sketch of that approach follows.
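For illustration, a minimal sketch of the serialized-execution suggestion, assuming an explicit ReentrantLock and an illustrative State enum rather than the real StreamProcessor fields: once onNewJobModel and stop contend on the same lock, the Time 0/Time 1 interleaving above can no longer happen.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: all names here are illustrative, not the actual StreamProcessor fields.
final class SerializedProcessorSketch {
  enum State { NEW, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

  private final ReentrantLock stateLock = new ReentrantLock();
  private State state = State.NEW;

  void onNewJobModel() {
    stateLock.lock();
    try {
      if (state != State.IN_REBALANCE) {
        return; // stop() won the race: do not recreate the container.
      }
      state = State.RUNNING;
      // ... recreate and start the container; the check and the transition
      // above are now atomic with respect to stop().
    } finally {
      stateLock.unlock();
    }
  }

  void stop() {
    stateLock.lock();
    try {
      if (state == State.STOPPING || state == State.STOPPED) {
        return; // another caller is already shutting us down.
      }
      state = State.STOPPING;
      // ... stop the container and the coordinator; onNewJobModel can no
      // longer slip in between the state check and this transition.
    } finally {
      stateLock.unlock();
    }
  }
}
```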

}
processorListener.afterFailure(throwable);

if (state.get() == STOPPING) {
@shanthoosh (Contributor) commented Sep 26, 2018

Just curious.

From the JIRA, it looks like the container status is set prematurely to failed or stopped while the container shutdown is in progress. If incorrect container state is the actual cause, why didn't we go the route of fixing that to solve the problem? How does adding a new StreamProcessor state and changing the lock mechanism fix the problem?
Can you please share the rationale?

*
* @return true if current state is not in the blacklisted values and transition was successful; false otherwise
*/
public static <T> boolean compareNotInAndSet(AtomicReference<T> reference, Collection<T> blacklistedValues, T update) {
@shanthoosh (Contributor) commented Sep 26, 2018

Just curious.

Though it looks like we use an AtomicReference in StreamProcessor, we lock implicitly in StateTransitionUtil. From what I can understand, we've replaced a synchronized lock with a busy-spinning lock in this patch for performing state transitions on an object.

My concern with the spinlock is that, while waiting to acquire the lock, it wastes CPU cycles. Can you please share the value-add and benefits that we get by replacing the synchronized lock with the spinning lock? If we need to lock, why can't we use an explicit lock?
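To make the concern concrete, here is a plausible body for compareNotInAndSet inferred from the signature quoted in the diff above (the body is an assumption, not the patch's actual code): a CAS retry loop re-reads and retries under contention instead of parking the thread the way synchronized or an explicit ReentrantLock would.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: only the signature comes from the diff; the body below is inferred.
final class StateTransitionUtilSketch {
  public static <T> boolean compareNotInAndSet(
      AtomicReference<T> reference, Collection<T> blacklistedValues, T update) {
    while (true) {
      T current = reference.get();
      if (blacklistedValues.contains(current)) {
        return false; // current state is blacklisted: refuse the transition.
      }
      if (reference.compareAndSet(current, update)) {
        return true; // transition succeeded atomically.
      }
      // Another thread changed the state between get() and compareAndSet():
      // spin and re-check, burning CPU cycles (the concern raised above).
    }
  }
}
```

Under this reading, the spinning is bounded by how often other threads win the CAS, but under sustained contention it burns CPU rather than blocking.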

@bharathkk (Contributor, Author)

Closing the PR since we have decided to pursue a simpler point fix w/ #673

@bharathkk closed this Sep 28, 2018