Skip to content

KAFKA-3197 Fix producer sending records out of order#857

Closed
becketqin wants to merge 8 commits into
apache:trunkfrom
becketqin:KAFKA-3197
Closed

KAFKA-3197 Fix producer sending records out of order#857
becketqin wants to merge 8 commits into
apache:trunkfrom
becketqin:KAFKA-3197

Conversation

@becketqin
Copy link
Copy Markdown
Contributor

This patch reuse max.in.flight.request.per.connection. When it equals to one, we take it as user wants order protection. The current approach is make sure there is only one batch per partition on the fly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just changed for cleanup reasons?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This method seems exposed only for unit test purpose. So protected seems good enough. Is there any other case we need access to this method?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's only for unit tests, is package private good enough?

@becketqin
Copy link
Copy Markdown
Contributor Author

@granthenke Thanks for the review, updated the patch. I don't feel strong on the method variances as they look pretty much like private calls, just like the package name implies.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one liner? partitionsInFlight = sendInOrder ? new HashSet : null;

@becketqin
Copy link
Copy Markdown
Contributor Author

@jjkoshy @auradkar Thanks for the review. I have updated the patch to address your comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why twice especially if the timestamp isn't changing?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should close the metrics object as is done above.

now);
if (sendInOrder) {
// Mute all the partitions drained
for (Map.Entry<Integer, List<RecordBatch>> entry : batches.entrySet()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: doesn't look like you're using the key, maybe you could use batches.values()?

private final Time time;
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
private final Set<TopicPartition> muted;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a comment why this doesn't have to be thread-safe (unlike incomplete and batches, for example). In fact, it would be good to group the fields that are not thread-safe together (drainIndex seems to be the other one).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants