add control and status endpoints to KafkaIndexTask#2730
add control and status endpoints to KafkaIndexTask#2730gianm merged 1 commit intoapache:masterfrom dclim:kafka-index-task-endpoints
Conversation
There was a problem hiding this comment.
nit: following is more desired in java land if you wanted to iterate through both keys and values in a map
for (Map.Entry<Integer, Long> entry : offsets.entrySet())
|
👍 after #2730 (comment) |
There was a problem hiding this comment.
This doesn't look like it does what a stopGracefully should do. Maybe we should rename that method to something like suspend (not in this PR though). But, what it's supposed to do is suspend a task quickly so the jvm can be shut down and then restarted. Specifically it shouldn't be trying to publish or anything like that – just persist to disk and then stop.
There was a problem hiding this comment.
I can swap the implementations of stopGracefully and stopAbruptly since the abrupt one does what you described.
|
@gianm thanks for the comments, I added some follow-up questions/thoughts |
|
@gianm addressed your comments |
There was a problem hiding this comment.
status should start at something – perhaps NOT_STARTED or some such
There was a problem hiding this comment.
How is status kept consistent? Are all writes going through a single thread, or are they protected by a lock? Would be good to have that in a comment.
There was a problem hiding this comment.
Yes, status is only modified by the task runner thread which calls run(). I added a comment.
|
👍 after #2730 (comment) |
There was a problem hiding this comment.
why we stop reading altogether after one partition was done and pauseAfterRead was set? wouldn't this terminate the task earlier than we wanted.
There was a problem hiding this comment.
If pauseAfterRead is true, stillReading will remain true regardless of the state of assignment - this will ensure we continue to loop and will pause in possiblyPause() if all partitions have been read. If pauseAfterRead is false, the loop will end as soon as all the partitions have been read.
There was a problem hiding this comment.
ok, so pauseAfterRead ensures that task does not stop even after completion of consuming all partitions upto given offsets... and possibly supervisor would make requests to stop and publish later.
|
👍 besides #2730 (comment) |
|
Thanks! |
Added the following HTTP endpoints to KafkaIndexTask:
These endpoints are required for implementing functionality for #2656, specifically:
(1) exposes the offset currently being read and allows us to start the next tasks early
(4) allows us to stop redundant replicas when another has completed without generating a failed status
(all) allows us to stop tasks gracefully in a coordinated way across replicas (by pausing and then setting the end offsets so that they all finish at the same place) - this enables graceful immediate schema rollover as well as time-based task lifetimes instead of number of event based
(all) in conjunction with a pauseAfterRead flag in KafkaIOConfig, provides a framework to enable leader/follower capabilities, where followers will pause when they are done reading their offsets until the leader assigns a new set of ending offsets