Conversation
1) Added publishExecutor and handoffExecutor for background publishing and segment handoff 2) Changed add() to not move segments out within it
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`indexSpec`|Object|Tune how data is indexed. See 'IndexSpec' below for more details.|no|
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`publishTimeout`|Long|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
How does this relate to completionTimeout in ioConfig? Seems like they overlap in functionality and their defaults don't totally make sense together.
I think there is no need for publishTimeout: if a publish is failing, it should be retried infinitely, because if one publish fails then subsequent publishes will fail anyway. There is already a pending completion timeout config in the Supervisor which ensures that the task does not keep running forever. What do you think?
That makes sense. handoffConditionTimeout also looks unnecessary; I marked it as deprecated as well.
```java
 *
 * @param identifier segment to examine
 * @return true if exists.
 */
```
Is the idea that if containsSegment(identifier) is true then "identifier" should also be in the list returned by getSegments()?
Right. I wanted to avoid copying segments every time, but I removed it now because it isn't used anymore.
```java
private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();

// sequenceName -> list of segmentIdentifiers
private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
```
Could you describe in the comment what this means? Something short like the "segments we're currently adding to" from activeSegments is ok.
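For instance, a sketch of what the comment could say (the wording here is my suggestion, not from the patch):

```java
// sequenceName -> list of segments that we are done adding to and that are waiting
// to be published; the counterpart of the "segments we're currently adding to"
// tracked in activeSegments
private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
```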
```java
 * @return segments and metadata published if successful, or null if segments could not be handed off due to
 * transaction failure with commit metadata.
 * @param segmentsAndMetadata
 * @return
```
Could you fill out @param and @return with some details about how to use this method? It looks like the param is meant to be something returned from publish, and the return is the same with the driver metadata stripped out, and only the caller metadata remaining.
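Perhaps something along these lines (a sketch of the requested javadoc; the exact wording is assumed, not from the patch):

```java
/**
 * Registers handoff for segments that were published earlier.
 *
 * @param segmentsAndMetadata the value returned from a previous {@code publish()} call
 *
 * @return a future containing the same segments, with the driver's internal metadata
 *         stripped out and only the caller's commit metadata remaining
 */
```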
```java
// wait for handoffMonitor starts
log.info("Waiting handoff monitor starts");
synchronized (waitForHandoffMonitorStart) {
  waitForHandoffMonitorStart.wait(handoffConditionTimeout);
```
Object.wait should be called in a loop -- see https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait--
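For reference, a minimal sketch of the guarded-wait pattern the javadoc describes (class and field names here are made up for illustration):

```java
public class HandoffWaiter
{
  private final Object handoffMonitor = new Object();
  private int pendingSegments; // hypothetical count of segments awaiting handoff

  public HandoffWaiter(int pendingSegments)
  {
    this.pendingSegments = pendingSegments;
  }

  public void awaitHandoff() throws InterruptedException
  {
    synchronized (handoffMonitor) {
      // Re-check the condition in a loop to guard against spurious wakeups,
      // as the Object.wait javadoc recommends.
      while (pendingSegments > 0) {
        handoffMonitor.wait();
      }
    }
  }

  public void onSegmentHandedOff()
  {
    synchronized (handoffMonitor) {
      pendingSegments--;
      handoffMonitor.notifyAll();
    }
  }
}
```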
Thanks. Removed now.
| "Segment handoff wait timeout. Segments not yet handed off: [%s]", | ||
| Joiner.on(", ").join(appenderator.getSegments()) | ||
| ); | ||
| synchronized (waitForHandoffMonitorStart) { |
What's the purpose of notifying here and waiting down below?
I thought there could be a race condition, but it's removed now.
```java
Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
}

@Test(timeout = 5000L, expected = TimeoutException.class)
```
I prefer to use @Rule ExpectedException and set it right before I expect the exception to be thrown, like this:

```java
expectedException.expect(ParseException.class);
expectedException.expectMessage("Unknown type[class io.druid.hll.HLLCV1]");
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
```

The reasons are that it helps you be sure the exception got thrown on the right line (instead of somewhere earlier), and you can check more details about the exception, like its message and causes.
In addition -- there should be more tests here for the incremental handoff case. The existing tests aren't exercising it very well, and it's important to cover.
I agree on checking the error message as well as the type of exception, but, in this case, TimeoutException is thrown by Future.get() and doesn't contain any message. Would it be better to check that the error message is empty?
In the case of no message, I think there's no reason to check the error message.
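For example, a self-contained sketch of that style applied here (the test and variable names are made up; a never-completing future stands in for a publish that never finishes):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class PublishTimeoutTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  @Test
  public void testGetTimesOut() throws Exception
  {
    // TimeoutException from Future.get() carries no message, so only the type is asserted.
    expectedException.expect(TimeoutException.class);

    // A future that never completes, standing in for a publish that never finishes.
    final CompletableFuture<Void> neverDone = new CompletableFuture<>();
    neverDone.get(10, TimeUnit.MILLISECONDS);
  }
}
```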
Thanks. I added a test for incremental handoff.
```java
final TransactionalSegmentPublisher publisher,
final Committer committer
) throws InterruptedException
public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
```
Is there a real need to have a handoffExecutor? It seems like this method could be implemented just by the segment handoff notifier callbacks. We could create a settable future here, and then when all of the handoff callbacks have fired, we can set the future from the last callback.
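Something like this sketch (the class and the callback-registration helper are stand-ins of mine, not the real SegmentHandoffNotifier API):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

public class HandoffFutureSketch
{
  // Resolve a settable future from the last handoff callback, instead of
  // blocking a dedicated handoffExecutor thread.
  public ListenableFuture<String> registerHandoff(List<String> segmentIds)
  {
    final SettableFuture<String> handoffFuture = SettableFuture.create();
    final AtomicInteger remaining = new AtomicInteger(segmentIds.size());

    if (segmentIds.isEmpty()) {
      handoffFuture.set("no segments to hand off");
      return handoffFuture;
    }

    for (String segmentId : segmentIds) {
      registerSegmentHandoffCallback(segmentId, () -> {
        // The last callback to fire completes the future.
        if (remaining.decrementAndGet() == 0) {
          handoffFuture.set("all segments handed off");
        }
      });
    }
    return handoffFuture;
  }

  // Hypothetical stand-in for the real notifier's callback registration.
  private void registerSegmentHandoffCallback(String segmentId, Runnable callback)
  {
    // In reality this would be invoked asynchronously when the segment is handed off.
    callback.run();
  }
}
```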
I am not sure about this method. I think we need to do two things:

- Register a segment handoff callback with `SegmentHandoffNotifier` so that when a successful handoff happens, the appenderator can drop the handed-off segments.
- Wait for all the segments in the appenderator to be handed off. That can be done by `wait`ing on `handoffMonitor`, which will get notified every time a callback is done from the `SegmentHandoffNotifier`, and the check will be done in a loop which checks whether there are still any segments present in the appenderator.

There is already code that does both things; it is just that having the two methods `registerHandoff` and `waitForHandOff` would be useful. I agree with @gianm that we do not need any executor for handoff.
The first method will be called multiple times throughout the task duration, whenever we are done with some set of segments because the maxRowsInSegment limit is hit. waitForHandOff would be used only at the end of the task, after all data meant for the task is ingested and before the task finishes. @gianm what do you think?
I see a potential problem here. If handoffConditionTimeout is set, and publishes are happening in another executor such that a failed publish is retried infinitely, then a task may hit the handoffConditionTimeout and finish successfully even though no publish succeeded.
@gianm right. I removed handoffExecutor.
Not exactly sure, but I believe this code can be used as-is for waitForHandOff:
https://github.com/druid-io/druid/blob/master/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java#L271
and this for registerHandOff:
https://github.com/druid-io/druid/blob/master/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java#L463
Also, let's see what @gianm has to say.
Hmm, is it required to call get() on the ListenableFutures returned from registerHandoff() anyway, because that's the only way to detect failures during Appenderator.drop()? I think such failures should be detected, because unnecessary segments which are not dropped due to failures will eventually consume the local storage.
> Also, let's see what @gianm has to say.

Could you elaborate a bit more on how you imagine the Kafka task using the proposed registerHandoff and waitForHandoff methods? I think I don't really understand it enough to comment right now.

> Hmm, is it required to call get() on the ListenableFutures returned from registerHandoff() anyway, because that's the only way to detect failures during Appenderator.drop()?

The driver currently suppresses those failures anyway -- it turns them into alerts and log messages. Although maybe it makes sense to bubble them up and fail the task, on the grounds that this will cause the task to get restarted by the supervisor on a maybe-healthier node. I'm not really sure what's best.
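As an illustration, calling get() is one way to make those failures fail the task (a sketch under assumed names, not the driver's actual API):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class HandoffErrorPropagation
{
  // Calling get() rethrows any failure from the drop step as an ExecutionException,
  // so the task fails instead of leaving undropped segments on local storage.
  public static <T> T awaitOrFail(Future<T> handoffFuture) throws InterruptedException
  {
    try {
      return handoffFuture.get();
    }
    catch (ExecutionException e) {
      throw new RuntimeException("Handoff failed; failing the task so the supervisor can reschedule it", e.getCause());
    }
  }
}
```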
I was thinking that KafkaIndexTask will call registerHandoff multiple times throughout the task duration, whenever some set of segments for a sequenceName hits the maxRowsInSegment limit and they are published. waitForHandOff will be called only at the end of the task, after all publishes have succeeded and before the task finishes; the task will block on this call until handoffConditionTimeout. Let me know if more clarification is required.
> Although maybe it makes sense to bubble them up and fail the task, on the grounds that this will cause the task to get restarted by the supervisor on a maybe-healthier node. I'm not really sure what's best.

I think this is a good idea. I added more tests for failures.
@jihoonson, the API generally looks good to me; the general idea seems to be to open up some of the previously internal functionality. Let's see what @pjain1 thinks too. Are we going to need a method in FiniteAppenderatorDriver to return a list of the active sequenceNames? Currently, the KafkaIndexTask "knows" what it is because it's provided by the supervisor as baseSequenceName and it just adds a partition number. But if it's cycling through lots of them then it might need some more help.
Sorry I missed this one, will have a look today or tomorrow.
```java
final SegmentsAndMetadata published;
final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout();
if (publishTimeout == 0) {
  published = driver.publish(
```
I think, as per the current KafkaIndexTask implementation, the task should just block until the publish succeeds.
Thanks. I removed it.
1) Remove publishTimeout for KafkaIndexTask 2) Simplify registerHandoff() 3) Add incremental handoff test
@gianm would you elaborate on how you are thinking of using active sequenceNames for KafkaIndexTask? I'm not sure how it would be helpful.
I was thinking that KafkaIndexTask has to call
Ah, I see. But isn't KafkaIndexTask the one responsible for managing and updating sequenceNames for incremental handoff? It will probably be possible by passing its sequenceNames to the Committer's metadata like below.

```java
final Map<Integer, String> sequenceNames = Maps.newHashMap(); // a map of partition -> sequenceName

...

return new Committer()
{
  @Override
  public Object getMetadata()
  {
    return ImmutableMap.of(
        METADATA_NEXT_PARTITIONS, new KafkaPartitions(
            ioConfig.getStartPartitions().getTopic(),
            snapshot
        ),
        "sequenceNames",
        sequenceNames
    );
  }

  @Override
  public void run()
  {
    // Do nothing.
  }
};
```

```java
final AppenderatorDriverAddResult addResult = driver.add(
    row,
    sequenceName,
    committerSupplier
);

if (addResult.isOk()) {
  // If the number of rows in the segment exceeds the threshold after adding a row,
  // publish the segments for this sequenceName and switch to a new one.
  if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
    driver.publishAndRegisterHandoff(publisher, committerSupplier.get(), ImmutableList.of(sequenceName));
    // Update the published sequenceName
    sequenceNames.put(record.partition(), newSequenceName);
  }
}
```
@gianm I have similar thoughts as @jihoonson that the sequenceName would be generated by the KafkaIndexTask. I was thinking that there will be a baseSequenceName passed by the Supervisor, and a monotonically increasing number can be suffixed to it, incremented on each checkpoint. The task will maintain and persist metadata about all sequenceNames that exist and the state of segments for each sequenceName: whether the segments are open (meaning that the task can still get events before the checkpoint for this sequenceName) or not. When all the segments for a sequenceName are published, the metadata about that sequenceName can be removed from the task. During task restore, if the state of any sequenceName is not open, the task will start publishing segments for that sequenceName. Did you have something else in mind?
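A rough sketch of that naming scheme (all names here are illustrative, not from the patch):

```java
public class SequenceNameGenerator
{
  private final String baseSequenceName; // provided by the Supervisor
  private int checkpointCount;

  public SequenceNameGenerator(String baseSequenceName)
  {
    this.baseSequenceName = baseSequenceName;
  }

  // Current sequence name: base name plus partition plus a monotonically increasing suffix.
  public String currentSequenceName(int partition)
  {
    return baseSequenceName + "_" + partition + "_" + checkpointCount;
  }

  public void checkpoint()
  {
    // Incremented on each checkpoint; segments under older names become eligible for publish.
    checkpointCount++;
  }
}
```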
Ah, I forgot that KafkaIndexTask can layer in its own metadata! Never mind what I said, then; I think you both have the right idea.

@jihoonson that's all from my side.

Design LGTM.

@pjain1 does that mean you're +1? Just need one more design review then.

@gianm I am not sure what has been decided about the sequenceName metadata. Is the Driver going to maintain and persist it, or the task? Apart from this, the design that we have decided on looks good to me.

@pjain1 the task will maintain the sequenceNames metadata. I think your detailed description should work.

OK, I am good with the design. 👍
```java
    publisher,
    committerSupplier.get(),
    sequenceNameToShardSpecMap.keySet()
).get(ingestionSchema.getTuningConfig().getPublishTimeout(), TimeUnit.MILLISECONDS);
```
```java
ListenableFuture<?> drop(SegmentIdentifier identifier);

/**
 * Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only
```
'synchronously will the call' -> 'synchronously with the call'?
design 👍
* Early publishing segments in the middle of data ingestion
* Remove unnecessary logs
* Address comments
* Refactoring the patch according to #4292 and address comments
* Set the total shard number of NumberedShardSpec to 0
* refactoring
* Address comments
* Fix tests
* Address comments
* Fix sync problem of committer and retry push only
* Fix doc
* Fix build failure
* Address comments
* Fix compilation failure
* Fix transient test failure
* Refactoring Appendertor Driver (apache#4292)
* Rename FiniteAppenderatorDriver to AppenderatorDriver (apache#4356)
* Add totalRowCount to appenderator
* add localhost as advertised hostname (apache#4689)
* kafkaIndexTask unannounce service in final block (apache#4736)
* warn if topic not found (apache#4834)
* Kafka: Fixes needlessly low interpretation of maxRowsInMemory. (apache#5034)
Related to #4178 and #4238.
The key changes are:

- `publishTimeout` is newly added to `TuningConfig` for IndexTask and KafkaIndexTask.

I didn't rename FiniteAppenderatorDriver to AppenderatorDriver yet because it would make it difficult to figure out the changes made in this PR. I'll rename it before merging to master.
@gianm @pjain1 please review this patch.