Skip to content

KAFKA-4840 : BufferPool errors can cause buffer pool to go into a bad state#2659

Closed
smccauliff wants to merge 10 commits intoapache:trunkfrom
smccauliff:kafka-4840
Closed

KAFKA-4840 : BufferPool errors can cause buffer pool to go into a bad state#2659
smccauliff wants to merge 10 commits intoapache:trunkfrom
smccauliff:kafka-4840

Conversation

@smccauliff
Copy link
Copy Markdown
Contributor

No description provided.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the BUFFERPOOL prefix here since it's already defined in the BufferPool class. Thanks for the PR, will hopefully review it soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I've removed this prefix.

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2089/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2087/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2086/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2147/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2144/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2145/
Test FAILed (JDK 8 and Scala 2.12).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure it is useful to have a static variable for this string since it is used only once. It is also different from the code style in Sender.SenderMetrics which assigns sensor name directly. Maybe we can preserve the existing style for simplicity and code style consistence.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a mock that refers to this variable otherwise I would not have bothered to put this into a static final variable.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we only release lock in the finally here, do we still need to check lock.isHeldByCurrentThread?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not. The original code was unlocking in different places so this was needed. I will remove this.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name restoreAvailableMemoryOnFailure is a bit weird because we should always restore available memory on failure. Maybe we can name it hasError and set it to false right before return buffer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably remove this line restoreAvailableMemoryOnFailure = false.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also remove this condition variable within a finally block?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible and simpler to merge the two finally into one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not without additional complications. One finally deals with cases caused by waiting for memory to become available the other deals with cases causes by just allocating memory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is actually a reasonable way to merge the two finally into one. The idea is that the code in the first finally block doesn't throw exception and is no-op if this.availableMemory + freeListSize >= size. This is no-op if you change it to the following:

if (hasError)
  this.availableMemory += accumulated;
if (moreMemory != null)
  this.waiters.remove(moreMemory);

Don't check to see if lock is held before unlocking, since there is only one place where the lock is unlocked.
@smccauliff
Copy link
Copy Markdown
Contributor Author

@lindong28 Can you check this again? Thanks.

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2400/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2404/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2400/
Test PASSed (JDK 8 and Scala 2.12).

@lindong28
Copy link
Copy Markdown
Member

@smccauliff I think we can further simplify the code by merging the two finally into one. But it is OK to me if you don't want to do that. LGTM. Ping @becketqin for review.

freeUp(size);
this.availableMemory -= size;
lock.unlock();
return allocateByteBuffer(size);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an OOM is thrown from here, it seems the available memory would be already decremented and not added back.

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2788/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2788/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2792/
Test PASSed (JDK 8 and Scala 2.11).

// over for them
if (!(this.availableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
lock.unlock();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems findBugs is not happy with this finally block although I think it should be fine. Can we have a try finally block here to ensure lock.unlock() is executed so that findBugs is happy?

@becketqin
Copy link
Copy Markdown
Contributor

@smccauliff Thanks for the update. LGTM. findBugs was complaining about one of the finally block. Can we change it to make findBugs happy?

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2922/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2926/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2921/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3162/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3166/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3159/
Test PASSed (JDK 7 and Scala 2.10).

@becketqin
Copy link
Copy Markdown
Contributor

Thanks for updating the patch. Merged to trunk.

@asfgit asfgit closed this in c43eb85 Apr 25, 2017
// we have enough unallocated or pooled memory to immediately
// satisfy the request
freeUp(size);
ByteBuffer allocatedBuffer = allocateByteBuffer(size);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was intentional that we didn't allocate memory with the lock held. It seems like this optimisation was lost?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see if there is some clean way to restore that optimization, but if there is OOM on buffer allocation then the lock needs to be acquired and the allocation freed once more. Seems pretty ugly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's not pretty. However, reducing the concurrency of BufferPool in the common path is not desireable. We definitely need to handle OOMs correctly, but they are relatively rare and it's OK if that path is slower.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think it is still worth keeping the optimization, although typically the the producer will only allocate poolable batch size, so the actual memory allocation should not happen very often.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants