Improve parallelism of zookeeper based segment change processing #7088
leventov merged 22 commits into apache:master
Conversation
Overview of the main changes:
For now, I have left the CuratorBasedLoadQueuePeon class as it is. We can either get rid of it or we can introduce an additional config which controls which version of Curator based peon to use. Right now I have hardcoded to use CuratorBasedLoadQueuePeonV2 in LoadQueueTaskMaster.
We have been testing this patch internally for the last week and we have seen significant performance improvements. For our larger clusters, it now takes 20-30 mins for the cluster to download segments from cold storage (S3 in our case) and be ready to serve them. Without the patch it used to take over 5 hrs for the same set of segments.
Why this change? It doesn't seem likely to me that the segment loading will be CPU-bound (it's probably I/O bound, I'd guess).
Before, the numLoadingThreads config wasn't being used. These threads also do the CPU-intensive work of uncompressing the segments and memory mapping them. But you are right, the work is probably more I/O bound than CPU bound. The change I made was to provide more of a lower bound on the minimum number of threads, which should be safe for this pool. But I could be swayed to change it back to what it was. FWIW, in our internal setup, I have set the config to 2 * number of CPUs and it has been holding up fine.
It should be configurable.
The value is configurable. One can use the config druid.segmentCache.numLoadingThreads for this.
The documentation is not updated, it still says about 10 threads.
@samarthjain could you please update the documentation with a more elaborate discussion of how (and why) anyone would possibly want to change this configuration, and in which direction?
Use Execs.multiThreaded. It's shorter, and it makes daemon threads automatically, which are preferred since they don't block JVM shutdown.
Use Execs.makeThreadFactory to make the thread factory; it's shorter and makes daemon threads by default. Also, please don't commit "TODO" comments.
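For illustration, here is a minimal plain-java.util.concurrent sketch of what such a helper does; the names `multiThreaded` and `DaemonExecs` are illustrative stand-ins, not Druid's actual Execs API. The key point is marking threads as daemon so they don't block JVM shutdown, and naming them from a format string.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class DaemonExecs
{
  // Roughly what a "multiThreaded" helper does: a fixed pool of named daemon threads.
  public static ExecutorService multiThreaded(int threads, String nameFormat)
  {
    final AtomicLong count = new AtomicLong();
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, String.format(nameFormat, count.getAndIncrement()));
      t.setDaemon(true); // daemon threads don't block JVM shutdown
      return t;
    };
    return Executors.newFixedThreadPool(threads, factory);
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = multiThreaded(2, "LoadQueuePeon-Monitor-%d");
    boolean daemon = exec.submit(() -> Thread.currentThread().isDaemon()).get();
    if (!daemon) {
      throw new AssertionError("expected a daemon thread");
    }
    exec.shutdown();
  }
}
```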
Please add documentation for this parameter. Also, maybe it makes more sense under druid.coordinator.loadqueuepeon.curator.createZkNodeNumThreads.
curator prefix already implies zookeeper, I vote for:
druid.coordinator.loadqueuepeon.curator.numThreads
or if other pool sizes will be configurable too then:
druid.coordinator.loadqueuepeon.curator.numCreateThreads
druid.coordinator.loadqueuepeon.curator.numCallbackThreads
druid.coordinator.loadqueuepeon.curator.numMonitorThreads
Also, it would be more clear if threads were named accordingly to parameter names, ie:
ZKNodeDeletionChecker- -> LoadQueuePeon-Monitor-
There's no reason to keep CuratorLoadQueuePeon around if you'll be replacing it with CuratorLoadQueuePeonV2 here. This is the only usage. You might as well replace it with the V2 code completely.
Besides, it's much easier to review changes in the original class than in a new class. I guess the only advantage of creating a new class is to provide an extension/feature which is turned off by default.
Is this pulled out because child.getData() might fail? In which scenario might that fail? If it's not meant to happen, I'd suggest putting the try back where it came from. If it is meant to happen sometimes, then consider adding a NoNodeException to the catch below (so we can have special handling, like a special error message and avoiding doing the delete).
The try block was moved up to handle the exception that can be thrown by
final DataSegmentChangeRequest finalRequest = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
@samarthjain please add a code comment that would have kept Gian from asking the question he asked, had the comment been there when he was reading this code.
This got changed to debug in #6741, I think. Please keep it there.
This got changed to debug in #6741, I think. Please keep it there.
This could stand to be at log.debug, I think. (In the spirit of #6741 & trying to keep log chatter on the coordinator useful by not being too noisy.)
@samarthjain - it's also good to know you tested it in prod 😃
egor-ryashin
left a comment
Could you create a unit test for a new CuratorLoadQueuePeonV2?
Please specify units (I guess it should be MAX_TASK_WAIT_MILLIS).
This is not aligned with the Druid code style (all args fit the line or each arg is broken down on its own line).
Please don't use multiline ternary operator.
@samarthjain could you please self-review the PR with this checklist?
removedNodeMonitorExecutor sounds better
that class would be better with a builder, though I wouldn't ask changing it now
Hi @samarthjain, I just noticed this bug: #7192. I'm not sure the same problem exists in ZK-based parallel segment loading, but it would be great if you can have a chance to check it.
Thanks for the pointer, @jihoonson. I took a look at PathChildrenCache and I can't say for sure if the bug exists with zk-based loading too. From what I understand, it is possible that historicals can receive a request to download the same segment file more than once? In http based loading, could this be because the historicals were not fast enough to process the segment download request, causing a timeout and the coordinator asking the historical to download the same segment file again? And somewhere in this code path, some cleanup is not happening correctly? In parallel zk based loading I have added a check for this.
Thanks for the reviews, @gianm, @leventov and @egor-ryashin. I have addressed the review comments. Please take a look again when you get a chance.
@samarthjain thanks for checking. In HTTP segment loading, the coordinator can assign the same segment to the same historical if the historical couldn't load the segment in time. I guess it's intended, though I don't think it's good. But on the historical side, I think this problem doesn't happen if the coordinator doesn't assign the same segment to the same historical. And it looks like the coordinator doesn't, according to your last comment?
I guess threads imply concurrency and concurrently is redundant.
BTW, how about the more concise
Number of threads creating zk ... to be loaded
I was following the pattern of descriptions of other configs in this section although I like your suggestion. Changed it to Number of threads creating zk nodes corresponding to segments that need to be loaded or dropped.
need to be dropped
loaded is confusing here
It should be configurable.
getZookeeperRemovedNodeMonitorThreads()
Renamed to getNumZookeeperMonitorThreads
getZookeeperNodeCreatingThreads()
Renamed to getNumZookeeperNodeCreatingThreads()
I cannot figure out what the rationale is for segment queues and repeated runs; couldn't SegmentChangeProcessors work non-stop, taking everything from a single queue?
Every thread gets its own queue of requests. This helps reduce contention between threads trying to call drainTo(Collection).
The idea behind repeated runs was to introduce batching so that there is not a flood of zookeeper nodes. Further, it gives historicals some time to service the segment load/drop requests. Without the batching and delay, it is possible that historicals would never get to complete processing a segment request, causing the coordinator to repeatedly delete and create zk nodes.
Just wondering how high a request rate we are discussing — 1mln/s? The Coordinator balancing process will grind to a halt from such an amount long before the discussed queue shows even a slight latency.
I'm sure a single queue will manage the current request rate of the Coordinator; besides, threads don't need to call drainTo() as they can perfectly well consume by calling queue.take().
1mn/second or higher is likely. Coordinator polls for the segment files from the RDS and then proceeds to call load on them in a loop. For a largish cluster, with lots of data and datasources, it is possible to have more than a million segments.
According to javadocs of BlockingQueue, calling drainTo() is likely more efficient than continuously polling the queue which I think would extend to continuously calling take().
/**
* Removes all available elements from this queue and adds them
* to the given collection. This operation may be more
* efficient than repeatedly polling this queue.
Further, it makes the code simpler since one call can fetch a batch's worth of records instead of calling take() in a loop.
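A minimal sketch of the batch-consumption pattern being discussed (the names `nextBatch` and the queued request strings are illustrative): block for the first element with a timed poll(), then drain whatever else is already queued in one drainTo() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainToBatching
{
  // Sketch: consume pending change requests in batches rather than take() in a loop.
  static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatch) throws InterruptedException
  {
    List<T> batch = new ArrayList<>(maxBatch);
    // Wait briefly for the first element, then drain whatever else is already queued.
    T first = queue.poll(100, TimeUnit.MILLISECONDS);
    if (first != null) {
      batch.add(first);
      queue.drainTo(batch, maxBatch - 1);
    }
    return batch;
  }

  public static void main(String[] args) throws Exception
  {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("load-segment-1");
    queue.add("load-segment-2");
    queue.add("drop-segment-3");
    List<String> batch = nextBatch(queue, 10);
    if (batch.size() != 3) {
      throw new AssertionError("expected all 3 queued requests in one batch");
    }
  }
}
```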
CuratorLoadQueuePeon runs on the coordinator.
Coordinator can read 1mln segments, but it probably does it within minutes (not seconds), moreover for each segment Coordinator should determine the location (which is the most time-consuming part), then Coordinator spreads those segments across a large pool of peons, overall definitely not 1mln/s per peon. I don't like to complicate peon logic with multiple queues considering it doesn't give a noticeable performance boost.
Fair enough. I have changed it to use a single queue.
I wonder why we need several SegmentChangeProcessors? Segment download time is "infinitely" greater than it takes for SegmentChangeProcessor to send a request, dropping a segment also requires time due to disk IO, so a single SegmentChangeProcessor can fully saturate historical node disk/network throughput.
For smaller clusters it may work out to have just one thread create these zk nodes. For larger ones, we probably need to parallelize the requests. It's configurable.
I took a closer look at the code and I realized that I was making the wrong assumption that there is only one peon running on the coordinator. Instead, every historical node gets its own load queue peon on the coordinator. Considering this, I don't think we need to run multiple tasks per peon for creating zk nodes. Just having one task should be enough. Made this change in the latest commit.
getCuratorCallbackThreads()
Renamed to getNumCuratorCallBackThreads()
getZookeeperNodeCreatingThreads()
@egor-ryashin What do you think about keeping the "Num" part? This book advocates for using the "numSomething" naming and after reading it I changed my former preference of calling things in plural (like threads) to numThreads.
If Num is common throughout the project, we can use Num.
BTW, as the book is not free, the direct link looks suspiciously like affiliate marketing.
Not according to Druid code style
I wish IntelliJ or Eclipse would just format this correctly. Working on multiple projects, it gets difficult to remember the coding styles of individual projects. And it doesn't seem like checkstyle complained about this either.
Not according to Druid code style
Why 10? Why not dynamic, like for numLoadingThreads? How and why would one possibly want to change this configuration, in response to what conditions? Documentation should answer these questions.
Same questions about other newly added configuration parameters.
10 is the default value.
Yeah, but what is the logic behind it? In the very same PR, you have changed the default number of threads for another thread pool from 10 to N_CPU / 2; why not follow the same logic here?
https://github.com/code-review-checklists/java-concurrency#threading-flow-model - missing in this class
I wonder how the operator should calculate the delay and batch size? I think it's complex right now.
For example, I understand that
batch_size * avg_segment_size / delay <= node_network_throughput
But I can't figure out what numbers to select here, as they depend on each other. Probably there should be one parameter like max_node_network_throughput, and the Peon keeps sending load requests until the limit is hit, then delays requests until the quota is replenished.
Good question, @egor-ryashin. I ended up following the pattern we have for http based segment loading. The general idea was to allow historicals enough time to load the segment before the coordinator tries to recreate the zookeeper node for it. Keep in mind that segment loading is not purely I/O; it also involves decompressing the segment files and mapping them to off-heap memory. But yes, the general formula to come up with the numbers is what you suggested:
(batch_size * avg_segment_size) / delay <= factor * node_network_throughput
IMHO, having separate parameters for batch size and delay makes more sense since it would force users to think more about what values to set instead of just setting a number like node_network_throughput. I can also update the documentation with the above formula to aid users. Of course, most users won't be configuring this, so having sane default values is important.
The problem with the formula is that avg_segment_size can vary over time, I used the formula merely to find out if we understand each other well.
having separate parameters for batch size and delay makes more sense since it would force users to think more about how values to set instead of just setting a number like node_network_throughput.
The thing is, batch size and delay indirectly depend on the data being processed, if segments become larger the Druid user should somehow calculate the average segment size then reconfigure the Druid cluster and hope that the average is not skewed in some way, while having a max_network_throughput parameter the user can configure once and forget.
That is to say, the Druid user has to know max_network_throughput with both aforementioned implementations, but with the second implementation the user has to do much less maintenance work.
If we adopt max_network_throughput, then we would need to measure the rate at which we are adding segments to the processing queue. It is doable, and there are rate limiter implementations out there, including in Guava. Having said that, not all of the max_network_throughput configured by the user can be directly used for rate limiting purposes. Time is spent not only downloading segments, but also in decompressing and memory mapping. So the real rate to limit would be a factor of the throughput. That would mean we would also need to make the factor configurable (which isn't too bad really, just wanted to call it out though).
Any other feedback you have, @egor-ryashin? I would like to get this pull request in before it goes stale beyond recovery :).
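If a max_network_throughput-style limit were adopted, one simple way to apply "a factor of the throughput" is a token bucket over bytes. This is an illustrative sketch only — it is not Guava's RateLimiter and not anything in Druid; all names here are hypothetical.

```java
public class ByteRateLimiter
{
  private final double bytesPerSec;   // effective budget: factor * max throughput
  private double availableBytes;
  private long lastRefillNanos;

  ByteRateLimiter(double maxThroughputBytesPerSec, double factor)
  {
    this.bytesPerSec = maxThroughputBytesPerSec * factor;
    this.availableBytes = this.bytesPerSec; // start with one second of budget
    this.lastRefillNanos = System.nanoTime();
  }

  // Returns how many seconds the caller should wait before issuing a request of this size.
  synchronized double acquire(long requestBytes)
  {
    long now = System.nanoTime();
    availableBytes = Math.min(bytesPerSec, availableBytes + (now - lastRefillNanos) / 1e9 * bytesPerSec);
    lastRefillNanos = now;
    availableBytes -= requestBytes;
    return availableBytes >= 0 ? 0 : -availableBytes / bytesPerSec;
  }

  public static void main(String[] args)
  {
    // 1 GB/s NIC with factor 0.5 => 500 MB/s budget.
    ByteRateLimiter limiter = new ByteRateLimiter(1e9, 0.5);
    if (limiter.acquire(500_000_000L) != 0) {
      throw new AssertionError("first request should fit the initial budget");
    }
    double wait = limiter.acquire(500_000_000L);
    if (!(wait > 0.9 && wait <= 1.1)) {
      throw new AssertionError("second request should have to wait ~1s, got " + wait);
    }
  }
}
```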
Looks like there already is a config, maxSegmentsInNodeLoadingQueue, which controls how many segments can be queued up for a historical node to load.
http://druid.io/docs/latest/configuration/index.html#dynamic-configuration
The default value of this dynamic config is 0, meaning there is no queue limit and the coordinator is going to assign segments to historicals. In practice though, I have found that this default value ends up causing clusters to be a bit unbalanced. In our production clusters, where we have deployed this pull request, we have set this value to 1000 (which is what the config documentation also recommends).
What this means is that we don't need to worry about batching or controlling the rate at which historicals are going to download segments. The above config already provides that behavior. I am going to get rid of them.
Introduce various configs.
Since we have a separate load peon for every historical, just having a single SegmentChangeProcessor task per historical is enough. This commit also gets rid of the associated config druid.coordinator.loadqueuepeon.curator.numCreateThreads
}

public SegmentId getSegmentId()
public String getSegmentIdentifier()
I don't see a point of String-ifying API. Return a SegmentId object and let the users chain-call toString() if they need.
SegmentHolder is an inner class and almost all the callers who need segmentIdentifier need the string version of it. So it made more sense to me to just return the String instead of having callers call toString() on segmentId.
@@ -183,28 +172,17 @@ public void dropSegment(
final LoadPeonCallback callback
Methods and fields in this class generally lack concurrent access documentation: https://github.com/code-review-checklists/java-concurrency#justify-document. For example, it's not clear if you removed synchronization in dropSegment() method because it's called only from a single thread, or because you think the new code is concurrently safe. If the latter, then dropSegment() seems to be a subject for https://github.com/code-review-checklists/java-concurrency#chm-race
This class has been around forever and I agree that more could be done to document its thread safety. Having said that, I don't think this PR should be focusing on it. It could possibly be addressed in https://github.com/apache/incubator-druid/projects/4.
Looking at the callers of loadSegment() and dropSegment() I can't say for sure if they could be called by multiple threads. It doesn't seem like it from what I see. However, to better guard against future changes, I am going to slightly change the code to use putIfAbsent instead of doing the racy check-then-act.
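The difference between the racy check-then-act and putIfAbsent can be sketched like this; the `SegmentHolder` here is a bare stand-in for illustration, not the real class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentSketch
{
  static final class SegmentHolder { /* load/drop state for one segment */ }

  public static void main(String[] args)
  {
    ConcurrentMap<String, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<>();

    // Racy check-then-act: two threads can both observe null and both put().
    // if (segmentsToLoad.get(id) == null) { segmentsToLoad.put(id, holder); }

    // Atomic alternative: only one thread's holder is installed.
    SegmentHolder mine = new SegmentHolder();
    SegmentHolder prior = segmentsToLoad.putIfAbsent("segment-1", mine);
    if (prior != null) {
      throw new AssertionError("first insert should see no prior holder");
    }

    // A second putIfAbsent does not replace the existing holder; it returns it.
    SegmentHolder second = segmentsToLoad.putIfAbsent("segment-1", new SegmentHolder());
    if (second != mine) {
      throw new AssertionError("second insert should observe the existing holder");
    }
  }
}
```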
Why did you remove the synchronization in the first place?
The change doesn't quite work without concurrent access documentation. Concurrent access documentation should go hand-in-hand with the concurrently safe code. Without the documentation, how would anybody know why putIfAbsent() is used, and if that is enough?
I would not ask you to add documentation if you didn't remove synchronization and change the concurrency model of this class in the first place. But since you did, you have to be able to answer "hard" questions about this class. There is no other way to verify this PR.
The old code was synchronizing on currentlyProcessing since it was concurrently accessed by multiple threads. Of special interest were these two places -
and
This was the key reason behind the peon assigning segments serially. It would wait for the previous load/drop request to complete before starting a new one.
I think there should be a good balance between letting code document itself vs. writing multiple lines in plain English, which, depending on the writing style of the author, doesn't always make sense to the reader.
{
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
SegmentHolder segmentHolder = new SegmentHolder(segment, DROP, Collections.singletonList(callback));
final SegmentHolder existingHolder = segmentsToDrop.put(segment, segmentHolder);
Good catch. That was a miss.
@leventov - updated the PR. Does it look good to go now?
@@ -152,29 +176,18 @@ public int getNumberOfSegmentsInQueue()
@Override
public void loadSegment(final DataSegment segment, final LoadPeonCallback callback)
callback parameter should be annotated @Nullable
This really strikes me as something that is too minor for a 6th review. Especially since it is pre-existing code. Sure, it's better, but review has become quite drawn out on this patch.
return;
}
}
SegmentHolder segmentHolder = new SegmentHolder(segment, LOAD, Collections.singletonList(callback));
Does it make sense that we create a singleton list of null potentially, but a few lines below avoid adding null as a callback?
Having the null check earlier is probably a bit cleaner but it also seems like a minor point.
I actually like the way this code reads.
If I end up passing a null element in the singleton list, then downstream code like this
List<LoadPeonCallback> snapshotCallbacks()
{
  synchronized (callbacks) {
    // Return an immutable copy so that callers don't have to worry about concurrent modification
    return ImmutableList.copyOf(callbacks);
  }
}
would break. It is not worth the change, IMHO.
The question is if the downstream code prepared to see nulls in this list? Even if it does, it would be better if it didn't and it was a guarantee that the list never contains nulls. It's less error-prone.
Moved null checks downstream to SegmentHolder class.
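A sketch of what moving the null checks downstream into the holder can look like; `Runnable` stands in for LoadPeonCallback, and the class and method names are illustrative, not the actual Druid code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CallbackHolder
{
  private final List<Runnable> callbacks = new ArrayList<>();

  // Null checks live in the holder, so callers may pass a null callback freely
  // and the list is guaranteed to never contain nulls.
  synchronized void addCallback(Runnable callback)
  {
    if (callback != null) {
      callbacks.add(callback);
    }
  }

  // Immutable copy so callers don't have to worry about concurrent modification.
  synchronized List<Runnable> snapshotCallbacks()
  {
    return Collections.unmodifiableList(new ArrayList<>(callbacks));
  }

  public static void main(String[] args)
  {
    CallbackHolder holder = new CallbackHolder();
    holder.addCallback(null);      // silently ignored
    holder.addCallback(() -> {});  // kept
    if (holder.snapshotCallbacks().size() != 1) {
      throw new AssertionError("null callback must be filtered out");
    }
  }
}
```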
@@ -183,28 +196,16 @@ public void dropSegment(
final LoadPeonCallback callback
}

callBackExecutor.execute(
    () -> executeCallbacks(segmentHolder)
Pointless two-level submission: executeCallbacks() submits each callback to the same executor one more time. This means it's valuable to annotate each method in this class with the executor it's supposed to run in, given that there are two different executors, plus the "external" client code execution context (which we kind of shouldn't know where it runs).
segmentHolder.getType() == LOAD ? "load" : "drop",
segmentHolder.getSegmentIdentifier()
);
final ScheduledFuture<?> future = monitorNodeRemovedExecutor.schedule(
Please extract this block as a method to reduce the size of run().
This also feels too minor at this point.
@gianm could you please stop following every PR that I review and comment on what you think is minor and what you think is not?
I agree with @gianm. Comments like these, if need be, should be made towards the start of the review. It adds unnecessary friction and discourages contributors in general. Refactoring like this doesn't contribute anything to the functionality, increases the scope of changes, and makes figuring out issues in the code through reviews difficult. Having said that, I appreciate the issues you identified and some of the other changes you have suggested, @leventov.
Refactoring like this doesn't contribute anything to the functionality, increases the scope of changes and makes figuring out issues in the code through reviews difficult.
These points that you brought are not special to the proposed refactoring - they are pretty much by definition of any refactoring. So does this mean that refactoring shouldn't be done at all? (Making refactoring in separate PRs is not a solution as well, at least because in practice, nobody happens to do that in Druid, or do very rarely.)
Comments like these, if need be, should be made towards the start of the review
I agree with the point that all comments should be preferably made towards the start of the review, rather than in small batches. It's a place for improvement for me, but we have what we have here already.
So could you please extract the method?
() -> {
  try {
    if (curator.checkExists().forPath(path) != null) {
      failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
So if you submit this action to processingExecutor, can the monitorNodeRemovedExecutor be just a single-thread executor and the druid.coordinator.loadqueuepeon.curator.numMonitorThreads config parameter removed?
Removed monitorNodeRemovedExecutor altogether.
As you reintroduced this monitorNodeRemovedExecutor, do you want to redirect failAssign() to processingExecutor?
And if you do this, please document methods as to be executed in the corresponding executors.
I mistook my change of removing monitorNodeRemovedExecutor as the reason behind the integration tests failing. Turned out that wasn't the case. Removed the pool for good.
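The node-removal timeout check itself fits naturally on a scheduled executor. Below is a simplified stand-alone sketch of the idea: after creating a ZK node, schedule a one-shot check, and if the node still exists after the wait, fail the assignment. A local boolean simulates curator.checkExists().forPath(path); everything here is illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class NodeRemovalTimeout
{
  public static void main(String[] args) throws Exception
  {
    ScheduledExecutorService processingExecutor = Executors.newSingleThreadScheduledExecutor();
    boolean nodeStillExists = true; // simulates a historical that never serviced the request

    // One-shot check after the timeout; in the real code this would call
    // curator.checkExists().forPath(path) != null and then failAssign(...).
    ScheduledFuture<Boolean> check = processingExecutor.schedule(
        () -> nodeStillExists,
        50, TimeUnit.MILLISECONDS
    );
    if (!check.get()) {
      throw new AssertionError("expected the simulated node to still exist after the timeout");
    }
    processingExecutor.shutdown();
  }
}
```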
/**
 * Needs to be thread safe since it can be concurrently accessed via
 * loadSegment(), actionCompleted(), getSegmentsToLoad() and stop()
Please make method refs Javadoc links
/**
 * Needs to be thread safe since it can be concurrently accessed via
 * dropSegment(), actionCompleted(), getSegmentsToDrop() and stop()
/**
 * Needs to be thread safe since it can be concurrently accessed via
 * markSegmentToDrop(), unmarkSegmentToDrop() and getSegmentsMarkedToDrop()
@samarthjain The integration test failures look like they are non-transient.
@samarthjain thanks for following through.
#7068