Skip to content

KAFKA-14222; KRaft's memory pool should always allocate a buffer#12625

Merged
jsancio merged 3 commits intoapache:trunkfrom
jsancio:kafka-14222-dynamic-buffer-allocation
Sep 13, 2022
Merged

KAFKA-14222; KRaft's memory pool should always allocate a buffer#12625
jsancio merged 3 commits intoapache:trunkfrom
jsancio:kafka-14222-dynamic-buffer-allocation

Conversation

@jsancio
Copy link
Copy Markdown
Member

@jsancio jsancio commented Sep 12, 2022

Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is possible for the memory pool to run out of memory if the snapshot is greater than 5 * 8MB.

This change allows the BatchMemoryPool to always allocate a buffer when requested. The memory pool frees the extra allocated buffer when released if the number of pooled buffers is greater than the configured maximum batches.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is
possible for the memory pool to run out of memory if the snapshot is
greater than 5 * 8MB.

This change allows the BatchMemoryPool to always allocate a buffer when
requested. The memory pool frees the extra allocated buffer when released if
the number of pooled buffers is greater than the configured maximum
batches.
@jsancio jsancio added core Kafka Broker 3.3 labels Sep 12, 2022
@jsancio jsancio requested a review from hachikuji September 12, 2022 19:39
free.offer(previouslyAllocated);
// Free the buffer if the number of pooled buffers is already the maximum number of batches.
// Otherwise return the buffer to the memory pool.
if (free.size() >= maxBatches) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should rename maxBatches since it is no longer serving as a max. How about maxRetainedBatches or something like that since it is still a bound on the number of batches which the pool can hold onto indefinitely.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Fixed the variable name.

assertThrows(IllegalArgumentException.class, () -> pool.release(buffer));
}

private ByteBuffer touch(ByteBuffer buffer) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: touch seems a little vague. I think we're just trying to simulate some buffer usage?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the function to update.

} finally {
lock.unlock();
}
return Integer.MAX_VALUE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 billion bytes is 2GB? Is that enough?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This should be Long.MAX_VALUE.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just one comment about the javadoc.

@@ -29,15 +29,15 @@
public class BatchMemoryPool implements MemoryPool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we update the javadoc above to match current behavior?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@jsancio
Copy link
Copy Markdown
Member Author

jsancio commented Sep 13, 2022

Thanks @hachikuji . Merging -- the failures look unrelated.

@jsancio jsancio merged commit c595417 into apache:trunk Sep 13, 2022
@jsancio jsancio deleted the kafka-14222-dynamic-buffer-allocation branch September 13, 2022 15:04
jsancio added a commit that referenced this pull request Sep 13, 2022
)

Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is
possible for the memory pool to run out of memory if the snapshot is
greater than 5 * 8MB.

This change allows the BatchMemoryPool to always allocate a buffer when
requested. The memory pool frees the extra allocated buffer when released if
the number of pooled buffers is greater than the configured maximum
batches.

Reviewers: Jason Gustafson <jason@confluent.io>
ijuma added a commit to confluentinc/kafka that referenced this pull request Sep 21, 2022
…eptember 2022)

`Jenkinsfile` was the only conflict and we ignore the changes since
they are not relevant to the Confluent build.

* apache-github/3.3: (61 commits)
  KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. (apache#12628)
  KAFKA-14243: Temporarily disable unsafe downgrade (apache#12664)
  KAFKA-14240; Validate kraft snapshot state on startup (apache#12653)
  KAFKA-14233: disable testReloadUpdatedFilesWithoutConfigChange first to fix the build (apache#12658)
  KAFKA-14238;  KRaft metadata log should not delete segment past the latest snapshot (apache#12655)
  KAFKA-14156: Built-in partitioner may create suboptimal batches (apache#12570)
  MINOR: Adds KRaft versions of most streams system tests (apache#12458)
  MINOR; Add missing li end tag (apache#12640)
  MINOR: Mention that kraft is production ready in upgrade notes (apache#12635)
  MINOR: Add upgrade note regarding the Strictly Uniform Sticky Partitioner (KIP-794)  (apache#12630)
  KAFKA-14222; KRaft's memory pool should always allocate a buffer (apache#12625)
  KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (apache#12626)
  KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (apache#12603)
  KAFKA-14215; Ensure forwarded requests are applied to broker request quota (apache#12624)
  MINOR; Remove end html tag from upgrade (apache#12605)
  Remove the html end tag from upgrade.html
  KAFKA-14205; Document how to replace the disk for the KRaft Controller (apache#12597)
  KAFKA-14203 Disable snapshot generation on broker after metadata errors (apache#12596)
  KAFKA-14216: Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc (apache#12617)
  KAFKA-14217: app-reset-tool.html should not show --zookeeper flag that no longer exists (apache#12618)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants