
KAFKA-7280: Synchronize consumer fetch request/response handling #5495

Merged
hachikuji merged 3 commits into apache:trunk from rajinisivaram:KAFKA-7280-consumer-cme
Sep 14, 2018


@rajinisivaram
Contributor

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@rajinisivaram
Contributor Author

@hachikuji I haven't written tests yet, but just wanted to know if this is the right place for synchronization. I was initially thinking of making FetchSessionHandler thread-safe, but then found that there was also a HashMap in Fetcher. So I updated Fetcher instead.

Contributor

Wouldn't it be enough to lock on the given instance of FetchSessionHandler, given that it is the object that runs into the concurrent modification? As I understand it, the approach in this patch completely removes concurrency from the sendFetches method for a given instance.
Also, I would generally prefer using a dedicated lock object instead of locking on this. Locking on this exposes the synchronization through the public API (arguable in this case, since it's an internal class), and it enables accidental lock stealing, i.e. a different class locking on Fetcher.this. I don't know if this is a concern now, though.
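
To make the lock-object suggestion concrete, here is a minimal sketch. `PrivateLockSketch` and its methods are hypothetical names chosen for illustration and are not part of Kafka. It assumes the guarded state is a plain `HashMap`, and it shows that a private monitor keeps working even while unrelated code holds the instance's own monitor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the actual Kafka Fetcher.
class PrivateLockSketch {
    // Private monitor: no code outside this class can synchronize on it.
    private final Object lock = new Object();
    private final Map<Integer, String> sessions = new HashMap<>();

    void put(int node, String session) {
        synchronized (lock) {
            sessions.put(node, session);
        }
    }

    String get(int node) {
        synchronized (lock) {
            return sessions.get(node);
        }
    }

    // Returns true if put() completes while the caller holds the instance's
    // own monitor, i.e. an outside synchronized (instance) cannot stall it.
    static boolean unaffectedByExternalLock() {
        PrivateLockSketch sketch = new PrivateLockSketch();
        Thread writer = new Thread(() -> sketch.put(1, "session-1"));
        synchronized (sketch) { // unrelated code taking the instance monitor
            writer.start();
            try {
                writer.join(5000); // wait up to 5 seconds for the writer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !writer.isAlive() && "session-1".equals(sketch.get(1));
    }
}
```

If `put` were declared `synchronized` instead, the writer thread would block until the caller left its `synchronized (sketch)` block, which is the lock-stealing hazard described above.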

Contributor Author

@viktorsomogyi Thanks for the review. There is also a sessionHandlers HashMap, which would need to become a ConcurrentHashMap, and I wasn't sure if there was any other state. We do broad locking of the coordinator for thread safety, so I thought doing the same for Fetcher would be the simplest and safest fix. Since this code is generally single-threaded and locking is only needed to avoid concurrent access from the heartbeat thread, I am not sure it matters so much. I will wait for @hachikuji's review and then update if required.
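
As a rough sketch of the broad-locking idea: one monitor guards every field, so no individual collection needs to be thread-safe on its own. `CoarseLockSketch`, its fields, and `runTwoThreads` are hypothetical stand-ins, and the two threads only imitate the `poll()` caller and the heartbeat thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a class with several pieces of unsynchronized state.
class CoarseLockSketch {
    private final Map<Integer, Integer> sessionEpochs = new HashMap<>(); // node id -> epoch
    private int requestsSent = 0;

    // A single monitor (this) guards both the map and the counter.
    synchronized void sendFetches(int nodeCount) {
        for (int node = 0; node < nodeCount; node++) {
            sessionEpochs.merge(node, 1, Integer::sum);
            requestsSent++;
        }
    }

    synchronized int epoch(int node) {
        return sessionEpochs.getOrDefault(node, 0);
    }

    synchronized int requestsSent() {
        return requestsSent;
    }

    // Calls sendFetches from two threads that stand in for poll() and heartbeat.
    static CoarseLockSketch runTwoThreads(int iterations, int nodeCount) {
        CoarseLockSketch fetcher = new CoarseLockSketch();
        Runnable work = () -> {
            for (int i = 0; i < iterations; i++) {
                fetcher.sendFetches(nodeCount);
            }
        };
        Thread pollThread = new Thread(work, "poll");
        Thread heartbeatThread = new Thread(work, "heartbeat");
        pollThread.start();
        heartbeatThread.start();
        try {
            pollThread.join();
            heartbeatThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fetcher;
    }
}
```

The trade-off is the one discussed here: the lock is coarser than strictly necessary, but any state added later is covered without further analysis.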

Contributor

I see, then it's probably ok. I was missing the detail about the sessionHandlers map, but thanks for the heads-up :).

Contributor

@hachikuji hachikuji left a comment

@rajinisivaram Thanks for the patch. I guess if we use this approach, then completedFetches may not need to be a ConcurrentLinkedQueue any longer?

I think one of the problems here is that the need for concurrency control is a little obscured. I don't have any great ideas to fix it off the top of my head. Maybe we should just take the brute force approach and synchronize all of the Fetcher APIs (as well as the callbacks). That's pretty much what we ended up doing in AbstractCoordinator. Eventually I hope we can make the consumer more like the producer and the admin client, with all network IO happening in the background.
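
The point about callbacks can be illustrated with a small sketch. A response callback created inside the class may run on a different thread than the one that sent the request, so it has to take the same monitor as the synchronized public methods. `CallbackLockSketch` and its members are hypothetical and use `java.util.function.Consumer` in place of any real Kafka listener type.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch; names do not correspond to real Kafka classes.
class CallbackLockSketch {
    // ArrayDeque is not thread-safe; the monitor on this protects it.
    private final Queue<String> completed = new ArrayDeque<>();

    synchronized String pollCompleted() {
        return completed.poll();
    }

    synchronized int completedCount() {
        return completed.size();
    }

    // The callback is an inner class, so a synchronized method modifier would lock
    // the callback object itself. It locks the enclosing instance explicitly instead.
    Consumer<String> responseCallback() {
        return new Consumer<String>() {
            @Override
            public void accept(String response) {
                synchronized (CallbackLockSketch.this) {
                    completed.add(response);
                }
            }
        };
    }

    // Delivers responses from two threads and returns how many were recorded.
    static int deliverFromTwoThreads(int perThread) {
        CallbackLockSketch fetcher = new CallbackLockSketch();
        Consumer<String> callback = fetcher.responseCallback();
        Runnable deliver = () -> {
            for (int i = 0; i < perThread; i++) {
                callback.accept("response-" + i);
            }
        };
        Thread first = new Thread(deliver);
        Thread second = new Thread(deliver);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fetcher.completedCount();
    }
}
```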

@rajinisivaram rajinisivaram force-pushed the KAFKA-7280-consumer-cme branch from 7a9ff7c to 203d0f3 Compare September 12, 2018 11:42
@rajinisivaram
Contributor Author

@hachikuji I have documented the change and also added a test. I left completedFetches as a ConcurrentLinkedQueue since it is accessed by various methods and we should probably synchronize all the public methods in Fetcher if we want to change that. I think full synchronization is not necessary, but I am fine with doing that too to make it more obvious. Let me know what you think.

Contributor

@hachikuji hachikuji left a comment

Thanks for the updates, Rajini. I just had a minor comment about the test case.

}
}
if (fetcher.hasCompletedFetches()) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
Contributor

Might be useful to have some assertions which verify fetch progress. Like perhaps we can assert the last fetched offset after we complete fetchesRemaining?
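
One possible shape for such a progress check, written as a self-contained simulation rather than against the real `Fetcher`: every name below (`FetchProgressSketch`, `runFetches`, the `fetchedRecords(int)` signature) is hypothetical. It assumes each completed fetch returns a fixed number of consecutive offsets, verifies that no offset is skipped or repeated, and returns the position reached once `fetchesRemaining` hits zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; a real test would drive a Fetcher against a mock client.
class FetchProgressSketch {
    private long nextOffset = 0;

    // Simulates one completed fetch that returns `count` consecutive offsets.
    synchronized List<Long> fetchedRecords(int count) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            offsets.add(nextOffset++);
        }
        return offsets;
    }

    synchronized long nextOffset() {
        return nextOffset;
    }

    // Completes `fetches` fetches, checking after each one that offsets advance
    // without gaps or duplicates, and returns the final fetch position.
    static long runFetches(int fetches, int recordsPerFetch) {
        FetchProgressSketch fetcher = new FetchProgressSketch();
        long expectedNext = 0;
        int fetchesRemaining = fetches;
        while (fetchesRemaining > 0) {
            for (long offset : fetcher.fetchedRecords(recordsPerFetch)) {
                if (offset != expectedNext) {
                    throw new AssertionError("gap or duplicate at offset " + offset);
                }
                expectedNext++;
            }
            fetchesRemaining--;
        }
        return fetcher.nextOffset();
    }
}
```

A test would then compare the returned position with `fetches * recordsPerFetch`, which fails if a fetch was lost or processed twice.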

Contributor Author

@hachikuji Thanks for the review, updated the test.

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit e24d82e into apache:trunk Sep 14, 2018
hachikuji pushed a commit that referenced this pull request Sep 14, 2018
This patch fixes unsafe concurrent access in the consumer by the heartbeat thread and the thread calling `poll()` to the fetch session state in `FetchSessionHandler`.

Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Jason Gustafson <jason@confluent.io>
rajinisivaram added a commit that referenced this pull request Sep 17, 2018
smccauliff pushed a commit to linkedin/kafka that referenced this pull request Apr 29, 2019
KAFKA-7280; Synchronize consumer fetch request/response handling (apache#5495)

smccauliff added a commit to linkedin/kafka that referenced this pull request Apr 29, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#5495)

3 participants