[WIP] Mapside aggregating hadoop indexer #2670
Conversation
Waiting on #2650.
@navis, how much did the performance improve?
Have not read the impl yet, but this is a nice concept IMO. My one request at a high level is that it should either be off by default or have a very conservative default for maxRowsInMemory, to prevent people from hitting OOMEs on upgrade.
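To make the concern concrete, here is a minimal sketch of what such defaults could look like; the class and field names are assumptions for illustration, not the actual patch:

```java
// Illustrative only: names here are hypothetical, not from the patch.
public class MapsideAggConfig
{
  // Off by default, so existing jobs behave exactly as before the upgrade.
  private final boolean enableMapsideAggregation;
  // Deliberately small default so mapper heap usage stays bounded unless
  // the user explicitly opts in to a larger value.
  private final int maxRowsInMemory;

  public MapsideAggConfig(Boolean enableMapsideAggregation, Integer maxRowsInMemory)
  {
    this.enableMapsideAggregation = enableMapsideAggregation != null && enableMapsideAggregation;
    this.maxRowsInMemory = maxRowsInMemory == null ? 5000 : maxRowsInMemory;
  }

  public boolean isEnableMapsideAggregation() { return enableMapsideAggregation; }

  public int getMaxRowsInMemory() { return maxRowsInMemory; }
}
```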
The hadoop combiner allows you to truly merge "all" the rows that are mergeable at the mapper, because it is pretty much like the reducer: hadoop will carefully supply it all the rows that are mergeable together.
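For reference, a combiner is wired in as an ordinary Reducer run on map output; a minimal generic example (SumCombiner is illustrative, not from this patch):

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CombinerWiring
{
  // A combiner is just a Reducer applied to mapper output before the shuffle.
  public static class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable>
  {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException
    {
      long sum = 0;
      for (LongWritable v : values) {
        sum += v.get(); // every value is deserialized here, then re-serialized below
      }
      context.write(key, new LongWritable(sum));
    }
  }

  public static void configure(Job job)
  {
    job.setCombinerClass(SumCombiner.class);
  }
}
```

This is what gives the guaranteed merge described above: the framework sorts map output by key and hands the combiner every co-located value for a key, but at the cost of serde and sorting on the way through.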
My experience in general is that combiners are good if reducers are a big bottleneck and mappers are not. But if things are more balanced between mappers and reducers, then doing in-heap aggregation on the mappers incurs less mapper overhead than using a combiner (combiners involve serde and sorting costs). So it could be better or worse depending on the workload. It's kind of a point of contention in the hadoop world, though (some projects prefer combiners and some prefer in-heap aggregation…). Some even support both at the same time :) It would certainly be good to see real numbers for this approach vs useCombiner for your workload, @navis. Even better if you could also share numbers for this approach + useCombiner used together vs either one alone.
@gianm say the very first row and the very last row in the dataset are mergeable together; then the only way you will merge them without a combiner is if you hold the IncrementalIndex for that group key until the very last moment. Now multiply that by the total number of distinct group keys the mapper gets, if you truly want to merge everything that could be merged.
@himanshug it's not necessary to merge everything; the idea with in-heap aggregation is that it's just best effort, but that can be good enough for a substantial reduction in data sent to the reducers without incurring the overhead of guaranteed merging at both the mapper and reducer levels. It would be good to confirm for some real-world workloads whether or not this approach works better than a combiner.
It needs to check whether inputRow is an instance of SegmentInputRow.
It seems that combiningAggs should be used instead of aggregators in that case.
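Something along these lines (aggregators and combiningAggs are the fields named above; the exact plumbing in the patch may differ):

```java
// Rows re-read from existing segments (SegmentInputRow) are already partially
// aggregated, so they must go through the combining variants of the aggregators.
final AggregatorFactory[] aggsToUse = (inputRow instanceof SegmentInputRow)
    ? combiningAggs
    : aggregators;
```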
IMO this patch looks really good, especially when the input data is roughly sorted by timestamp, and my machine logs are like that.
It needs more elaboration, but:
How about using bucket intervals to make each IncrementalIndex, to reduce the memory footprint (maybe)? Like the below:
config.getGranularitySpec().bucketInterval(timestamp).get();
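Expanded into a rough sketch (indexPerBucket and makeIndex() are illustrative names, not from the patch; bucketInterval() returns a Guava Optional, per the line above):

```java
// One small IncrementalIndex per segment-granularity bucket, instead of a
// single big index holding every bucket at once.
private final Map<Interval, IncrementalIndex> indexPerBucket = new HashMap<>();

private void addRow(InputRow row) throws IOException
{
  final Optional<Interval> bucket =
      config.getGranularitySpec().bucketInterval(new DateTime(row.getTimestampFromEpoch()));
  if (!bucket.isPresent()) {
    return; // row falls outside the configured intervals; skip (or count it)
  }
  // Each bucket's index stays small and can be flushed independently.
  indexPerBucket.computeIfAbsent(bucket.get(), interval -> makeIndex()).add(row);
}
```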
I think @gianm already explained the intention of this patch well (thanks!). The current serde for inputRow produces a very big, complex binary, and it did not seem possible to use the raw-binary combiner (forgot the exact name of this in hadoop). So to combine rows, hadoop must read the object back from the binary and write it out again, which is big work for both cpu and memory.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
The current hadoop indexer pushes rows one by one to the context, but if the timestamps of rows do not vary much (like in an hourly batch), we can aggregate rows in mapper memory first. I know there is a combiner in hadoop, but it's infamously inefficient; even hive didn't use it.
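As a rough illustration of the approach (the Mapper API is real Hadoop; the index handling and the toKey/toValue/makeNewIndex helpers are assumptions, not the actual patch):

```java
import java.io.IOException;

import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.segment.incremental.IncrementalIndex;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of best-effort mapside aggregation: rows sharing a (truncated)
// timestamp and dimension values merge in an in-heap IncrementalIndex, and
// only partial aggregates are written to the context for the reducers.
public class AggregatingMapper extends Mapper<Object, InputRow, BytesWritable, BytesWritable>
{
  private IncrementalIndex index;
  private int maxRowsInMemory = 5000; // conservative bound against mapper OOMEs

  @Override
  protected void setup(Context context)
  {
    // maxRowsInMemory would come from the tuning config; constant here for brevity.
    index = makeNewIndex();
  }

  @Override
  protected void map(Object key, InputRow row, Context context)
      throws IOException, InterruptedException
  {
    index.add(row); // mergeable rows aggregate in-heap here
    if (index.size() >= maxRowsInMemory) {
      flush(context); // best effort only; the reducers still finish the merge
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException
  {
    flush(context);
  }

  private void flush(Context context) throws IOException, InterruptedException
  {
    for (Row row : index) {
      context.write(toKey(row), toValue(row));
    }
    index = makeNewIndex(); // start over with a fresh, empty index
  }

  // Hypothetical helpers standing in for the patch's actual serde and index setup.
  private BytesWritable toKey(Row row) { throw new UnsupportedOperationException("sketch"); }
  private BytesWritable toValue(Row row) { throw new UnsupportedOperationException("sketch"); }
  private static IncrementalIndex makeNewIndex() { throw new UnsupportedOperationException("sketch"); }
}
```

On input that is roughly sorted by timestamp (as mentioned above), most mergeable rows land in the same in-heap index before a flush, so this best-effort merge captures much of the reduction a combiner would, without its serde and sort costs.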