Early publishing segments in the middle of data ingestion#4238
gianm merged 23 commits into apache:master
Conversation
@jihoonson, one thing to keep in mind is that users will probably want an option for whether to "guarantee" rollup or not. If it's guaranteed then the index task shouldn't publish early, and it probably needs to stick with two passes. If rollup is not guaranteed then we can take some short-cuts. I haven't studied the patch yet so I'm not sure if this is addressed or not. Sorry for commenting without reading but I just wanted to get that comment out :)

@gianm thanks for your comment. I missed that point. I'll update this patch soon.

@gianm btw, with this patch, the rollup is guaranteed when

It's still nice to have a way to guarantee rollup even when
```java
 */
Object startJob();

long getPersistedBytes();
```
Let's add Javadoc for this new method.
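Something along these lines might do (suggested wording only; the Javadoc actually added in the patch may differ):

```java
/**
 * Returns the number of bytes of data that have been persisted to local storage so far.
 * (Suggested wording, not the actual Javadoc from the patch.)
 */
long getPersistedBytes();
```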
```diff
  * hash-based partitioning). In the future we may want to also support single-dimension partitioning.
  */
-  private Map<Interval, List<ShardSpec>> determineShardSpecs(
+  private ShardSpecs determineShardSpecs(
```
Should this method documentation be updated since it can now return linear shard specs when fixedNumPartitions is false?
```java
 * @param row input row
 * @return a shardSpec
 */
ShardSpec getShardSpec(Interval interval, long timestamp, InputRow row);
```
Do you foresee any situations where the timestamp here would be different from the row timestamp?
If not, maybe the timestamp parameter could be dropped and read from the row instead.
Right. I removed the timestamp parameter.
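For reference, the revised signature would presumably look like this (a sketch; the surrounding interface is omitted):

```java
// The timestamp is now read from the row itself rather than passed separately.
ShardSpec getShardSpec(Interval interval, InputRow row);
```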
```java
private boolean moveSegmentOut(
    final String sequenceName,
    final List<SegmentIdentifier> identifiers,
    boolean ignoreAbsent
```
Can you elaborate more on why `ignoreAbsent` is needed? It wasn't clear to me why this is needed at the call site with `true`.
When moving segments out after publishing them, some of the published segments may already have been moved out of `activeSegments` due to the `maxRowsPerSegment` limit. I added a comment.
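To illustrate the point, here is a self-contained sketch (not the actual Druid code; `ActiveSegments` is a hypothetical stand-in): with `ignoreAbsent` set to `true`, identifiers that were already moved out are skipped instead of treated as an error.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ActiveSegments
{
  private final Set<String> active = new HashSet<>();

  // With ignoreAbsent = true, segments that were already moved out
  // (e.g. when the maxRowsPerSegment limit was hit) are silently skipped.
  boolean moveSegmentOut(List<String> identifiers, boolean ignoreAbsent)
  {
    for (String id : identifiers) {
      if (!active.remove(id) && !ignoreAbsent) {
        return false; // the caller expected the segment to still be active
      }
    }
    return true;
  }
}
```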
```diff
 /**
- * Add a row. Must not be called concurrently from multiple threads.
+ * Add a row. This method may internally incur persisting data added so far. Also, if too large data are persisted,
```
I wonder if any problems could arise from introducing this blocking segment publishing step to `add()` (uploading segments could take a while).
If so, would it be feasible/better to do the segment publishing asynchronously? (Maybe complicated, since you could still run into disk space issues unless the add/drain rates are managed effectively, but it would allow the input to continue to be processed while the segment push to deep storage is occurring.)
Currently, IndexTask is the only place that allows publishing segments in `add()`, and it is expected to use batch firehoses, so I think there should usually be no issues. One thing that came to mind is that HTTP connections can expire if the underlying firehose is HTTP-based and tries to keep its connections open during the whole ingestion step. But this can also be avoided by using the ReplayableFirehose or the PrefetchableTextFilesFirehose introduced in #4193.
@jihoonson what do you think about an alternate design:

- Change `add` to `AddResult add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier)`, where AddResult includes a SegmentIdentifier and something like numRowsAdded.
- Change `finish` to `publish` and have it return a `Future<SegmentsAndMetadata>` instead of `SegmentsAndMetadata`.
- Make the task responsible for deciding when to call `publish` (instead of `add`, like this patch has). The index task would make its decision based on numRowsAdded from AddResult.
- Rename the class from FiniteAppenderatorDriver to AppenderatorDriver. (It's no longer restricted to a finite stream!)
The idea behind this change is to make the AppenderatorDriver a good foundation for all kinds of incremental publishing, not just this one but also realtime incremental publishing like #4178. The Future return from publish is good because it prevents blocking realtime ingestion, and making the task responsible for decision about when to incrementally publish is good because the Kafka task will have some more complex logic around making sure all the replicas are in sync.
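A rough sketch of what that proposal might look like (the exact signatures are assumptions based on the list above; InputRow, Committer, SegmentIdentifier, and SegmentsAndMetadata are existing Druid types, with import paths assumed from the Druid codebase of that era):

```java
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;

import java.util.concurrent.Future;
import java.util.function.Supplier;

interface AppenderatorDriver
{
  // Returns which segment the row landed in plus a running row count,
  // so the task can decide for itself when to publish early.
  AddResult add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier);

  // Asynchronous, so realtime ingestion is not blocked while segments
  // are pushed to deep storage and metadata is committed.
  Future<SegmentsAndMetadata> publish(String sequenceName, Committer committer);
}

class AddResult
{
  private final SegmentIdentifier segmentIdentifier;
  private final long numRowsAdded;

  AddResult(SegmentIdentifier segmentIdentifier, long numRowsAdded)
  {
    this.segmentIdentifier = segmentIdentifier;
    this.numRowsAdded = numRowsAdded;
  }

  SegmentIdentifier getSegmentIdentifier()
  {
    return segmentIdentifier;
  }

  long getNumRowsAdded()
  {
    return numRowsAdded;
  }
}
```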
@gianm @pjain1 @dclim sounds good to me, but I have a question.
> Change `finish` to `publish` and have it return a `Future<SegmentsAndMetadata>` instead of `SegmentsAndMetadata`.
Here, which operation results in the `Future<SegmentsAndMetadata>`? More precisely, do you mean the new publish() should return a future for segment handoff, or for pushing segments? Currently, push() is an async operation which returns a future for pushing segments, while publish() is an atomic sync operation which writes metadata to the underlying DBMS.
gianm left a comment
Some of the goals here are similar to #4178 so I'm thinking we should consider the design of the two PRs together.
@jihoonson, other than the comments in this patch, I had some others in #4227 (comment) about what we lose by publishing early.
Could you please add some documentation to ingestion/tasks.md explaining the different modes of operation of the "index" task, especially:
- pros and cons
- how a user should control the mode
```java
Object startJob();

/**
 * Returns the size of data currently stored in local storage. The persistedBytes is changed when
```
Not strictly true, since persistAll will return before persistedBytes is updated (the persisting happens in another thread). That might lead to persisting more data than we actually want to persist, since we're not counting either of the following:
- data in memory in the currently active IncrementalIndex
- data in IncrementalIndexes that are pending persist in the persist thread

I'm not sure what's best to do here. Maybe we can track both rows and bytes and have a trigger to publish if either one exceeds a limit?
I tried to trace the actual byte size of segments stored both in memory and in local storage, but it looks too difficult to estimate the serialized byte size of objects without actually serializing them.
So, I added another method, getTotalRowCount(), which returns the total number of rows in memory and on disk, instead of this one.
Using row counts as a threshold is not ideal because operators have to estimate proper values by themselves, so I think this should be improved to return a number of bytes later.
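A minimal sketch of the resulting trigger logic (maxTotalRows and getTotalRowCount() come from the discussion; the class and everything else are assumed for illustration):

```java
final class PublishTrigger
{
  private final long maxTotalRows;
  private long totalRowCount; // rows in memory plus rows persisted on disk

  PublishTrigger(long maxTotalRows)
  {
    this.maxTotalRows = maxTotalRows;
  }

  void addRows(int numRows)
  {
    totalRowCount += numRows;
  }

  long getTotalRowCount()
  {
    return totalRowCount;
  }

  // The index task checks this after each add() and publishes early when true.
  boolean shouldPublish()
  {
    return totalRowCount > maxTotalRows;
  }
}
```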
…nt-early-publish
…nt-early-publish
```md
## Sharding the Data

Druid shards are called `segments` and Druid always first shards data by time. In our compacted data set, we can create two segments, one for each hour of data.

### Roll-up modes
```
This new doc section is great.
```md
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](./design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with `appendToExisting` of IOConfig and `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no|
```
It's not clear to me exactly what this means. Is it saying that forceExtendableShardSpecs can't be used with forceGuaranteedRollup? Or just that the combination of appendToExisting and forceExtendableShardSpecs can't be used?
Ah, I just read the next section, and it sounds like this means the combination.
Hmm, now that I read the code, it seems that I misread the docs. The code is checking that neither config is set.
Changed to "This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`".
```md
On the contrary, in the incremental publishing mode, segments are incrementally published, that is they can be published in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory and disks of the node running that task until the total number of collected rows exceeds `maxTotalRows`. Once it exceeds, the index task immediately publishes all segments created until that moment, cleans all published segments up, and continues to ingest remaining data.

To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with `forceExtendableShardSpecs` of TuningConfig and `appendToExisting` of IOConfig together.
```
It'd be clearer to say that this can't be used with either appendToExisting or forceExtendableShardSpecs.
```diff
-      rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
+      final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
+      rowsCurrentlyInMemory += numAddedRows;
```
This used to be an AtomicInteger but is now a regular int. Are you sure it's safe? Is all the code that uses it required to run in the same thread, according to the Appenderator contract? If not, then please beef up the contract.
Reverted and beefed up the contract.
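The reverted form, roughly (a sketch; only the field name comes from the diff above, the wrapper class is hypothetical): AtomicInteger makes the update safe even if add() ends up running on different threads over time.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RowCounter
{
  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();

  // addAndGet is an atomic read-modify-write, so cross-thread updates
  // cannot lose increments the way a plain "int +=" could.
  int addRows(int sinkRowsInMemoryBeforeAdd, int sinkRowsInMemoryAfterAdd)
  {
    return rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
  }
}
```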
```diff
-        : tuningConfig;
+    this.tuningConfig = tuningConfig == null ? new IndexTuningConfig() : tuningConfig;
+
+    if (this.ioConfig.isAppendToExisting() && this.tuningConfig.isForceGuaranteedRollup()) {
```
Avoid warnings on construction, since constructors get called on deserialization in many contexts.
Also, is this useful, given there is another check in the task itself?
Ah, it looks fine to be removed.
```java
// Even though IndexTask uses NoopHandoffNotifier which does nothing for segment handoff,
// the below code is needed to update the total number of rows added to the appenderator so far.
// See AppenderatorDriver.registerHandoff() and Appenderator.drop().
// A hard-coded timeout is used here because the below get() is expected to return immediately.
```
Longer timeout, in case of GC or some other kind of whacky pause longer than expected?
Changed to 30 seconds.
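In other words, something like this (a sketch; `handoffFuture` and the wrapper are stand-in names): the get() is expected to return immediately because NoopHandoffNotifier does nothing, and the generous timeout merely absorbs GC or other unexpected pauses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class HandoffWait
{
  static void awaitHandoff(CompletableFuture<?> handoffFuture) throws Exception
  {
    // Expected to return immediately; 30 seconds is a safety margin, not a budget.
    handoffFuture.get(30, TimeUnit.SECONDS);
  }
}
```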
@gianm thanks. I addressed your comments.
```diff
  * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
  * asynchronously.
  * <p>
- * The add, clear, persist, persistAll, and push methods should all be called from the same thread.
```
Hmm, I think these may still be needed. AppenderatorImpl's methods might not work properly if called concurrently from different threads. Also, nothing really needs to do that anyway. So I think we should leave in the restrictions on calling the major methods from the same thread.
add(), persist(), push(), and drop() can be called by different threads in this patch. push() and persist() can be called by AppenderatorDriver.publishExecutor, and drop() is called by CoordinatorBasedSegmentHandoffNotifier.scheduledExecutor. Actually, persist() was called by AppenderatorImpl.pushExecutor even before this patch.
Ah, I see! Ok, I will take one last look then.
Is something making sure that "add" won't be called while these other methods are being called? What I'm concerned about is preventing a case where the Committer passed to "publish" is out of sync with what's being "add"ed.
I think if we need to relax the constraint that the methods are all called from the same thread, we should at least require they not be called from different threads at the same time. Maybe we can enforce this with synchronization? Otherwise, I'm concerned that commit metadata will get out of sync with the data itself.
Do you have any thoughts on this?
Btw, the original idea was that the methods that accept Committer objects should all be called from the same thread, because that ensures that the committed metadata on disk is in sync with the data itself.
@gianm Hmm, I missed it. I think synchronization is not enough because we need to guarantee that the execution order matches the method call order. Otherwise, we need a kind of sorted queue which provides such a guarantee. I think the easiest way is enforcing that the methods are called by the same thread, as before.
One thing I'm concerned about is that changing Appenderator.push() looks inevitable in order to implement the same retry mechanism as AppenderatorDriver.publish(). Appenderator.add(), Appenderator.push(), and Appenderator.persist() should all be called by the same thread that calls AppenderatorDriver, and to guarantee this in AppenderatorDriver.publish(), Appenderator.push() should provide a retry mechanism which retries the push until it completes, because the calling thread of push() should be pushExecutor. I think it makes sense to add a new push() method which takes a max retry count.
What do you think?
> I think synchronization is not enough because we need to guarantee that the execution order matches the method call order. Otherwise, we need a kind of sorted queue which provides such a guarantee. I think the easiest way is enforcing the methods are called by the same thread as before.
Yeah, I think that if it's feasible to require that the methods are called by the same thread, then that's simplest from the point of view of the Appenderator implementation.
> I think it makes sense to add a new push() method which takes a max retry count.
You mean Appenderator.push(), and having that method do its own retries? That makes sense to me. The ability to do retries in the Appenderator certainly makes life easier for its callers.
It sounds like it also makes it easier to ensure that the Committer metadata is always in sync with data written to disk, which is essential for correct behavior.
Thanks. I fixed the sync problem in committed metadata.
Also, I changed AppenderatorDriver.publish() to not retry. Instead, the Appenderator retries by itself, and only for pushing segments. This is because exceptions from other parts are usually an IOException while persisting segments, an NPE due to a logic error, etc., and I don't think those exceptions are recoverable by executing the same code multiple times.
…nt-early-publish
Fixed conflicts

@gianm would you review the latest patch again?
gianm left a comment
Everything outside of AppenderatorImpl LGTM. Just have a few comments on AppenderatorImpl.
```java
{
  @Override
  public Object call() throws Exception

  if (persistExecutor != null) {
```
I don't think persistExecutor can be null unless startJob is not called. But, the javadoc for startJob says it should be called before using any other methods.
So I wonder: is this null check really needed? If so then why?
Oops, meant to say I don't think it can be null. Edited the comment.
I saw a test failure due to an NPE from here, but I can't find what it was. Maybe there were some bugs which have since been fixed.
```java
for (SegmentIdentifier identifier : identifiers) {
  final Sink sink = sinks.get(identifier);
  if (sink == null) {
    throw new NullPointerException("No sink for identifier: " + identifier);
```
`new ISE("No sink for identifier: %s", identifier)` is slightly more Druid-idiomatic. (This isn't a null reference passed by a caller, so we usually use IllegalStateException for this kind of thing rather than NullPointerException.)
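Applied to the loop above, the suggestion would look roughly like this (assuming Druid's ISE shorthand for IllegalStateException; the suggested call itself comes straight from the comment):

```java
for (SegmentIdentifier identifier : identifiers) {
  final Sink sink = sinks.get(identifier);
  if (sink == null) {
    // ISE formats the message and throws IllegalStateException, which better
    // signals an internal-state problem than NullPointerException does.
    throw new ISE("No sink for identifier: %s", identifier);
  }
}
```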
```java
    mergedFile,
    sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))

// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
```
I wonder if retrying the push should be the responsibility of the pusher, rather than the Appenderator. S3DataPusher already does its own retries, so the retries here are not really helping anything for S3. The pushers are also in a position to potentially be smarter about retries (the S3 one won't retry "access denied", for example; see S3Utils.retryS3Operation), so they can fail faster when retrying would be useless.
What do you think?
It makes sense and sounds even better. I think it's better to make the pushers retry by themselves in a follow-up PR, because there are some things to do, like checking whether the S3RETRY predicate can be applied to other types of pushers. Maybe it's possible to make the retrying smarter. What do you think?
Sure, I think it's fine to do it in another PR.
@gianm thank you for the review!
* Early publishing segments in the middle of data ingestion
* Remove unnecessary logs
* Address comments
* Refactoring the patch according to apache#4292 and address comments
* Set the total shard number of NumberedShardSpec to 0
* refactoring
* Address comments
* Fix tests
* Address comments
* Fix sync problem of committer and retry push only
* Fix doc
* Fix build failure
* Address comments
* Fix compilation failure
* Fix transient test failure
```java
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = new TreeMap<>(
    Comparators.intervalsByStartThenEnd()
);
int thrownAway = 0;
```
This variable is never updated. The `if` below that conditions on this variable being positive is dead code. See #7737.
Fixes #4227. Additionally, after this patch, IndexTask can be further improved by reading data from the Firehose only once. I'll fix it in a follow-up PR.