
Concurrency issues in startManager method #792

@filiphornak

Description


Describe the bug

There are a few situations where, because startManager provides no concurrency guarantees, we might end up with multiple job managers running simultaneously, or with an initialized job manager whose assignment loop is not running.

There is a small, practically negligible, possibility that multiple threads run JobScheduler#startManager() concurrently and start multiple job managers at the same time. Alternatively, if an interrupt happens at the right moment, the scheduler might end up improperly initialized, i.e., without a running assignment loop.

Even if such a scenario is unlikely and might never happen in practice, the bottom line is that this part of the code doesn't behave deterministically, and we cannot rely on it.

To Reproduce

Reproducing the issue requires a slight modification of the underlying code, because the window between reading the volatile boolean isManagerRunning and setting it is tiny, and a context switch would have to happen at exactly that moment. The following code illustrates the idea:

  def startManager(): Unit = {
    logger.info("Starting Manager")
    if (!isManagerRunningAtomic.get() && runningScheduler.isCompleted) {
      Thread.sleep(1)  // Widens the window between the check and the set, where a context switch may happen
      isManagerRunningAtomic.set(true)
      // rest of initialization code
    }
  }

// Starting two managers concurrently
val jobScheduler: JobScheduler = ???
val startTask: Runnable = () => jobScheduler.startManager()
new Thread(startTask).start()
new Thread(startTask).start()

// Interrupting a thread right after it starts
val thread = new Thread(startTask)
thread.start()
thread.interrupt()
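Because JobScheduler is left as `???` above, here is a self-contained sketch of the same check-then-act race. RacyScheduler, RaceDemo and managersStarted are hypothetical names used only for illustration; the runningScheduler.isCompleted condition is omitted, and a CountDownLatch releases all callers at roughly the same moment:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for JobScheduler: only the check-then-act logic is modelled,
// and the runningScheduler.isCompleted condition is omitted.
class RacyScheduler {
  private val isManagerRunningAtomic = new AtomicBoolean(false)
  // Counts how many callers got past the check and "started" a manager.
  val managersStarted = new AtomicInteger(0)

  def startManager(): Unit = {
    if (!isManagerRunningAtomic.get()) {  // check ...
      Thread.sleep(1)                     // ... widen the window for a context switch ...
      isManagerRunningAtomic.set(true)    // ... then act; the two steps are not atomic together
      managersStarted.incrementAndGet()   // placeholder for the rest of initialization
    }
  }
}

object RaceDemo {
  // Releases `threads` callers at roughly the same moment; returns how many managers started.
  def run(threads: Int): Int = {
    val scheduler = new RacyScheduler
    val gate = new CountDownLatch(1)
    val workers = (1 to threads).map { _ =>
      val task: Runnable = () => { gate.await(); scheduler.startManager() }
      val t = new Thread(task)
      t.start()
      t
    }
    gate.countDown()           // let all workers call startManager() together
    workers.foreach(_.join())
    scheduler.managersStarted.get()
  }
}
```

Calling `RaceDemo.run(8)` will usually report more than one started manager, but the exact number varies from run to run, which is precisely the nondeterminism described above.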

Expected behavior

startManager should keep its current behavior, but without the possibility of nondeterministic outcomes. We should provide concurrency guarantees to the end users.

Additional context

One possible solution, which we discussed with @kevinwallimann, is to use an atomic compare-and-set operation, as in the following snippet:

  def startManager(): Unit = {
    logger.info("Starting Manager")
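    // Check the precondition first; a successful CAS followed by a false precondition would leave the flag set with no manager started
    if (runningScheduler.isCompleted && isManagerRunningAtomic.compareAndSet(false, true)) {
      Thread.sleep(1)  // Now only one thread is guaranteed to enter this block, even with the delay.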
      // rest of initialization code
    }
  }
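The same hypothetical harness, switched to `AtomicBoolean.compareAndSet`, shows the guarantee: only the thread whose CAS flips the flag from false to true runs the initialization. CasScheduler, CasDemo and runningSchedulerCompleted are illustrative names; the latter is a stand-in assumed to always be true:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical sketch: startManager guarded by a single compare-and-set.
class CasScheduler {
  private val isManagerRunningAtomic = new AtomicBoolean(false)
  val managersStarted = new AtomicInteger(0)

  // Hypothetical stand-in for runningScheduler.isCompleted.
  private def runningSchedulerCompleted: Boolean = true

  def startManager(): Unit = {
    // Precondition first, then the CAS, so a failed precondition never leaves the flag set.
    if (runningSchedulerCompleted && isManagerRunningAtomic.compareAndSet(false, true)) {
      Thread.sleep(1)                    // the delay is harmless: only the CAS winner gets here
      managersStarted.incrementAndGet()  // placeholder for the rest of initialization
    }
  }
}

object CasDemo {
  // Releases `threads` callers together; returns how many managers started.
  def run(threads: Int): Int = {
    val scheduler = new CasScheduler
    val gate = new CountDownLatch(1)
    val workers = (1 to threads).map { _ =>
      val task: Runnable = () => { gate.await(); scheduler.startManager() }
      val t = new Thread(task)
      t.start()
      t
    }
    gate.countDown()
    workers.foreach(_.join())
    scheduler.managersStarted.get()
  }
}
```

Unlike the racy version, `CasDemo.run(8)` should always return 1. Note that this alone does not reset the flag if initialization is interrupted after the CAS succeeds.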

However, compare-and-set alone does not solve the problem of thread interruptions during initialization. We could also use double-checked locking, but it has caveats, mainly that it is sensitive to where the volatile variable is assigned, as explained in "double-checked locking is not threadsafe". Scala 2 also uses this idiom to implement lazy vals, as described in the StackOverflow post "How are lazy val class variables implemented in Scala 2.10" and the Baeldung post "Guide to lazy val in Scala", which also contains an example of how to produce a deadlock.
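A minimal sketch of double-checked locking applied to startManager, assuming the initialization (including starting the assignment loop) can be wrapped in a hypothetical `init` function that either completes or throws (for example, `InterruptedException`) without leaving partial state behind. DclScheduler, init and initAttempts are illustrative names, not part of the project. The important detail is the placement of the volatile write: the flag is set last, only after `init()` returns:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of double-checked locking for startManager.
class DclScheduler(init: () => Unit) {
  // @volatile makes the write below visible to threads doing the unsynchronized first check.
  @volatile private var isManagerRunning = false
  private val lock = new Object
  // Counts how many times initialization was attempted (for illustration only).
  val initAttempts = new AtomicInteger(0)

  def startManager(): Unit = {
    if (!isManagerRunning) {          // 1st check: fast path without the lock
      lock.synchronized {
        if (!isManagerRunning) {      // 2nd check: under the lock
          initAttempts.incrementAndGet()
          init()                      // may throw, e.g. InterruptedException
          isManagerRunning = true     // published only after init() has completed
        }
      }
    }
  }

  def isRunning: Boolean = isManagerRunning
}
```

If `init()` throws, the exception releases the lock and the flag stays false, so a later call can retry instead of leaving a "running" scheduler without an assignment loop. Setting the flag before `init()` would reintroduce the interruption problem.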

Another option, also mentioned in "double-checked locking is not threadsafe", is to use synchronized.
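A minimal sketch of the synchronized option, again assuming a hypothetical `init` function that performs the whole initialization. SyncScheduler and SyncDemo are illustrative names. The check and the initialization run under one monitor, so no volatile-placement subtleties arise:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: the whole check-and-initialize sequence runs under one monitor.
class SyncScheduler(init: () => Unit) {
  private var isManagerRunning = false  // guarded by `this`

  def startManager(): Unit = synchronized {
    if (!isManagerRunning) {
      init()                    // if this throws, the flag stays false and a later call retries
      isManagerRunning = true
    }
  }

  def isRunning: Boolean = synchronized { isManagerRunning }
}

object SyncDemo {
  // Releases `threads` callers together and returns how many times init ran.
  def run(threads: Int): Int = {
    val inits = new AtomicInteger(0)
    val scheduler = new SyncScheduler(() => { Thread.sleep(1); inits.incrementAndGet(); () })
    val gate = new CountDownLatch(1)
    val workers = (1 to threads).map { _ =>
      val task: Runnable = () => { gate.await(); scheduler.startManager() }
      val t = new Thread(task)
      t.start()
      t
    }
    gate.countDown()
    workers.foreach(_.join())
    inits.get()
  }
}
```

The trade-off is a lock acquisition on every call, which is presumably negligible for a method like startManager. As with lazy vals, `init()` runs while holding the monitor, so code inside it that waits on another thread needing the same monitor could deadlock.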

Metadata


Assignees

No one assigned

    Labels

    bug (Something isn't working)

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests
