Priority on loading for primary replica #4757
Conversation
```java
final Optional<ServerHolder> primaryHolderToLoad;
if (totalReplicantsInCluster <= 0) {
  log.trace("No replicants for %s", segment.getIdentifier());
  primaryHolderToLoad = getPrimaryHolder(
```
how about getPriorityHolder instead?
```java
    return stats;
  }
} else {
  primaryHolderToLoad = Optional.empty();
```
can this logic flip around? I find the tiny cases easier to read if they are first. For example, change to:

```java
if (totalReplicantsInCluster > 0) {
  primaryHolderToLoad = Optional.empty();
} else {
  .....
}
```

```java
    serverHolderPredicate
);

if (primaryHolderToLoad.isPresent()) {
```
this feels strange to not use a functional workflow here, but it is not clear a functional flow would be easier to read.
On the contrary, I removed use of Optional and mapping, because IMO it only adds obscurity. "Too functional" for Java
```java
  ++totalReplicantsInCluster;
} else {
  log.trace("No primary holder found for %s", segment.getIdentifier());
  return stats;
```
this is hidden in a weird spot. Is this the same behavior as previously?
Also suggest moving this up to the top of the if statement and flipping the boolean for ease of readability.
Sure, I'll move it up.
And yeah, I think I have mistakenly skipped the "drop" part; I will add that in.
```java
return candidates
    .stream()
    .max((s1, s2) -> Ints.compare(s1.getServer().getPriority(), s2.getServer().getPriority()));
```
can this get resolved without materializing the whole candidates array?
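For illustration, a minimal sketch of what "resolving without materializing" could look like: track the max-priority candidate inline while candidates are produced, instead of collecting them into a list and streaming it. Plain ints stand in for `ServerHolder` priorities; all names here are hypothetical.

```java
import java.util.List;

public class TopPriority {
    // Keep a running max while iterating; no intermediate candidates list.
    // Returns -1 if there are no candidates (a stand-in for Optional.empty()).
    static int topPriority(List<Integer> candidatePriorities) {
        Integer top = null;
        for (Integer p : candidatePriorities) {
            if (top == null || p > top) {
                top = p;
            }
        }
        return top == null ? -1 : top;
    }
}
```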
```java
{
  final List<ServerHolder> candidates = Lists.newLinkedList();

  for (final Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
```
This workflow could probably be a lot cleaner if rewritten functionally
```java
    continue;
  }

  final ServerHolder candidate = strategy.findNewSegmentHomeReplicator(
```
If I'm reading this correctly, this is doing a LOT more here compared to what it did previously.
I think it should not be that bad. If the findNewSegmentHomeReplicator method call is of concern, then this change will make N - 1 more calls where N is the number of tiers. If we have about 2 tiers per rule, this will make one more call.
```java
@@ -57,12 +59,58 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim

final Map<String, Integer> loadStatus = Maps.newHashMap();
```
@xanec could you please extract some functionality into dedicated methods from this run() method. It's too long.
Sure, do you mind if I do some significant refactoring?
```java
final Map<String, Integer> tieredReplicants = getTieredReplicants();
for (final String tier : tieredReplicants.keySet()) {
  stats.addToTieredStat(ASSIGNED_COUNT, tier, 0);
```
why does this need to be initialized now?
I don't think this needs to be initialized. All the components underneath lazily add entries as needed.
This is for the expected behavior in one of the tests: https://github.com/druid-io/druid/blob/d43687d578bf3dea98b01d0899bcfbb2125d142e/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java#L631
Without the initialization, stat3 would not have the hot tier and would throw an NPE. Note that previously, a 0 would be added into the statistics when no assignment was done:
https://github.com/druid-io/druid/blob/d43687d578bf3dea98b01d0899bcfbb2125d142e/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java#L102
Personally, I also feel that no initialization is required, but I am not sure if receiving 0 is part of the expected usage.
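As a minimal sketch of the NPE concern discussed here, using a plain `HashMap` as a hypothetical stand-in for the tiered stats (names are illustrative, not the actual Druid API): a consumer that unboxes a lookup for a tier that was never assigned gets an NPE unless the entry is pre-initialized or the lookup defaults to 0.

```java
import java.util.HashMap;
import java.util.Map;

public class TieredStats {
    // Lazily-populated stats map: tiers appear only when something is assigned.
    static final Map<String, Integer> assignedCount = new HashMap<>();

    // Unboxing a missing entry throws NullPointerException.
    static int countUnsafe(String tier) {
        return assignedCount.get(tier);
    }

    // Defaulting to 0 avoids the need to pre-initialize every tier.
    static int countSafe(String tier) {
        return assignedCount.getOrDefault(tier, 0);
    }
}
```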
@leventov @drcrallen I have implemented the previously requested changes and did some refactoring. Could you kindly review the code again? Thanks.
```java
}

final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
```

```java
private static Predicate<ServerHolder> createPredicate(final DruidCoordinatorRuntimeParams params)
```
Could this method be given a more descriptive name? E.g. "createLoadQueueSizeLimitingPredicate()"
```java
  final String tier,
  final DruidCluster druidCluster,
  final Predicate<ServerHolder> firstPredicate,
  final Predicate<ServerHolder>... otherPredicates
```
Suggested that this method just accept one predicate; callers can call and() on predicates, if needed.
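A small sketch of the suggested composition with `java.util.function.Predicate.and()`. The predicates below are hypothetical stand-ins (integer queue sizes instead of `ServerHolder` checks), not the actual Druid predicates.

```java
import java.util.function.Predicate;

public class LoadQueuePredicates {
    // Hypothetical filters a caller might want to combine.
    static final Predicate<Integer> underQueueLimit = queueSize -> queueSize < 100;
    static final Predicate<Integer> nonNegative = queueSize -> queueSize >= 0;

    // The callee takes ONE predicate; the caller composes with and() as needed.
    static boolean accepts(int queueSize) {
        Predicate<Integer> combined = underQueueLimit.and(nonNegative);
        return combined.test(queueSize);
    }
}
```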
```java
  );
  return numAssigned;
}
```

```java
holders.remove(holder);
```
This is to prevent the holder from serving more than one replica of the same segment. During the current run, I believe the assignment will not be immediately reflected in the ServerHolder. Am I mistaken about the expected behavior?
```java
  log.makeAlert("No holders found for tier[%s]", tier).emit();
  numDropped = 0;
} else {
  final int numToDrop = entry.getIntValue() - targetReplicants.getOrDefault(tier, 0);
```
Suggested to extract entry.getIntValue() as "currentReplicants" for readability
```java
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
  final String tier = entry.getKey();
  // if there are replicants loading in cluster
  if (druidCluster.hasTier(tier) && entry.getIntValue() > currentReplicants.getOrDefault(tier, 0)) {
```
Why, if this is true in any tier, do we exit the method? Maybe still drop segments in other tiers?
Yeah, I also do not understand the complete rationale behind this decision, but the test cases do enforce such behavior. Maybe it is to prevent some form of "thrashing" whereby segments get loaded and dropped excessively?
```java
private static int dropForTier(
    final int numToDrop,
    final MinMaxPriorityQueue<ServerHolder> holders,
```
Maybe "tierHolders" or "holdersInTier"
```java
{
  int numDropped = 0;

  final List<ServerHolder> droppedHolders = new LinkedList<>();
```
There is no point to use LinkedList here, please use ArrayList
Sure, but won't a LinkedList be more suitable here, since the size is unknown and we don't need random access? Using a LinkedList also prevents the problem of array resizing.
No, if created as ArrayList<>(1) it is strictly always more memory efficient than LinkedList, and amortized cost of adding an element is smaller. LinkedList is almost never a good data structure as is (only intrusive linked lists are useful in some forms, sometimes). Exceptions, when LinkedList is useful itself, are so rare that I've never seen them in my practice, and there are no such in the Druid codebase.
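A sketch of the pattern under discussion with `ArrayList`, assuming a small initial capacity is acceptable (strings stand in for `ServerHolder`s; method and class names are hypothetical). Resizing is cheap in practice, and an `ArrayList` avoids LinkedList's per-node object overhead.

```java
import java.util.ArrayList;
import java.util.List;

public class DroppedHolders {
    // Collect up to numToDrop holders; final size is unknown up front, but a
    // small-capacity ArrayList still beats LinkedList on memory and add cost.
    static List<String> collectUpTo(List<String> holders, int numToDrop) {
        final List<String> dropped = new ArrayList<>(Math.max(1, numToDrop));
        for (String holder : holders) {
            if (dropped.size() >= numToDrop) {
                break;
            }
            dropped.add(holder);
        }
        return dropped;
    }
}
```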
```java
final List<ServerHolder> droppedHolders = new LinkedList<>();
while (numDropped < numToDrop) {
  final ServerHolder holder = holders.pollLast();
```
Why need to remove holders, and then add back? Couldn't the same logic be expressed merely with iteration?
This is because we are using pollLast, an iterator will not give us the same order (i.e., in descending available size).
I inspected all usages of MinMaxPriorityQueue<ServerHolder> in the coordinator code, and it seems to me that poll() or peek() or pollFirst() or peekFirst() is never called on those min-max queues. They are only iterated "directly", have elements added to them, and here, the min-max queue is "iterated" in reverse order, via calling pollLast() and then adding elements back.

It means that either:
- the min-max queue is not needed, and it could be a simple `PriorityQueue` in the reverse order from what is used now to create min-max queues, or
- if some particular order is expected when those queues are iterated, usage of min-max queues is a mistake, because it doesn't guarantee any particular iteration order. In this case, it should be replaced with `TreeSet`.
Also @fjy could you please comment here
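To illustrate the `TreeSet` option: a `TreeSet` iterates in comparator order, so "visit servers from largest to smallest available size" is just `descendingIterator()`, with no pollLast()-then-add-back dance. Integers stand in for holders ordered by available size; this is a sketch, not the Druid code. (Guava's `MinMaxPriorityQueue` orders only poll/peek; its iterator order is unspecified.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TierIteration {
    // Deterministic descending traversal without mutating the collection.
    static List<Integer> inDescendingOrder(List<Integer> availableSizes) {
        final TreeSet<Integer> holders = new TreeSet<>(availableSizes);
        final List<Integer> result = new ArrayList<>(holders.size());
        holders.descendingIterator().forEachRemaining(result::add);
        return result;
    }
}
```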
```java
if (serverQueue == null) {
  log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
```

```java
@SafeVarargs
private static List<ServerHolder> getHolderList(
```
```java
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);

// tier with primary replica
final int targetReplicantsInTier = targetReplicants.removeInt(tier);
```
Maybe add @Nullable String primaryTier parameter to assignReplicas(), to avoid this error-prone remove tier - add tier code.
```java
  final DruidCoordinatorRuntimeParams params,
  final DataSegment segment,
  final CoordinatorStats stats,
  @Nullable final String primaryTier
```
Also maybe call it "tierToSkip", to make the intention more explicit.
```diff
 {
   MinMaxPriorityQueue<ServerHolder> servers = historicals.get(tier);
-  return (servers == null) || servers.isEmpty();
+  return (servers != null) && !servers.isEmpty();
```
is it covered in unit tests?
```java
 * Iterates through each tier and find the respective segment homes; with the found segment homes, selects the one
 * with the highest priority to be the holder for the primary replica.
 *
 * @param params
```
Please remove empty javadoc stubs
```java
} else {
  // cache the result for later use.
  strategyCache.put(tier, candidate);
  if (
      topCandidate == null ||
      candidate.getServer().getPriority() > topCandidate.getServer().getPriority()
  ) {
```
```java
}

return stats;

/***
 *
 * @param params
```
```java
if (leftToLoad > 0) {
  return stats;
```

```java
// This enforces that loading is completed before we attempt to drop stuffs as a safety measure
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
```
Please extract this block as a method with boolean result
```java
@@ -293,12 +293,8 @@ private void drop(

// Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
```
With method extracted, this comment line doesn't make much sense to me
@xanec could you please remove "Enforce Indentation with Checkstyle" commit from the history?
0d181a0 to d7b7c55
@drcrallen do you have more comments here?
```diff
 DruidCoordinatorRuntimeParams params,
 String tier,
-MinMaxPriorityQueue<ServerHolder> servers,
+NavigableSet<ServerHolder> servers,
```
(Optional) Does this need to be anything other than Collection<ServerHolder>?
For some of the uses of the NavigableSet, the ordering and uniqueness seem implicit in the logic (e.g., in loops). From what I can see, while very unlikely, changes in these two qualities may alter the behavior. Hence, I have changed the variable to SortedSet instead, as a safeguard against future changes to DruidCluster.getSortedHistoricalByTier.
For the other uses, I have changed it to Iterable.
```java
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = Maps.newHashMap();
```

```diff
-for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
+for (NavigableSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
```
(Optional) Does this need to be anything other than Collection<ServerHolder>?
```java
// cleanup before it finished polling the metadata storage for available segments for the first time.
if (!availableSegments.isEmpty()) {
```

```diff
-  for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
+  for (NavigableSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
```
(Optional) Does this need to be anything other than Collection<ServerHolder>?
```java
log.info("Load Queues:");
```

```diff
-for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
+for (NavigableSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
```
(Optional) Does this need to be anything other than Collection<ServerHolder>?
```java
    .getTotalReplicants(segment.getIdentifier(), tier);
final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
    .getLoadedReplicants(segment.getIdentifier(), tier);
```

```java
// performs
tier,
targetReplicants.getOrDefault(tier, 0),
// note: adding 1 to currentReplicantsInTier to account for the one assigned as primary replica
currentReplicants.getOrDefault(tier, 0) + 1,
```
currentReplicants.getOrDefault(tier, 0) should always be 0 here, right? Can this just be a hardcoded 1? Actually, I suggest changing the logic here a bit: make `int numAssigned = 1;` immediately after your assignPrimary call, then make this statement `numAssigned += ...`, with numAssigned passed as a parameter, maybe with a code comment that it will always be 1. IMHO that makes what is going on easier to follow.
```java
  final DruidCoordinatorRuntimeParams params,
  final DataSegment segment,
  final DruidCoordinatorRuntimeParams params
  final CoordinatorStats stats
```
this is deceiving, it is a final object reference, but the contents are modified. Suggest adding a method comment to such effect.
```java
  final int targetReplicantsInTier,
  final int currentReplicantsInTier,
  final DruidCoordinatorRuntimeParams params,
  final List<ServerHolder> holders,
```
This is modified in the method call, suggest calling that out in the method docs
I have moved the retrieval of ServerHolders into the method to maintain "immutability" of parameters.
```java
  final DataSegment segment
)
{
  int numDropped = 0;
```
(Optional) This can be replaced with java 8 awesomeness:

```java
return StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(holdersInTier.descendingIterator(), Spliterator.ORDERED), false)
    .limit(numToDrop)
    .filter(sh -> sh.isServingSegment(segment))
    .mapToInt(sh -> {
      sh.getPeon().dropSegment(segment, null);
      return 1;
    })
    .sum();
```

Wait, actually this doesn't work but passes tests, which is not good.
```java
return StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(holdersInTier.descendingIterator(), Spliterator.ORDERED), false)
    .filter(sh -> sh.isServingSegment(segment))
    .limit(numToDrop)
    .mapToInt(sh -> {
      sh.getPeon().dropSegment(segment, null);
      return 1;
    })
    .sum();
```

is the correct one, I think, but it highlights a blind spot in the testing.
I think I am going to skip using monad-chaining on this one, because we still need to work the log.warn() into it and it does not seem quite worth it anymore 🤷♂️
I've included an additional auxiliary fixture to fail the first implementation.
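The blind spot discussed above comes down to stream operation ordering: `limit` before `filter` truncates the stream before the predicate runs, so it can miss matches, while `filter` before `limit` takes up to n elements that actually match. A minimal, self-contained illustration (even numbers stand in for "holders serving the segment"; names are hypothetical):

```java
import java.util.List;

public class LimitFilterOrder {
    // Truncates first, then filters: may return fewer matches than exist.
    static long limitThenFilter(List<Integer> xs, int n) {
        return xs.stream().limit(n).filter(x -> x % 2 == 0).count();
    }

    // Filters first, then truncates: takes up to n actual matches.
    static long filterThenLimit(List<Integer> xs, int n) {
        return xs.stream().filter(x -> x % 2 == 0).limit(n).count();
    }
}
```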
* Priority on loading for primary replica
* Simplicity fixes
* Fix on skipping drop for quick return.
* change to debug logging for no replicants.
* Fix on filter logic
* swapping if-else
* Fix on wrong "hasTier" logic
* Refactoring of LoadRule
* Rename createPredicate to createLoadQueueSizeLimitingPredicate
* Rename getHolderList to getFilteredHolders
* remove varargs
* extract out currentReplicantsInTier
* rename holders to holdersInTier
* don't do temporary removal of tier.
* rename primaryTier to tierToSkip
* change LinkedList to ArrayList
* Change MinMaxPriorityQueue in DruidCluster to TreeSet.
* Adding some comments.
* Modify log messages in light of predicates.
* Add in-method comments
* Don't create new Object2IntOpenHashMap for each run() call.
* Cache result from strategy call in the primary assignment to be reused during the same run.
* Spelling mistake
* Cleaning up javadoc.
* refactor out loading in progress check.
* Removed redundant comment.
* Removed forbidden API
* Correct non-forbidden API.
* Precision in variable type for NavigableSet.
* Obsolete comment.
* Clarity in method call and moving retrieval of ServerHolder into method call.
* Comment on mutability of CoordinatoorStats.
* Added auxiliary fixture for dropping.
This PR seeks to address a previously-encountered bug: if a `LoadRule` has multiple tiers, prolonged loading on a (lower) tier seems to "block" another tier from loading even when the latter is available for loading. Preliminary investigation suggests that, depending on the `BalancerStrategy` and/or configurations (e.g., `maxSegmentsInNodeLoadingQueue` and `replicationThrottleLimit`), the circumstances may result in missed opportunities for loading, which could be critical when prompt loading in bulk is required (e.g., recovery of historicals).

In the current implementation, the primary replica will be assigned to the tier that comes first in the `tieredReplicants` map when there is no replica in the cluster, and this replica will not be throttled: https://github.com/druid-io/druid/blob/d43687d578bf3dea98b01d0899bcfbb2125d142e/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java#L130-L134

The main mechanism in this PR uses the `priority` parameter set on the server to prioritize which server is selected as the holder of the primary replica. The change simply identifies when the primary replica needs to be loaded (i.e., when `totalReplicantsInCluster <= 0`) and prioritizes appropriately (i.e., selects among the candidate servers the one with the highest priority), instead of using the arbitrary ordering of the hash map.