Skip to content

Only schedule AlterIsr thread when we have an ISR change#9749

Merged
mumrah merged 10 commits intoapache:trunkfrom
mumrah:minor-alter-isr-scheduling
Jan 13, 2021
Merged

Only schedule AlterIsr thread when we have an ISR change#9749
mumrah merged 10 commits intoapache:trunkfrom
mumrah:minor-alter-isr-scheduling

Conversation

@mumrah
Copy link
Copy Markdown
Member

@mumrah mumrah commented Dec 14, 2020

Rather than scheduling every 50ms to check for unsent updates, we should schedule the propagation thread only after we receive ISR updates

Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
@mumrah mumrah requested a review from hachikuji December 14, 2020 19:58
Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, left a few more comments. After looking at this again, I can't help but feel that the inflight lock is overkill. I guess what's messing is up is the fact that we need to check both inflightRequest as well as unsentUpdates. Looking at the initial ordering, we have the following:

  • submit: first add to queue, then check inflight request
  • onResponse: first check queue, then clear inflight request

The race condition we were trying to protect occurs because we might enqueue a new item between the two operations in onResponse. But maybe we just needed to change the order. In other words, what if we first cleared the inflight flag, then we check the queue?

def submit(item): Unit = {
  unsentItems.putIfAbsent(item)
  maybePropagate()
}

def maybePropagate(): Unit = {
  if (!unsentItems.isEmpty() && inflightRequest.set(false, true)) {
   sendRequest()
  }
}

def onResponse(response): Unit = {
  inflightRequest.set(false)
  maybePropagate()
}

So after enqueuing the item in submit, we have an opportunity to send immediately if nothing is inflight. On the other hand, if a request is inflight, then we are guaranteed that onResponse will see the addition to the queue because we know it checks the queue only after clearing the flag.

Does that work?

Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
@mumrah
Copy link
Copy Markdown
Member Author

mumrah commented Jan 12, 2021

It seems I over-complicated this while trying to avoid clearing the inflight in the response handler when another request needs to go out. I think what you suggest does work. If the response handler just immediately sets the inflight flag to false it doesn't matter if another submit call races with checking the queue size since one of them will set the inflight flag true and the request will get submitted.

I'll go ahead and try this out

…sr-scheduling

Conflicts:
	core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Trivial nit

Comment thread core/src/main/scala/kafka/server/AlterIsrManager.scala Outdated
@mumrah mumrah merged commit ee08b0b into apache:trunk Jan 13, 2021
ijuma added a commit to ijuma/kafka that referenced this pull request Jan 13, 2021
* apache-github/trunk:
  Only schedule AlterIsr thread when we have an ISR change (apache#9749)
  MINOR: Fix flaky test shouldQuerySpecificActivePartitionStores (apache#9873)
  MINOR: Add restoration time tracking (apache#9830)
  MINOR: Remove unnecessary assertDoesNotThrow (apache#9854)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants