
Kafka Index Task that supports Incremental handoffs #4178

Closed

pjain1 wants to merge 6 commits into apache:master from pjain1:kafka_indexing

Conversation

@pjain1
Member

@pjain1 pjain1 commented Apr 17, 2017

Fixes #4016 and #4177

  • Incrementally hand off segments when they hit the maxRowsPerSegment limit
  • Decouple segment partitioning from Kafka partitioning; all records from consumed partitions go to a single Druid segment
  • Decouple publishing of segments from waiting for handoff
  • Support restoring the task on middle manager restarts by checkpointing end offsets for segments

Design -

  • Introduce DriverHolder, which has start and end offsets and thus represents the Kafka records that the FiniteAppenderatorDriver it wraps should consume. Each DriverHolder has a unique sequence name, formed by concatenating the baseSequenceName of the task with a linearly increasing DriverHolder index. Similarly, the persist directory for each holder is the concatenation of the basePersistDir of the task and the holder index.
  • When a new task starts, a new DriverHolder is created with start and end offsets of the task.
  • All records consumed from assigned partitions are given to the DriverHolder that can handle them.
  • As soon as the number of records consumed hits the maxRowsInSegment limit, the task pauses and the Supervisor is sent a CheckPointDataSourceMetadataAction, which pauses all replica tasks, gets the highest offset for all partitions, and stores these partition offsets as a checkpoint in the metadata store for the baseSequenceName of the task. It then sends the checkpoint to all replica tasks using a setEndOffset call with the finish query param set to false, indicating that these are not the final end offsets for the task. The latest DriverHolder in the task updates its local end offsets to the checkpoint, and a new DriverHolder is created to handle records beyond this checkpoint.
  • When a DriverHolder has consumed up to its end offsets, it is submitted to a persistExecService, which persists the driver and then adds it to a publishQueue. Another executor service, publishExecService, takes the driver from the publishQueue and tries to push and publish the segments to deep storage and the metadata store. After a successful publish, the driver is added to a handOffQueue, and handOffExecService waits for the driver's handoff to complete.
  • After the ingestion loop finishes, a SentinelDriverHolder is submitted to be persisted and published. When this SentinelDriverHolder is done, the task ends.
  • To support restore on restart, the DriverHolder list is persisted every time a critical action happens, such as: a new holder is created, end offsets for a holder are set, or a holder is removed from the list.
  • Each new task is sent a list of checkpoints for its baseSequenceName if one exists in the metadata store. This way, if a task fails after checkpointing some holders and a new task is created in its place, the new task gets the same checkpoints.
  • At the start of each task, the persisted driver list and checkpoint information are used to restore its state by creating DriverHolders if necessary.
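The offset bookkeeping described in the design above can be sketched roughly as follows. This is an illustrative model only, not code from this patch; the class and member names (HolderSketch, canHandle) are hypothetical:

```java
import java.util.Map;

// Hypothetical sketch: each holder owns a half-open offset range per Kafka
// partition and a sequence name derived from the task's baseSequenceName and
// the holder's index. A record is routed to the holder whose range contains
// its (partition, offset) pair.
class HolderSketch
{
  final String sequenceName;              // baseSequenceName + "_" + holderIndex
  final Map<Integer, Long> startOffsets;  // inclusive, keyed by Kafka partition
  final Map<Integer, Long> endOffsets;    // exclusive, keyed by Kafka partition

  HolderSketch(String baseSequenceName, int index, Map<Integer, Long> start, Map<Integer, Long> end)
  {
    this.sequenceName = baseSequenceName + "_" + index;
    this.startOffsets = start;
    this.endOffsets = end;
  }

  // True if this holder is responsible for the given record.
  boolean canHandle(int partition, long offset)
  {
    Long start = startOffsets.get(partition);
    Long end = endOffsets.get(partition);
    return start != null && end != null && offset >= start && offset < end;
  }
}
```

When a checkpoint lowers the latest holder's end offsets, records beyond the checkpoint no longer satisfy canHandle and fall through to the newly created holder.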

This code has been running on our metrics cluster for a few days without any issues.
Note - We do not run replicas on our metrics cluster. It has now been running with 2 replicas for the last few days.

TODO

  • Add tests
    - Possibly test with replicas on real cluster

@pjain1 pjain1 requested review from dclim and gianm April 17, 2017 18:56
@pjain1
Member Author

pjain1 commented Apr 17, 2017

Unrelated failure

io.druid.indexing.overlord.RemoteTaskRunnerRunPendingTasksConcurrencyTest
testConcurrency(io.druid.indexing.overlord.RemoteTaskRunnerRunPendingTasksConcurrencyTest)  Time elapsed: 61.037 sec  <<< ERROR!
java.lang.Exception: test timed out after 60000 milliseconds
	at java.lang.Thread.sleep(Native Method)
	at io.druid.indexing.overlord.RemoteTaskRunnerRunPendingTasksConcurrencyTest.waitForBothWorkersToHaveUnackedTasks(RemoteTaskRunnerRunPendingTasksConcurrencyTest.java:169)
	at io.druid.indexing.overlord.RemoteTaskRunnerRunPendingTasksConcurrencyTest.testConcurrency(RemoteTaskRunnerRunPendingTasksConcurrencyTest.java:85)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)

@pjain1 pjain1 closed this Apr 17, 2017
@pjain1 pjain1 reopened this Apr 17, 2017
@gianm
Contributor

gianm commented Apr 25, 2017

@pjain1 I plan to read through this soon. Just from a really brief look though, KafkaIndexTask is 2200 lines now -- wow! It's probably worth looking at breaking this up into smaller files, maybe we could use a layer that sits between KafkaIndexTask and FiniteAppenderatorDriver. Or maybe incorporating the functionality into FiniteAppenderatorDriver, so it's no longer finite but can support handing off multiple times. What do you think?

I might have more specific ideas after reading through…

@pjain1
Member Author

pjain1 commented Apr 25, 2017

Actually, the DriverHolder class is already a kind of layer between KafkaIndexTask and FiniteAppenderatorDriver. KafkaIndexTask does not call methods on FiniteAppenderatorDriver, nor does it create instances of it directly; it uses methods on DriverHolder, so this class can probably be moved to a separate file, which will be around 500-600 lines.

@gianm
Contributor

gianm commented Apr 25, 2017

That'd probably help for understanding the code. I'll have a more specific opinion after reading, but it does sound like a good idea to split that out.

@pjain1
Member Author

pjain1 commented May 3, 2017

@gianm Created a separate file for DriverHolder. KafkaIndexTask.java is around 1500 lines now.

@pjain1 pjain1 closed this May 3, 2017
@pjain1 pjain1 reopened this May 3, 2017
pjain1 added 4 commits May 3, 2017 15:09
- Incrementally handoff segments when they hit maxRowsPerSegment limit
- Decouple segment partitioning from Kafka partitioning, all records from consumed partitions go to a single druid segment
- Decouple publishing of segments from waiting for handoff
- Support for restoring task on middle manager restarts by check pointing end offsets for segments
Contributor

@jihoonson jihoonson left a comment

Thanks @pjain1. I haven't finished my review yet, but left some comments.

Contributor

Better to use SegmentIdentifier instead of String

Contributor

Also, I think it would be better to move this to DriverHolder because it is the only class using this information.

Contributor

driver.getMaxRowPerSegment() suggests that each driver has a different maxRowsPerSegment, which is actually not the case. It would be better to use KafkaTuningConfig.getMaxRowsPerSegment() instead and remove driver.getMaxRowPerSegment().

@gianm
Contributor

gianm commented May 7, 2017

@pjain1 @jihoonson when reading #4238 I see a lot of similarities in goals between the two patches. I'm wondering if some changes to FiniteAppenderatorDriver would make them both doable a little more simply; see this thread: #4238 (comment)

Could that reduce some of the complexity from this patch, maybe making DriverHolders and the extra tracking in KafkaIndexTask unnecessary, since a single AppenderatorDriver could handle multiple handoffs?

@pjain1
Member Author

pjain1 commented May 8, 2017

@gianm I haven't looked at #4238 I'll have a look and let you know. Thanks

@pjain1
Member Author

pjain1 commented May 8, 2017

@jihoonson I haven't had a chance to look at your comments yet; I will get back to them after taking a look at #4238.

@pjain1
Member Author

pjain1 commented May 8, 2017

@gianm I read your comment about design change.

In the new design, I am not sure how a single AppenderatorDriver can support multiple active segments for the same interval. For example, when maxRowsInSegmentLimit is reached, a checkpoint (a set of partition offsets) is agreed on by the replica tasks. Then, as each replica fetches records from Kafka, it decides based on the offset which driver each record should go to: the driver before the checkpoint or the one after it. I am not sure if my explanation is clear enough; if not, I will try to explain more.

@gianm
Contributor

gianm commented May 9, 2017

Hmm. I was thinking that the replicas would sort of chill out for a bit when it's checkpoint time, and nobody would read past the checkpoint until they agree on what it is.

Or, instead of that, would it work to do something like:

  • add gains the ability to have a set of active segments for each sequenceName, so we could use a different sequence for each checkpoint block
  • finish / publish would probably need to learn about sequenceNames, so we could tell them to push just the segments for a particular set of sequenceNames.

In this world each checkpoint block would have its own sequenceName but they would all be managed by the same AppenderatorDriver. And AppenderatorDriver, I suppose, would want to track metadata separately for each pending publish.

Would something like that allow us to avoid the need for a checkpoint table and DriverHolders?
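The per-sequenceName tracking proposed here could look roughly like the following sketch. The class and method names are illustrative, not actual Druid code, and segments are represented as plain strings for brevity:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a driver that keeps one set of active segments per
// sequenceName, so each checkpoint block has its own sequence and can be
// published independently of the others.
class SequenceTrackingDriver
{
  private final Map<String, List<String>> activeSegments = new HashMap<>();

  // Append a segment to the active set for the given sequence.
  void add(String sequenceName, String segmentId)
  {
    activeSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>()).add(segmentId);
  }

  // "Push the segments for a particular set of sequenceNames": return and
  // forget only the segments belonging to the finished checkpoint blocks,
  // leaving later sequences untouched.
  List<String> publish(Set<String> sequenceNames)
  {
    List<String> published = new ArrayList<>();
    for (String name : sequenceNames) {
      List<String> segments = activeSegments.remove(name);
      if (segments != null) {
        published.addAll(segments);
      }
    }
    return published;
  }
}
```

In this model the driver would also need to track publish metadata per sequence, since several pending publishes can be in flight at once.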

@pjain1
Member Author

pjain1 commented May 9, 2017

Sure, replicas pause when it's checkpoint time; however, both replicas may have read to different offsets in different partitions. So after the checkpoint is decided and they resume reading, some records for some partitions may fall into the next set of segments (after the checkpoint) and some into the current set of segments (before the checkpoint).

I thought of the approach you mentioned of having a different sequenceName for the set of segments corresponding to a checkpoint, so there will be a map of sequenceName -> activeSegments. In this case, as you said, AppenderatorDriver will have to maintain the state of the sequenceName -> activeSegments map and the state of pending publishes, which is essentially what DriverHolder is doing now.

However, a kind of checkpoint table will still be needed, because when a replica dies and a new task is spawned in its place, it needs to replay whatever happened in the replica, and for that it needs to know the checkpoints that have already been agreed on.

BTW, task checkpoints can be cleaned up automatically by making a simple change to the segment publish step, where the checkpoint information is also deleted as soon as the corresponding driver has finished publishing its segments.

Having said all this, I am totally OK with having a single AppenderatorDriver and getting rid of DriverHolder and the task checkpoint table if we can make it work.

@gianm
Contributor

gianm commented May 9, 2017

I thought of the approach you mentioned of having a different sequenceName for the set of segments corresponding to a checkpoint, so there will be a map of sequenceName -> activeSegments. In this case, as you said, AppenderatorDriver will have to maintain the state of the sequenceName -> activeSegments map and the state of pending publishes, which is essentially what DriverHolder is doing now.

Ah, that makes sense. It seems possible to me that making AppenderatorDriver the thing that does that would be simpler overall, so only one layer (namely: AppenderatorDriver) needs to worry about how active segments are tracked. Do you think it'd be better or worse to teach AppenderatorDriver how to do that sort of stuff?

However, a kind of checkpoint table will still be needed, because when a replica dies and a new task is spawned in its place, it needs to replay whatever happened in the replica, and for that it needs to know the checkpoints that have already been agreed on.

Could the supervisor start off the replacement task from the most recent metadata in the druid_datasources table we already have? It's acting like a checkpoint already, just there's only one of them. I think that should work if we assume that there will be at most one publish happening at a time. This is already assumed by the current system (the supervisor does start new tasks before old ones exit, but only one can be publishing or else there will be txn failures). For the sake of keeping things simple I think it's OK to keep assuming that.

Having said all this, I am totally OK with having a single AppenderatorDriver and getting rid of DriverHolder and the task checkpoint table if we can make it work.

I hope we can do something like that in order to keep things simple, but if we end up not being able to, that's life. I'd at least try to work it all out though. Thanks for bearing with me.

@pjain1
Member Author

pjain1 commented May 9, 2017

Could the supervisor start off the replacement task from the most recent metadata in the druid_datasources table we already have? It's acting like a checkpoint already, just there's only one of them. I think that should work if we assume that there will be at most one publish happening at a time. This is already assumed by the current system (the supervisor does start new tasks before old ones exit, but only one can be publishing or else there will be txn failures). For the sake of keeping things simple I think it's OK to keep assuming that.

There is a corner case here - suppose two replicas decide on a checkpoint and then one of the replicas dies. The other replica has not yet consumed up to the checkpoint and has not published the segments corresponding to it, so the dataSourceMetadata does not yet reflect the checkpoint. The supervisor starts a replacement task, which will miss this checkpoint, and at some point when it reaches its maxRowsInSegmentLimit it will send a CheckPoint request, and wrong things will happen.

I hope we can do something like that in order to keep things simple, but if we end up not being able to, that's life. I'd at least try to work it all out though. Thanks for bearing with me.

No worries, I am fully onboard with making things simple.

@gianm
Contributor

gianm commented May 9, 2017

There is a corner case here - suppose two replicas decide on a checkpoint and then one of the replicas dies. The other replica has not yet consumed up to the checkpoint and has not published the segments corresponding to it, so the dataSourceMetadata does not yet reflect the checkpoint. The supervisor starts a replacement task, which will miss this checkpoint, and at some point when it reaches its maxRowsInSegmentLimit it will send a CheckPoint request, and wrong things will happen.

What sort of wrong things would happen?

I think in the case you're talking about, the supervisor would get a checkpoint request that is "behind" the currently published datasource metadata. Would it help if the supervisor then told the task something like, "hey, you're really behind, you should toss out what you have and start over from here: <insert current metadata>"?

@jihoonson
Contributor

Hmm, I wrote some simple code to check that the new design would work (add and publish). @gianm @pjain1 does it make sense to you?

@pjain1
Member Author

pjain1 commented May 14, 2017

@jihoonson add looks OK, but publish needs to change to understand sequenceName as well, so that it can be used to publish segments for a particular sequenceName.

For the Kafka indexing task, the publish call should never block, and from the code it seems like it can block. I would suggest having an unbounded queue to which publish tasks can be submitted; a publishExecutor will keep taking tasks from the queue and doing the publish. Similarly, have a queue for handoff tasks as well, and a handOffExecutor will keep polling the queue to check for handoffs. The publishExecutor, once done, can push tasks to the handOffQueue. This is very similar to what I have done in KafkaIndexTask in this PR. Does this make sense?
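The non-blocking publish/hand-off pipeline described here can be sketched as below. The class name and the string-based work items are hypothetical; the point is only the shape of the pipeline: single-threaded executors backed by unbounded queues, with the publish stage feeding the hand-off stage on success:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of a publish -> hand-off pipeline. Both executors are
// single-threaded and backed by unbounded LinkedBlockingQueues (the default
// for newSingleThreadExecutor), so submit() never blocks the caller.
class PublishPipeline
{
  private final ExecutorService publishExec = Executors.newSingleThreadExecutor();
  private final ExecutorService handOffExec = Executors.newSingleThreadExecutor();
  final List<String> log = new CopyOnWriteArrayList<>();

  // Enqueue a driver for publishing; hand-off is chained to run only after
  // the publish stage completes successfully.
  CompletableFuture<Void> submit(String driverName)
  {
    return CompletableFuture
        .runAsync(() -> log.add("published " + driverName), publishExec)
        .thenRunAsync(() -> log.add("handed off " + driverName), handOffExec);
  }

  void shutdown()
  {
    publishExec.shutdown();
    handOffExec.shutdown();
  }
}
```

Because each stage is single-threaded, publishes happen one at a time and in submission order, which matches the "at most one publish at a time" assumption discussed earlier in this thread.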

@pjain1
Member Author

pjain1 commented May 15, 2017

@jihoonson Or, IMO, a better option would be to run the publish method in the same thread that calls the method and not use a publishExecutor. This way KafkaIndexTask can use an executor to call this method and do the queueing logic that I described in the previous comment, and IndexTask can just call the method in the same thread and block until it finishes. What do you think?

@gianm This comment and the previous one are the reasons why we thought it might be better for tasks to decide how to call and handle publish, rather than using an executor in the AppenderatorDriver. What do you think?

@jihoonson
Contributor

For the Kafka indexing task, the publish call should never block, and from the code it seems like it can block. I would suggest having an unbounded queue to which publish tasks can be submitted; a publishExecutor will keep taking tasks from the queue and doing the publish. Similarly, have a queue for handoff tasks as well, and a handOffExecutor will keep polling the queue to check for handoffs. The publishExecutor, once done, can push tasks to the handOffQueue. This is very similar to what I have done in KafkaIndexTask in this PR. Does this make sense?

@pjain1 ah right. I missed changing it to be unbounded. A publishExecutor with unbounded capacity sounds good to me, but as for the handOffExecutor, IndexTask does not need to wait for hand off. Currently it uses NoopSegmentHandoffNotifierFactory to share the same code path in AppenderatorDriver, but it would be good if we could avoid that.
This is why I added a new method called waitForHandoff() in the sample implementation. It allows the callers of publish() to decide whether to call waitForHandoff() subsequently or not.

I still find it difficult to understand what the benefits are when the publishExecutor (and handOffExecutor) is in KafkaIndexTask rather than AppenderatorDriver. I think it would be good to use it in KafkaIndexTask if AppenderatorDriver returned a ListenableFuture, which supports completion callbacks. In this world, the callers of AppenderatorDriver don't have to worry about handling threads individually.
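The future-returning API being discussed can be sketched with java.util.concurrent.CompletableFuture standing in for Guava's ListenableFuture (the class and method names below are hypothetical, and the real AppenderatorDriver API would carry segment metadata rather than a string):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a driver whose publish() returns a future instead of blocking, so
// each caller chooses its own completion strategy. CompletableFuture stands
// in for Guava's ListenableFuture here.
class FutureReturningDriver
{
  private final ExecutorService publishExec = Executors.newSingleThreadExecutor();

  // Publish runs off-thread; the call itself never blocks the caller.
  CompletableFuture<String> publish(String sequenceName)
  {
    return CompletableFuture.supplyAsync(() -> "published:" + sequenceName, publishExec);
  }

  void shutdown()
  {
    publishExec.shutdown();
  }
}
```

With such an API, IndexTask could simply join() the future to block in place, while KafkaIndexTask could chain a callback (e.g. via thenAccept) to enqueue the hand-off step, so neither caller manages its own publish thread.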

@pjain1
Member Author

pjain1 commented May 16, 2017

@jihoonson Execs.newBlockingSingleThreaded("publish-%d", 0) will create an executor with a SynchronousQueue, which is blocking and not unbounded. Am I missing anything? Ultimately the publishExecutor should be a single-threaded, non-blocking executor working off of an unbounded queue. Does this make sense?
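The difference being pointed out can be demonstrated with a stock ThreadPoolExecutor. Note one assumption: Druid's Execs helper blocks the submitter on a full SynchronousQueue, whereas the default ThreadPoolExecutor rejects instead; either way, a second task cannot be queued while the single worker is busy, which is the behavior at issue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates why a SynchronousQueue-backed single-thread executor cannot
// absorb a second submission while its one worker is busy, while a
// LinkedBlockingQueue-backed one queues it without bound.
class QueueDemo
{
  static boolean accepts(BlockingQueue<Runnable> queue)
  {
    ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, queue);
    CountDownLatch release = new CountDownLatch(1);
    // Occupy the single worker thread until we release it.
    exec.submit(() -> {
      try {
        release.await();
      }
      catch (InterruptedException ignored) {
      }
    });
    boolean accepted;
    try {
      exec.submit(() -> {});  // second task while the worker is busy
      accepted = true;
    }
    catch (RejectedExecutionException e) {
      // SynchronousQueue.offer fails with no waiting consumer, so the
      // default AbortPolicy rejects the task.
      accepted = false;
    }
    release.countDown();
    exec.shutdown();
    return accepted;
  }
}
```

Execs.singleThreaded, like Executors.newSingleThreadExecutor, is backed by an unbounded LinkedBlockingQueue, so submissions never block or reject, which is the behavior wanted for the publishExecutor.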

@jihoonson
Contributor

@pjain1 ah right. It should be some reasonably large value k >> 0 which doesn't exceed the VM limit. I think it should be configurable. Does that make sense?

@pjain1
Member Author

pjain1 commented May 17, 2017

By k you mean the queue capacity? If k is set to a large number, then why not just use Execs.singleThreaded, where the queue is unbounded; is there any problem? Unless it should be bounded to some small number in the case of IndexTask, in which case it makes sense for it to be configurable.

I guess I am OK with both approaches, having it configurable or using Execs.singleThreaded. It's your call.

@jihoonson
Contributor

Ah right. I didn't notice the Execs.singleThreaded() method. It makes more sense.
Does using a ListenableFuture make sense to you? If so, I'll raise a new PR for the refactoring of AppenderatorDriver. After that, I think we can proceed with this PR and #4238 individually.

@pjain1
Member Author

pjain1 commented May 17, 2017

I think it is OK for it to return a ListenableFuture. Thanks

@jihoonson
Contributor

Thanks. I think it will take about 2 days. I'll ping you when I raise the PR.

@gianm
Contributor

gianm commented May 23, 2017

@pjain1 what do you think of #4292?

@gianm
Contributor

gianm commented Jun 1, 2017

@pjain1, thanks for reviewing #4292. Since it's merged now, I think we could continue with this PR. Please let us know when you're ready for another review!

@gianm
Contributor

gianm commented Aug 16, 2017

Hi @pjain1, I was wondering, are you still working on this? It would still be useful!

@gianm
Contributor

gianm commented Aug 16, 2017

And do you think we could/should mark this for 0.11?

@pjain1
Member Author

pjain1 commented Aug 16, 2017

@gianm Yes, I am still working on it. I am done with the code and am working on testing it. I think it can go in 0.11.

@gianm gianm added this to the 0.11.0 milestone Aug 16, 2017
@gianm
Contributor

gianm commented Aug 16, 2017

Thank you @pjain1. I marked the milestone as 0.11.0.

@himanshug
Contributor

resumed in #4815

@himanshug himanshug closed this Nov 13, 2017
@pjain1
Member Author

pjain1 commented Nov 13, 2017

Closing in favor of #4815

@pjain1 pjain1 deleted the kafka_indexing branch March 28, 2020 14:52


Successfully merging this pull request may close these issues.

[Kafka Indexing Service] Decouple Druid segment partitioning from Kafka partitioning
