Parallelize storage of incremental segments #13982

abhishekagarwal87 merged 11 commits into apache:master
Conversation
Thank you for this patch, @PramodSSImmaneni. Do you have some numbers on the time it was taking before and after the change?

Hi @abhishekagarwal87, we were using it on a datasource with about 4000 columns; with the original single thread it would take 3 to 10 seconds to create the incremental segment files. Using a parallelism of 4 or 5 with this change helped us handle the incoming throughput.
@PramodSSImmaneni , 3 to 10s seems like a reasonable time for a datasource with 4000 columns. It would be nice if you could share some more details on the following points:
If we do realize that parallelization is in fact needed here, it would be an MM/Indexer runtime property rather than a tuning config that has to be passed through all index specs. Users doing an ingestion need not be exposed to this detail.
Side note:
These steps help us understand your issue and solution faster, and even help other people in the community who might be running into similar problems.
@kfaraz The slowness in creating and saving the incremental index files (because of the large number of columns) was causing ingestion lag to increase continuously, so ingestion kept falling further behind. It would take about 3 to 10 seconds to create the incremental index files after data had been ingested and an incremental segment was ready to be persisted (because configured thresholds were reached). This would cause ingestion to fall behind even though there were available CPUs on the node. Multiple segment intervals are ingested at the same time, so there are multiple index files, and these were being saved serially. From what I could see there were no dependencies between them, and they could be persisted in parallel.

We have a mixture of datasources, and some are small and don't need this higher degree of parallelism. I initially considered an MM property, but that would apply to all datasources, and a larger datasource could be starved while a smaller one uses the extra threads. Having it on a per-datasource basis allows it to be configured appropriately.
@PramodSSImmaneni - how many of these incremental indexes were being created in an hour, in your particular case? |
@abhishekagarwal87 about 400 to 500.
I have also tested with a datasource with a small number of columns. Earlier, ingestion was topping out at around 5k rows per second because the segment time spread was over 24 hours with hourly segment granularity; with 5 persist threads it has been able to ingest at 25k rows per second.
@kfaraz I started a thread on Druid Slack in the general channel. Is there another channel that is better suited, as there haven't been any comments so far?
@PramodSSImmaneni, we typically use the following channels: #troubleshooting - for assistance in debugging issues
```diff
 Execs.newBlockingThreaded(
     "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist",
-    maxPendingPersists
+    persistThreads, maxPendingPersists
```
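For context, a bounded executor with blocking submission along these lines can be sketched with plain `java.util.concurrent` primitives. This is an illustrative stand-in (names and behavior are assumptions), not Druid's actual `Execs.newBlockingThreaded` implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadedExec
{
  // Illustrative: a fixed pool of nThreads workers with a bounded queue of
  // `capacity` pending tasks. When the queue is full, the submitting thread
  // blocks instead of failing, which applies backpressure to ingestion.
  public static ExecutorService newBlockingThreaded(String name, int nThreads, int capacity)
  {
    ThreadPoolExecutor exec = new ThreadPoolExecutor(
        nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(capacity),
        r -> new Thread(r, name)
    );
    // On rejection (queue full), block the submitter until space frees up.
    exec.setRejectedExecutionHandler((task, pool) -> {
      try {
        pool.getQueue().put(task);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    });
    return exec;
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = newBlockingThreaded("persist-demo", 2, 4);
    for (int i = 0; i < 10; i++) {
      final int n = i;
      exec.submit(() -> System.out.println("persisted time chunk " + n));
    }
    exec.shutdown();
    exec.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```

With `persistThreads` set to 1 this degenerates to the previous single-threaded behavior; larger values let independent time chunks persist concurrently while `maxPendingPersists` still bounds the queue.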
I'm wary of the persistExecutor being called by callers assuming it is single-threaded. For example, there's nothing I can see that prevents the close() from occurring while a task is still running. Nothing stands out as obviously broken though. But, I do see comments like this:
```java
// use persistExecutor to make sure that all the pending persists completes before
// starting to abandon segments
persistExecutor
```
Comments suggest that - if we have multiple threads - we might start processing an abandon request before or while a persist request is in queue. What scenarios/mitigations are there here?
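To illustrate the concern: with a single worker thread, submission order alone guarantees that an abandon task queued after a persist cannot overtake it; with multiple threads, that guarantee disappears unless an explicit happens-before edge is added. A minimal sketch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PersistOrderingDemo
{
  public static void main(String[] args) throws Exception
  {
    List<String> events = new CopyOnWriteArrayList<>();

    // With exactly one worker thread, submission order is execution order:
    // an "abandon" task queued after a "persist" task can never overtake it.
    ExecutorService single = Executors.newSingleThreadExecutor();
    single.submit(() -> events.add("persist"));
    single.submit(() -> events.add("abandon"));
    single.shutdown();
    single.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(events); // always [persist, abandon]

    // With N > 1 threads the two tasks may run concurrently, so the abandon
    // path would need an explicit ordering mechanism (e.g. waiting on the
    // persist's Future, or a CountDownLatch) to preserve the old guarantee.
  }
}
```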
I see, will get back to you on it.
Was looking at this. Wouldn't the shutdown on the executor interrupt any running threads anyway? I mean, even with a single thread the pending persists would not complete. Also, when abandoning, the segments are not going to get pushed to deep storage, nor would the Kafka offsets be committed, right? So any pending persists in the queue wouldn't matter.
ektravel left a comment:
Reviewed the documentation portion of this PR.
ektravel left a comment:
Documentation changes look good.
Merged with latest and fixed conflicts. Kindly let me know what else is needed for the PR to be merged.
Force-pushed the …_incremental_persist branch from 580e7a0 to 73df24f.
@PramodSSImmaneni, thanks a lot for your patience on this. Please let me know if you are still interested in taking this forward. I'm still a little wary of this change due to reasons that @jasonk000 mentioned. In short, we need to be sure that there are no places in the code which assume that the persist executor is single-threaded.

Also, in the first iteration, I would prefer if we add this new config only for the mode of ingestion in question here (i.e. Kafka, if I am not mistaken). Please remove the changes for the other modes of ingestion from this PR. If we realize that this has value for other ingestion modes, we will add the config there when needed.
@kfaraz yes, I am still interested in taking this forward. I will make the changes to restrict it to Kafka.
Thanks, @PramodSSImmaneni, please let me know once you have pushed the latest changes. I will approve and merge the changes after that. Please try to double-check the following point as well.
Force-pushed from b5f3ec2 to 42e8291.
@kfaraz reverted the changes for Kinesis and kept them for Kafka. Merged the latest master into it as well. Please take a look. I also checked again, and it doesn't look like any place relies on the executor being single-threaded; critical sections such as persistHydrant, which saves an incremental segment file, and the creation of commit.json, which holds the list of incremental segments, are already protected with locking.
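As a sketch of the pattern described above (hypothetical names, not the actual StreamAppenderator code): per-chunk serialization can run on several persist threads in parallel, while updates to the shared commit metadata go through a lock:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CommitLockSketch
{
  private final Object commitLock = new Object();
  private final List<String> committedSegments = new ArrayList<>();

  // Hypothetical stand-in for the persist path: serializing a time chunk's
  // incremental index is independent per chunk, but recording it in the
  // shared commit metadata happens under a lock.
  void persistHydrant(String segmentId)
  {
    // ... serialize the segment files to disk (safe to do in parallel) ...
    synchronized (commitLock) {
      committedSegments.add(segmentId); // analogous to updating commit.json
    }
  }

  int committedCount()
  {
    synchronized (commitLock) {
      return committedSegments.size();
    }
  }

  public static void main(String[] args) throws Exception
  {
    CommitLockSketch sketch = new CommitLockSketch();
    ExecutorService persistExec = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 100; i++) {
      final String id = "segment-" + i;
      persistExec.submit(() -> sketch.persistHydrant(id));
    }
    persistExec.shutdown();
    persistExec.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(sketch.committedCount()); // 100
  }
}
```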
Description
During ingestion, incremental segments are created in memory for the different time chunks and persisted to disk when certain thresholds are reached (max number of rows, max memory, incremental persist period, etc.). In cases with many dimensions and metrics (1000+), it was observed that creating/serializing the incremental segment file format and persisting the files took a while and blocked ingestion of new data, affecting real-time ingestion. This serialization and persistence can be parallelized across the different time chunks, which is what this update does.
The patch adds a simple configuration parameter to the ingestion tuning config to specify the number of persistence threads. The default value is 1 if it is not specified, which keeps behavior the same as today.
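For illustration, the new knob would sit in the Kafka supervisor's tuningConfig alongside the existing persist-related settings. The property name and surrounding values below are assumptions based on the description (default 1, i.e. today's single-threaded behavior):

```json
{
  "type": "kafka",
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 150000,
    "maxPendingPersists": 2,
    "intermediatePersistPeriod": "PT10M",
    "numPersistThreads": 4
  }
}
```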
Release note
Key changed/added classes in this PR
- StreamAppenderator
- AppenderatorImpl
- TheirBaz

This PR has: