Skip to content

add CachingClusteredClient benchmark, refactor some stuff#8089

Merged
clintropolis merged 9 commits intoapache:masterfrom
clintropolis:caching-clustered-client-benchmark
Jul 18, 2019
Merged

add CachingClusteredClient benchmark, refactor some stuff#8089
clintropolis merged 9 commits intoapache:masterfrom
clintropolis:caching-clustered-client-benchmark

Conversation

@clintropolis
Copy link
Copy Markdown
Member

@clintropolis clintropolis commented Jul 16, 2019

Description

This PR adds a benchmark for CachingClusteredClient and some refactoring of the query processing pipeline to provide the foundation for testing approaches to parallel broker merges.

Benchmarks can be run with a command like the following:

java -Ddruid.benchmark.cacheDir=./tmp/benches/ -jar benchmarks/target/benchmarks.jar org.apache.druid.benchmark.query.CachingClusteredClientBenchmark

Substituting benchmark cache directory as appropriate. This benchmark could potentially be improved in the future to precompute the results to merge and strictly measure the merge, but for now I am retaining the overall approach of the original from #6629.

Background

I'm having a go at parallel broker merges, making another attempt to achieve the goals of #5913 and #6629, eventually planning to attempt the ForkJoinPool in asyncMode approach suggested by @leventov in this thread. Before that, in order to untangle things a bit, I've taken the benchmarks from #6629 (credit to @jihoonson) and updated/simplified them to take advantage of some of the changes to SegmentGenerator from #6794, to allow a persistent cache for the generated benchmark segments for much faster benchmarking. I've also extracted some of the useful refactorings and got a bit more adventurous. This should help isolate these supporting changes from any future PR which adds parallel merging, reducing review overhead.

Refactoring

CombiningFunction<T>

Added CombiningFunction<T>, a new @FunctionalInterface to replace BinaryFn<Type1, Type2, OutType>, since all actual usages were of the form BinaryFn<T, T, T> and being strictly used in merging sequences/iterators/iterables, etc.

QueryToolChest and ResultMergeQueryRunner

In order to split out the mechanisms useful during merge from the merge implementation, QueryToolChest now has 2 additional functions:

CombiningFunction<ResultType> createMergeFn(Query<ResultType> query)

and

Ordering<ResultType> createOrderingFn(Query<ResultType> query)

For group-by queries, GroupByStrategy also has these method signatures, since GroupByQueryToolchest is delegating these things to the strategy.

These methods are passed into a refactored, non-abstract ResultMergeQueryRunner, as function generators, that given a Query produce either a CombiningFunction or Ordering respectively.

ConnectionCountServerSelectorStrategy is now WeightedServerSelectorStrategy

I did not refactor QueryableDruidServer in quite the same manner as #6629, but I did still modify QueryableDruidServer and QueryRunner to add a getWeight method, as suggested by @drcrallen in this comment thread to make the selector strategy a bit more generic instead of hard casting QueryRunner to a DirectDruidClient to get the number of connections. I'm sort unsure about this one, this refactor might have made a bit more sense in the context of the changes to QueryableDruidServer in #6629, but I still think it's maybe worth doing? I reverted this change since it felt sort of artificial at this point, in favor of doing something like this when we actually need it.

Removed

OrderedMergingIterator, OrderedMergingSequence, and SortingMergeIterator have been removed, since they were strictly used by their tests.


This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.

@jihoonson
Copy link
Copy Markdown
Contributor

LGTM overall. Would you fix the Line 198 of BrokerServerView? It should be QueryableDruidServer retVal = new QueryableDruidServer<>(server, makeDirectClient(server)); now.

…rStrategy and remove getWeight since felt artificial, default mergeResults in toolchest implementation for topn, search, select
new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(10 * 1024 * 1024)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract this 10 * 1024 * 1024 together with the one above in the code if they have to be equal or as different constants if they don't have to be equal and comment about that.

.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.metric("sumLongSequential")
.threshold(20480)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining something about this number, e. g. why it has to be 20480.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked to @jihoonson (who wrote the benchmark originally) and we don't believe the number is actually significant, it was just an arbitrarily large number to make the result meaningful. I changed it to 10000 and have added a comment about it. I think it likely this will maybe be moved into a benchmark parameter in the future to compare merges for different thresholds, but I will save that until I actually have different merges to compare.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I set it to 20480 in #6629, but don't remember where this number came from. I think threshold could be any number if it's not too small so that parallel merge can help with merging results.

*/
public interface BinaryFn<Type1, Type2, OutType>
@FunctionalInterface
public interface CombiningFunction<T>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the standard BinaryOperator or add a Javadoc comment justifying the existence of CombiningFunction.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. All of our custom functions gave me tunnel vision and I forgot that Java exists 😅. I don't think the existence of CombiningFunction is really justified since it's just for vanity/cosmetic reasons to clarify what it's doing, which I don't think are necessary.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed this interface with the latest commit to this PR.

@@ -40,9 +40,11 @@ public BaseSequence(
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> fn)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can now make initValue final.

* Creates an ordering comparator that is used to order results. This ordering function is used in the defaul
* {@link ResultMergeQueryRunner} provided by {@link QueryToolChest#mergeResults(QueryRunner)}
*/
public Ordering<ResultType> createOrderingFn(Query<ResultType> query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Comparator or explain in the Javadoc comment why Ordering is used here instead of Comparator.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's currently producing an Ordering because the CombiningSequence created by ResultMergeQueryRunner takes that instead of a Comparator. However, it looks like the Ordering in CombiningSequence can be swapped to use a regular Comparator, so I think this could likely be safely changed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have swapped uses of Ordering with Comparator in CombiningSequence, ResultMergeQueryRunner, the QueryToolChest implementations, and the GroupByStrategy implementations.

public int compare(Result<Object> r1, Result<Object> r2)
{
return r1.getTimestamp().compareTo(r2.getTimestamp());
(r1, r2) -> r1.getTimestamp().compareTo(r2.getTimestamp()),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Comparator.comparing(Result::getTimestamp)


public class TopNMetricSpecOptimizationsTest
{
private static final List<AggregatorFactory> aggs = Lists.newArrayList(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call static final constant with all caps

&& !segmentWatcherConfig.getWatchedDataSources().contains(input.rhs.getDataSource())) {
return false;
}
this.segmentFilter = metadataAndSegment -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type of metadataAndSegment is not obvious, please use (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> ...

if (arg2 == null) {
return arg1;
}
private final CombiningFunction<Integer> plus = (arg1, arg2) -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this function static. I would call it PLUS_NULLABLE to highlight why it's not just Integer::sum.

return arg2;
}
return arg1;
(arg1, arg2) -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please extract GuavaUtils.firstNonNull and use a method reference here? The doc for that method should note that Objects.firstNonNull() cannot be used itself because it's one of those methods causing Guava incompatibility (see #6948). Please also add Guava's Objects.firstNonNull() this method to forbidden-apis.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is in a lot of places, but this is the only place specifically where it's done to just return one or the other, not as a mechanism to bail early before doing some sort of combine. Is it still worth moving to GuavaUtils or should it just be a private static method of this class?

Additionally, this method seems @Nullable which makes it have a different contract than Objects.firstNonNull, which requires one of the 2 arguments not be null. Prohibiting it on this functions behalf doesn't seem necessary, so are we doing it for a reason in #6948?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and did this ^ with the latest commits, extracting to GuavaUtils.firstNonNull and adding to forbidden api

* null.
*/
@Nullable
public static <T> T firstNonNull(T arg1, T arg2)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please annotate both parameters @Nullable


/**
* Creates an ordering comparator that is used to order results. This ordering function is used in the defaul
* Creates an ordering comparator that is used to order results. This comparator is used in the defaul
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "defaul"

* {@link ResultMergeQueryRunner} provided by {@link QueryToolChest#mergeResults(QueryRunner)}
*/
public Ordering<ResultType> createOrderingFn(Query<ResultType> query)
public Comparator<ResultType> createComparator(Query<ResultType> query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: maybe call it createResultComparator.

*/
@Nullable
default Ordering<Row> createOrderingFn(Query<Row> queryParam)
default Comparator<Row> createComparator(Query<Row> queryParam)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think createResultComparator would be a clearer name for this method

@clintropolis
Copy link
Copy Markdown
Member Author

Thanks for review @leventov and @jihoonson!

@clintropolis clintropolis merged commit 03e55d3 into apache:master Jul 18, 2019
@clintropolis clintropolis deleted the caching-clustered-client-benchmark branch July 18, 2019 20:16
@clintropolis clintropolis added this to the 0.16.0 milestone Aug 8, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants