Change RealtimeIndexTask to use AppenderatorDriver #5261
jihoonson merged 12 commits into apache:master from
Conversation
|
@kevinconaway thanks for the contribution! I'll do my code review shortly. BTW, please check the CI failure (https://travis-ci.org/druid-io/druid/jobs/329503371). |
|
Thanks! I forgot to run the forbidden API check locally. |
|
The build failures here seem unrelated to my commit: https://travis-ci.org/druid-io/druid/builds/329658417?utm_source=github_status&utm_medium=notification |
|
@jihoonson will you have time to review this PR this week? |
|
Sure, I'll review in a few days. In the meantime, please check the CI failure. It looks related to this PR. |
…ime-indextask-appenderator
…a race condition where the handoff timeout expires before the segment is published
jihoonson
left a comment
@kevinconaway I did my first review. Also please fix the conflict.
| } | ||
| } | ||
|
|
||
| private Appenderator newAppenderator( |
I don't mind making these static, but will you please help me understand why you prefer them to be static versus instance methods? That will help me conform to your style in the future.
Well, I don't have a strong opinion here. It's better for me to read the code because I can expect that any member variables or methods of the class are not used inside static methods. It's up to you.
| ); | ||
| } | ||
|
|
||
| private AppenderatorDriver newDriver( |
|
|
||
| @JsonIgnore | ||
| private volatile Plumber plumber = null; | ||
| private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingPublishes; |
Should we track all segments in the pendingPublish state? It looks like it would be fine to track only the segments in the pendingHandoff state.
I originally had it that way but it introduces a race condition when the user specifies a handoff timeout.
When the user specifies a handoff timeout, it's possible that the handoff timeout expires before the segment is published. That was why RealtimeIndexTask#testHandoffTimeout was intermittently failing on Travis.
If this isn't a concern, I can refactor that away and modify the test.
Race conditions should be fixed, but I'm not sure where one can exist here. Would you elaborate more on this?
What I first thought was keeping all segments in the pendingHandoff state like below.
pendingHandoffs.add(
Futures.transform(
driver.publish(publisher, committerSupplier.get(), Collections.singletonList(sequenceName)),
(AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> driver.registerHandoff(segmentsAndMetadata)
)
);
The race condition was in RealtimeIndexTask#testHandoffTimeout. That test expects the handoff to time out but still have the segments be published.
If you wait on the combined (publish + handoff) future with a timeout, it's possible that the timeout expires before the segment is published.
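For illustration, a minimal sketch of the two-phase wait being described, using the names from the diff above (this is pieced together from the discussion, not the PR's verbatim code): the publish futures are awaited without a timeout, and only the handoff wait is bounded.

// Publishes must complete regardless of any handoff timeout,
// so they are awaited first, unbounded.
for (ListenableFuture<SegmentsAndMetadata> publish : pendingPublishes) {
  publish.get();
}

// Only the handoff wait is bounded by the user-specified timeout.
final ListenableFuture<List<SegmentsAndMetadata>> allHandoffs = Futures.allAsList(pendingHandoffs);
if (handoffTimeout > 0) {
  allHandoffs.get(handoffTimeout, TimeUnit.MILLISECONDS);
} else {
  allHandoffs.get();
}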
| private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingHandoffs; | ||
|
|
||
| @JsonIgnore | ||
| private volatile AppenderatorDriver driver = null; |
Can be moved to run() as a local variable.
| private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingPublishes; | ||
|
|
||
| @JsonIgnore | ||
| private final Queue<ListenableFuture<SegmentsAndMetadata>> pendingHandoffs; |
If we don't have to keep pendingPublishes, then this can be a List<ListenableFuture<SegmentsAndMetadata>> by letting only the main thread update this list like below.
private void publishSegments(
TransactionalSegmentPublisher publisher,
Supplier<Committer> committerSupplier,
String sequenceName
)
{
...
pendingHandoffs.add(
driver.publishAndRegisterHandoff(publisher, committerSupplier.get(), Collections.singletonList(sequenceName))
);
}
| "Cannot acquire a lock for interval[%s]", | ||
| interval | ||
| ); | ||
| final VersioningPolicy versioningPolicy = interval -> { |
AppenderatorDriver uses an ActionBasedAllocator (https://github.com/druid-io/druid/pull/5261/files#diff-917318f978c8bd3d86decbfdc0166c59R660) which internally acquires a lock for the interval of the allocated segment. If this versioningPolicy is for acquiring a lock, it is not needed.
BTW, I could see that your motivation was to support user-defined versioningPolicy in RealtimeIndexTask (https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/druid-user/AATjuCUFS0E/36OIMaJjAAAJ). This also makes me feel that we need to remove this.
I left the versioningPolicy as it was in the previous incarnation of RealtimeIndexTask; the only change I made was to clean up the anonymous inner class by converting it to a lambda.
I'm happy to remove this if it isn't needed, but I didn't feel comfortable removing it without the background on why it was there in the first place.
Any of Druid's index tasks should acquire a lock for a pair of (dataSource, interval) before writing data to a segment. This prevents multiple tasks from writing to the same interval at the same time.
RealtimeIndexTask was using a VersioningPolicy which is responsible for acquiring a lock for the given interval. This might make sense because the segment version is decided from the acquired lock. However, I feel it's sort of hacky because this VersioningPolicy encompasses side effects like communicating with the overlord.
In addition, this VersioningPolicy internally uses LockAcquireAction, which always acquires a lock with a new version. This means that if the VersioningPolicy is called for a late arrival, LockAcquireAction returns a new version for the interval of that late arrival, thereby eventually overshadowing all other segments ingested to the same interval so far. SegmentAllocateAction can handle this problem properly and returns a new segmentId for late arrivals which doesn't overshadow other segments ingested by the same task.
BTW, does this PR also target supporting ingestion of late arrivals? I guess it does from this discussion (#4774 (comment)). If so, please add a test for it.
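For context, a rough reconstruction of what the versioningPolicy lambda in the diff does, based on the description above (the exact calls are assumptions, not the PR's actual code):

final VersioningPolicy versioningPolicy = interval -> {
  try {
    // LockAcquireAction always acquires a lock with a NEW version for the
    // interval, which is why a late arrival would overshadow earlier segments.
    final TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
    return lock.getVersion();
  }
  catch (IOException e) {
    throw new RuntimeException(e);
  }
};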
| }); | ||
| } | ||
|
|
||
| private String makeSequenceName(int sequenceNumber) |
| final Committer committer = committerSupplier.get(); | ||
| final CountDownLatch persistLatch = new CountDownLatch(1); | ||
| plumber.persist( | ||
| driver.persist( |
Segments should be published and handed off as well as persisted. driver.publishAndRegisterHandoff() will do everything needed.
Please add a test to check that the remaining segments at this point are published correctly.
I'm not sure that's the correct behavior. From my understanding, this block will only be executed when the task is "stopped gracefully". In that case, I believe the behavior is to persist the state as-is with the assumption that the task will be resumed or restarted.
In the happy path case, the segments will have already been persisted. If the task exits with an exception, this block won't get executed.
Thoughts? If my analysis is correct, I can try to make the behavior more explicit in the code.
Below is the current behavior:
https://github.com/druid-io/druid/blob/80419752b53b1fe27cc59e6e07a0e8e89cfe5869/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java#L444
It only calls plumber.finishJob() if the task was not "gracefully stopped"
Yeah, you're correct. But why was publishing segments moved from the finally block to the try block? It confuses me.
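For reference, the layout being questioned, as a simplified sketch (not the actual code): publishing lives in the try block and only runs when the task was not gracefully stopped, while cleanup stays in the finally block.

try {
  // ... read rows from the firehose and add them to the driver ...

  if (!gracefullyStopped) {
    // Publish remaining segments and wait for handoff; skipped on a
    // graceful stop so the task can be restored later.
  }
}
finally {
  // Close the driver/appenderator and unannounce segments; this runs in
  // every case, including graceful stops and exceptions.
}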
|
@gianm @kevinconaway |
|
I'm fine with that if that is the direction that you all would like me to go. |
|
I agree. Actually, AppenderatorDriverRealtimeIndexTask has new requirements which were not supported by RealtimeIndexTask. I'm fine with adding a new taskType rather than changing the existing one. |
|
Sounds good, I'll update the PR to add a new task type
What do you mean here? Are you talking about the ability to ingest late arriving data and append to existing segments, or something else? |
|
Are you OK with this new class sharing the |
…rt changes to RealtimeIndexTask
…ime-indextask-appenderator
…rt changes to RealtimeIndexTask
@kevinconaway yes, that's exactly what I meant.
I'm fine if the existing RealtimeIndexTask and the new one share exactly the same configurations. |
|
I've updated the PR to include a new task. I applied the changes that I had made to |
jihoonson
left a comment
Thanks for the quick update. I have some more comments.
- windowPeriod, versioningPolicy, rejectionPolicyFactory, persistThreadPriority, and mergeThreadPriority are not used in AppenderatorDriverRealtimeIndexTask. Please log a warning if they are set with AppenderatorDriverRealtimeIndexTask (see the sketch after this list).
- RealtimeIndexTask is mainly used by Tranquility. Do you also have a plan to update Tranquility to support this new task type?
- Have you had a chance to test in some production environments?
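For the first item, a minimal sketch of the kind of warning being requested; warnIfNotNull matches the helper that appears in the later diff, but its body and logger are assumed here:

private static void warnIfNotNull(Object value, String message)
{
  if (value != null) {
    log.warn(message); // log is the task's logger
  }
}

// e.g.:
// warnIfNotNull(tuningConfig.getWindowPeriod(), "windowPeriod is not used by this task");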
| } | ||
| } | ||
|
|
||
| if (!gracefullyStopped) { |
I think this and https://github.com/druid-io/druid/pull/5261/files#diff-43f02f36e4f32ed1d7052b97e9e2fc52R409 are for finalizing the task, and if so, it would be clearer if they were in the same place. I think https://github.com/druid-io/druid/pull/5261/files#diff-43f02f36e4f32ed1d7052b97e9e2fc52R409 doesn't have to be in the finally block and can be moved here. Also, it would be great for readability if you could split the long code into smaller methods.
Also, it would be great for readability if you could split the long code into smaller methods.
Sure, I can do that. I had hewed to the same structure and layout as RealtimeIndexTask because I thought it might be easier to review, since you all are probably familiar with how that task works. I'll refactor it.
| } | ||
|
|
||
| @Test(timeout = 60_000L) | ||
| public void testLateData() throws Exception |
Would you explain how this tests late arrivals?
Sure. It basically adds two input rows: one at "now" and one at "now - 2 days". It then tests that both events were added and the data is queryable:
firehose.addRows(
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
// Data is from 2 days ago, should still be processed
ImmutableMap.of("t", now.minus(new Period("P2D")).getMillis(), "dim2", "bar", "met1", 2.0)
)
);
// ....
// Do some queries.
Assert.assertEquals(2, sumMetric(task, null, "rows"));
Assert.assertEquals(3, sumMetric(task, null, "met1"));
👍
No, we don't really use Tranquility so I'm not as familiar with the code base (or Scala for that matter).
We have been running a previous incarnation of this change (#4774) for months now without any issues. I've been testing this locally and in our staging environment and it's performing identically thus far. |
|
Hi @kevinconaway, would you let me know when you'll be available to finish this PR? I'm asking because #5297 is a blocker issue for 0.12.0 which will cause conflicts with this PR. I think it would be easier for me to resolve the conflicts because I reviewed this PR, so I'm waiting for this issue to be merged. |
|
I will submit the changes tomorrow; is that OK? |
|
Yes, it is. Thanks! |
…exTask. Combine publish and handoff futures in to single future
|
I've made some readability improvements. I also combined the publish + handoff into a single future per your request. Please let me know what else I can change. Thanks! |
jihoonson
left a comment
@kevinconaway thanks for the quick fix. I left more comments. Please consider them.
| } | ||
| } | ||
| }; | ||
| final DataSegmentAnnouncer lockingSegmentAnnouncer = createLockingSegmentAnnouncer(toolbox); |
You can use the segmentAnnouncer of TaskToolbox. Segment announcing is required to support query processing on realtime data, but similar to the versioningPolicy, locking is not necessary for announcing segments, for the same reason we talked about here. KafkaIndexTask is a good example showing how a realtime task acquires locks, allocates segments, announces them, etc.
If I remove the "locking segment announcer", the AppenderatorDriverRealtimeIndexTaskTest#testRestore test fails with:
Caused by: io.druid.java.util.common.ISE: Segments not covered by locks for task: index_realtime_test_ds_0_2018-02-03T00:57:06.929Z_mohhpfgo
at io.druid.indexing.common.actions.TaskActionPreconditions.checkLockCoversSegments(TaskActionPreconditions.java:45) ~[classes/:?]
at io.druid.indexing.common.actions.SegmentTransactionalInsertAction.perform(SegmentTransactionalInsertAction.java:107) ~[classes/:?]
at io.druid.indexing.common.actions.SegmentTransactionalInsertAction.perform(SegmentTransactionalInsertAction.java:47) ~[classes/:?]
at io.druid.indexing.common.actions.LocalTaskActionClient.submit(LocalTaskActionClient.java:64) ~[classes/:?]
at io.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask.lambda$run$1(AppenderatorDriverRealtimeIndexTask.java:240) ~[classes/:?]
at io.druid.segment.realtime.appenderator.AppenderatorDriver.lambda$publish$139(AppenderatorDriver.java:695) ~[classes/:?]
at com.google.common.util.concurrent.Futures$1.apply(Futures.java:713) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861) ~[guava-16.0.1.jar:?]
... 3 more
I debugged this a bit and here is what I believe is happening:
- On the first run of the task, the task locks are acquired when the segments are allocated via SegmentAllocateAction
- The first task is stopped gracefully and the segment identifiers are persisted as the metadata
- When the second task runs and restores the metadata, it does not acquire any locks on the restored segment intervals, and when the publish happens, the above error occurs because there are no locks acquired by the task.
Thoughts? Is that accurate?
I think this can be resolved by borrowing some ideas from KafkaIndexTest.
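The borrowed idea would look roughly like this sketch: after restoring saved segment metadata, re-acquire a lock for each restored segment's interval before anything is published (the API usage and the restoredSegments name are assumptions):

for (SegmentIdentifier identifier : restoredSegments) {
  // Re-acquire the lock the first run held; without this, the publish fails
  // the TaskActionPreconditions.checkLockCoversSegments check shown above.
  final TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(identifier.getInterval()));
  if (lock == null) {
    throw new ISE("Could not acquire lock for interval[%s]", identifier.getInterval());
  }
}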
| warnIfNotNull(tuningConfig.getWindowPeriod(), "windowPeriod is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getVersioningPolicy(), "versioningPolicy is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getRejectionPolicyFactory(), "rejectionPolicyFactory is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getPersistThreadPriority(), "persistThreadPriority is not used by this task"); |
tuningConfig.getPersistThreadPriority() returns an integer which cannot be null. This is because this option has a default value.
Probably better to add a new type of tuningConfig which doesn't allow these configurations.
| warnIfNotNull(tuningConfig.getVersioningPolicy(), "versioningPolicy is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getRejectionPolicyFactory(), "rejectionPolicyFactory is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getPersistThreadPriority(), "persistThreadPriority is not used by this task"); | ||
| warnIfNotNull(tuningConfig.getMergeThreadPriority(), "mergeThreadPriority is not used by this task"); |
Same.
Probably better to add a new type of tuningConfig which doesn't allow these configurations.
OK, I'll add a new tuningConfig
| AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); | ||
|
|
||
| if (addResult.isOk()) { | ||
| if (addResult.isPersistRequired()) { |
addResult.isPersistRequired() always returns false because driver.add(inputRow, sequenceName, committerSupplier) is called, which internally allows automatic persisting.
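In other words, a sketch of the dead branch, pieced together from the diff (the persist call shape is an assumption):

AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);

if (addResult.isOk()) {
  if (addResult.isPersistRequired()) {
    // Never reached with this add() overload: the driver already persists
    // automatically when its internal thresholds are hit.
    driver.persist(committerSupplier.get());
  }
}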
|
|
||
|
|
||
| if (handoffTimeout > 0) { | ||
| allHandoffs.get(handoffTimeout, TimeUnit.MILLISECONDS); |
Here, the futures are actually the tasks publishing as well as handing off segments. So, the timeout should be publishAndHandoffTimeout rather than handoffTimeout. I suggest adding a new tuningConfig type for this new configuration, which is only used in this task.
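The suggested rename, sketched against the diff above (the publishAndHandoffTimeout name comes from this comment; where it is read from is an assumption):

// The combined publish+handoff futures are bounded by a single timeout
// sourced from the new tuningConfig.
final ListenableFuture<List<SegmentsAndMetadata>> allHandoffs = Futures.allAsList(pendingHandoffs);
if (publishAndHandoffTimeout > 0) {
  allHandoffs.get(publishAndHandoffTimeout, TimeUnit.MILLISECONDS);
} else {
  allHandoffs.get();
}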
…sk. Revert changes to RealtimeTuningConfig
|
I've updated the PR to add a new |
jihoonson
left a comment
@kevinconaway thanks! I left one final comment.
| import javax.annotation.Nullable; | ||
| import java.io.File; | ||
|
|
||
| @JsonTypeName("appenderator") |
It would be good if this is synced with the Jackson type name (index_realtime_appenderator) of AppenderatorDriverRealtimeIndexTask.
What do you suggest? realtime_appenderator?
I suggest index_realtime_appenderator (https://github.com/druid-io/druid/pull/5261/files#diff-df997e60df22e57ef92bff9af75fa9d5R58). This is because it will be easier for users to remember if both use the same name.
Or I'm ok with changing both type names to realtime_appenderator.
Do you think there is value in keeping the same pattern as the existing realtime task?
As of now, the RealtimeIndexTask task ID is index_realtime with a corresponding tuningConfig value of realtime. If I kept the same pattern for this new task, the ID would be index_realtime_appenderator (as it currently is) and I would change the tuningConfig to be realtime_appenderator.
Thoughts?
Sounds good. It looks better to keep the current pattern.
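The agreed pairing then looks like this sketch, keeping the existing index_realtime / realtime pattern (the tuningConfig class name is assumed):

@JsonTypeName("index_realtime_appenderator")
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
{
  // ...
}

@JsonTypeName("realtime_appenderator")
public class RealtimeAppenderatorTuningConfig implements TuningConfig
{
  // ...
}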
No problem, updated.
Thanks for the quick fix!
…e naming pattern as RealtimeIndexTask
|
@jihoonson Not sure why this particular build failed; it seems unrelated |
|
@kevinconaway just restarted the failed job. |
|
@jihoonson Is this particular test known to be flaky? Are you able to get the test output off the build nodes? |
|
@kevinconaway yeah, it looks flaky. The failed test passes on my mac. |
|
OK thanks. What are the next steps for this PR? |
|
I think it's ready for merging. @himanshug do you have further comments? |
|
@jihoonson I'm sorry but I haven't really looked at the patch. Given that it is a new task now that can be tested independently in prod and wouldn't impact existing users, I'm fine with your approval on the PR. Feel free to merge. |
|
@himanshug thanks. I'll merge shortly. |
|
@kevinconaway raised #5355. Please take a look. |
|
@jihoonson Thoughts on backporting this to 0.12? |
|
@kevinconaway basically, I think we don't have to backport this to 0.12 because this is a new feature rather than an improvement to an existing one. However, I think it would be fine if this doesn't cause a lot of conflicts for backporting. More people could test this new task type earlier instead of waiting for the next release or backporting it themselves. @gianm @jon-wei what do you think? |
The reason I ask is that I'd like to get off our fork and start using this feature when 0.12 is released, rather than waiting for 0.13 |
|
@kevinconaway I'm sorry to say, but we usually backport only bug fixes, so it would be difficult to make an exception for this PR without a compelling reason. We already created 0.12.0-rc3 (https://groups.google.com/forum/#!topic/druid-development/DZx-opThc50), which will be released soon unless another bug is found. |
|
No worries, thanks for following up |
|
Thanks for understanding! |
|
@kevinconaway are you a Tranquility user? I'm curious whether you've looked into modifying Tranquility to make use of this appenderator-based realtime task |
|
@dclim We're not using Tranquility. I don't know Scala well enough to confidently hack on it. The last time I looked at it, it wasn't up to date with even Druid 0.10 but that was some time ago |
|
@kevinconaway cool, thank you for the contribution! |
@gianm This is a follow-up PR to the discussion we had on #4774
This PR changes the RealtimeIndexTask to use the AppenderatorDriver instead of the RealtimePlumber. This allows the realtime indexing to behave similarly to the Kafka indexing service and ingest events from arbitrary time periods without concerns for windowing.
Implementation notes:
- I added a maxRowsPerSegment configuration to RealtimeTuningConfig. I defaulted this to 5 million, the same value that is used in IndexTuningConfig.
- When maxRowsPerSegment is reached for any segment, all segments are published and handed off in the background (see the sketch at the end of this description). This isn't necessarily ideal because we may end up pushing small segments if there is one large one that reached maxRowsPerSegment.
- The API of AppenderatorDriver didn't support publishing a single segment, so I opted to keep this behavior for now. I suppose this is also the way that the KIS behaves, so we are consistent there.
- The above behavior means that segments are not merged and pushed on a time-based interval as they were in RealtimePlumber.
- The changes I made to TaskLifecycleTest are a little hacky but I wasn't sure of the best way to refactor that test. Suggestions welcome.
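To make the publish-on-threshold note concrete, here is a sketch under stated assumptions (getNumRowsInSegment, the sequence bookkeeping, and publishSegments are pieced together from the diff and review thread, not verbatim from the PR):

AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);

if (addResult.isOk() && addResult.getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment()) {
  // One full segment triggers a publish of ALL open segments for the current
  // sequence, which is why small segments can get pushed alongside a large one.
  publishSegments(publisher, committerSupplier, sequenceName);
  sequenceName = makeSequenceName(++sequenceNumber);
}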